In [13]:
import os

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from dagshub import get_repo_bucket_client
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials, space_eval
from hyperopt.pyll import scope
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import mean_squared_error, root_mean_squared_error
from sklearn.model_selection import train_test_split


In [15]:
# Create the directory if it doesn't exist
!mkdir -p ../data

# Download files using curl
!curl -o ../data/green_tripdata_2024-01.parquet https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-01.parquet
!curl -o ../data/green_tripdata_2024-02.parquet https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-02.parquet

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1330k  100 1330k    0     0  5776k      0 --:--:-- --:--:-- --:--:-- 5784k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1253k  100 1253k    0     0  6264k      0 --:--:-- --:--:-- --:--:-- 6237k


In [16]:
def read_dataframe(filename):
    """Load a trip parquet file and derive the columns used for modelling.

    Parameters
    ----------
    filename : path or file-like accepted by ``pd.read_parquet``
        Must contain ``lpep_pickup_datetime``, ``lpep_dropoff_datetime``,
        ``PULocationID`` and ``DOLocationID`` columns.

    Returns
    -------
    pandas.DataFrame
        Rows whose trip duration is between 1 and 60 minutes (inclusive), with:
        - ``duration``: drop-off minus pick-up, in minutes;
        - ``PULocationID`` / ``DOLocationID`` cast to ``str``;
        - ``PU_DO``: the two location ids joined as ``"<PU>_<DO>"``.
    """
    df = pd.read_parquet(filename)

    # Vectorised timedelta -> minutes (replaces a per-row Python lambda).
    elapsed = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df['duration'] = elapsed.dt.total_seconds() / 60

    # .copy() gives an independent frame, so the column assignments below do
    # not write into a slice of the unfiltered frame (chained-assignment warning).
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    return df

In [17]:
# Column roles used by the vectorisation step further down.
categorical, numerical, target = ['PU_DO'], ['trip_distance'], 'duration'

# One monthly file per split: January -> train, February -> validation,
# March -> test.
df_train = read_dataframe(
    '../data/green_tripdata_2024-01.parquet'
)
df_val = read_dataframe(
    '../data/green_tripdata_2024-02.parquet'
)
df_test = read_dataframe(
    '../data/green_tripdata_2024-03.parquet'
)


In [18]:
# The vectorizer is fitted on the training split only; validation and test
# are transformed with that fitted vectorizer.
dv = DictVectorizer()
feature_columns = categorical + numerical

# Training split: fit + transform.
train_dicts = df_train[feature_columns].to_dict('records')
y_train = df_train[target].values
X_train = dv.fit_transform(train_dicts)

# Validation split: transform only.
val_dicts = df_val[feature_columns].to_dict('records')
y_val = df_val[target].values
X_val = dv.transform(val_dicts)

# Test split: transform only.
test_dicts = df_test[feature_columns].to_dict('records')
y_test = df_test[target].values
X_test = dv.transform(test_dicts)

In [22]:
import os
import mlflow
import dagshub
from mlflow.tracking import MlflowClient

# Configure DagsHub and MLflow
dagshub.init(repo_owner="JuanPab2009", repo_name="nys-taxi-time-prediction", mlflow=True)

# Read back the MLflow tracking URI that is active after dagshub.init
mlflow_tracking_uri = mlflow.get_tracking_uri()
print(f"MLflow Tracking URI: {mlflow_tracking_uri}")

# Explicitly (re)apply that URI to MLflow
mlflow.set_tracking_uri(mlflow_tracking_uri)

# Experiment name
experiment_name = "nys-taxi-experiment"

# Get the experiment, creating it only when it does not exist yet.
# MlflowClient.create_experiment returns the new experiment's ID (a string),
# not an Experiment object, so the object is fetched by ID after creation.
# Looking the experiment up first also avoids using a broad MlflowException
# handler as control flow, which would hide unrelated tracking-server errors.
client = MlflowClient()
experiment = client.get_experiment_by_name(experiment_name)
if experiment is None:
    new_experiment_id = client.create_experiment(experiment_name)
    experiment = client.get_experiment(new_experiment_id)

if experiment:
    print(f"Experiment ID: {experiment.experiment_id}")
    print(f"Artifact Location: {experiment.artifact_location}")

    # Make it the active experiment for subsequent runs
    mlflow.set_experiment(experiment_name)
else:
    print("Failed to create or retrieve the experiment.")

MLflow Tracking URI: https://dagshub.com/JuanPab2009/nys-taxi-time-prediction.mlflow
Experiment ID: 0
Artifact Location: mlflow-artifacts:/263051c210234e5a9f3b7186f93060c2


In [23]:
# Objective function evaluated once per hyperparameter trial
def objective(params, model_class, X_train, y_train, X_val, y_val):
    """Train one candidate model and report its validation RMSE.

    Opens a nested MLflow run, builds ``model_class(**params)``, fits it on
    the training data, scores it on the validation data, and logs the
    parameters and the ``rmse`` metric to that run.

    Returns
    -------
    dict
        ``{'loss': <validation RMSE>, 'status': STATUS_OK}``.
    """
    with mlflow.start_run(nested=True):
        estimator = model_class(**params)
        estimator.fit(X_train, y_train)

        val_predictions = estimator.predict(X_val)
        val_rmse = root_mean_squared_error(y_val, val_predictions)

        # Logged only after a successful fit and prediction.
        mlflow.log_params(params)
        mlflow.log_metric("rmse", val_rmse)

    return {'loss': val_rmse, 'status': STATUS_OK}

In [24]:
# Hyperparameter search spaces.
# quniform samples are wrapped in scope.int so the objective receives
# integers for these parameters.

# Gradient boosting: tree count, learning rate (log-uniform), tree depth.
gb_space = dict(
    n_estimators=scope.int(hp.quniform('n_estimators', 100, 500, 50)),
    learning_rate=hp.loguniform('learning_rate', np.log(0.01), np.log(0.2)),
    max_depth=scope.int(hp.quniform('max_depth', 3, 10, 1)),
)

# Random forest: tree count, tree depth, minimum samples to split a node.
rf_space = dict(
    n_estimators=scope.int(hp.quniform('n_estimators', 100, 500, 50)),
    max_depth=scope.int(hp.quniform('max_depth', 3, 20, 1)),
    min_samples_split=scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
)


In [None]:
# Experiments: tune both model families under one parent run, then register
# the winner under the 'challenger' alias.
with mlflow.start_run(run_name="Parent Experiment") as parent_run:
    # Gradient Boosting search (each trial is logged by `objective` as a nested run)
    gb_trials = Trials()
    best_gb = fmin(
        fn=lambda params: objective(params, GradientBoostingRegressor, X_train, y_train, X_val, y_val),
        space=gb_space,
        algo=tpe.suggest,
        max_evals=20,
        trials=gb_trials
    )

    # Random Forest search
    rf_trials = Trials()
    best_rf = fmin(
        fn=lambda params: objective(params, RandomForestRegressor, X_train, y_train, X_val, y_val),
        space=rf_space,
        algo=tpe.suggest,
        max_evals=20,
        trials=rf_trials
    )

    # Select the model family with the lowest validation RMSE
    gb_rmse = min(trial['result']['loss'] for trial in gb_trials.trials)
    rf_rmse = min(trial['result']['loss'] for trial in rf_trials.trials)

    # fmin returns the raw sampled values keyed by label: quniform values come
    # back as floats and the scope.int wrappers are NOT applied. space_eval
    # evaluates the search space at that point, yielding the same parameters
    # the objective actually received (integers where scope.int is used).
    if gb_rmse < rf_rmse:
        best_model = GradientBoostingRegressor(**space_eval(gb_space, best_gb))
        best_model_name = "GradientBoost"
    else:
        best_model = RandomForestRegressor(**space_eval(rf_space, best_rf))
        best_model_name = "RandomForest"

    # Refit the winning configuration on the training split
    best_model.fit(X_train, y_train)

    # Log the fitted model as an artifact of the parent run
    mlflow.sklearn.log_model(best_model, "nyc-taxi-model")

    # Register the logged model and point the 'challenger' alias at the new
    # version. mlflow.register_model creates the registered model when it does
    # not exist yet, whereas create_model_version requires it to exist already.
    client = mlflow.tracking.MlflowClient()
    model_version = mlflow.register_model(
        f"runs:/{parent_run.info.run_id}/nyc-taxi-model",
        "nyc-taxi-model",
    )
    client.set_registered_model_alias("nyc-taxi-model", "challenger", model_version.version)

In [None]:
# Evaluate the registered champion and challenger models on the test set.
# Both are loaded from the model registry through their aliases.
champion_model = mlflow.sklearn.load_model(
    "models:/nyc-taxi-model@champion"
)
challenger_model = mlflow.sklearn.load_model(
    "models:/nyc-taxi-model@challenger"
)

champion_predictions = champion_model.predict(X_test)
champion_rmse = root_mean_squared_error(y_test, champion_predictions)

challenger_predictions = challenger_model.predict(X_test)
challenger_rmse = root_mean_squared_error(y_test, challenger_predictions)

print(f"Champion RMSE: {champion_rmse}")
print(f"Challenger RMSE: {challenger_rmse}")

In [None]:
# Decide whether to promote the challenger to champion.
# The challenger's test RMSE must be below this fraction of the champion's
# (0.95 means at least a 5% improvement).
PROMOTION_RMSE_RATIO = 0.95

if challenger_rmse < champion_rmse * PROMOTION_RMSE_RATIO:
    print("El challenger supera significativamente al champion y debería ser promovido.")
    # Resolve the version through the 'challenger' alias, the same alias the
    # evaluated model was loaded from, instead of relying on a `model_version`
    # variable left in the kernel by the training cell.
    challenger_version = client.get_model_version_by_alias("nyc-taxi-model", "challenger")
    client.set_registered_model_alias("nyc-taxi-model", "champion", challenger_version.version)
else:
    print("El challenger no supera significativamente al champion. Mantenemos el champion actual.")


In [None]:
# Upload the test-month parquet file to the DagsHub repository bucket.
# NOTE(review): the client is presumably boto3-style (keyword names match
# boto3's upload_file) — confirm against the dagshub documentation.
s3 = get_repo_bucket_client("JuanPab2009/nys-taxi-time-prediction")

s3.upload_file(Filename="../data/green_tripdata_2024-03.parquet", Bucket="nys-taxi-time-prediction", Key="test_data.parquet")