In [1]:
import datetime
from IPython.display import display



from typing import List
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression, Lasso, Ridge
from sklearn.svm import LinearSVR
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.metrics import mean_squared_error

from sklearn.pipeline import make_pipeline

from datetime import datetime
import requests
import mlflow
import pickle
from hyperopt import hp, STATUS_OK, Trials, pyll,fmin,tpe
import xgboost as xgb





import os

In [2]:
import boto3

# Create an S3 client
s3 = boto3.client('s3')

# List all buckets in your S3 account
response = s3.list_buckets()
buckets = response['Buckets']

for bucket in buckets:
    print(f"Bucket Name: {bucket['Name']}")


Bucket Name: mlflow-artifact-remote-chicago-taxi-prediction


In [3]:
TRACKING_SERVER_HOST = 'ec2-3-145-52-96.us-east-2.compute.amazonaws.com'
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:5000")

In [4]:
print(f'tracking uri "{mlflow.get_tracking_uri()}"')

tracking uri "http://ec2-3-145-52-96.us-east-2.compute.amazonaws.com:5000"


In [6]:
mlflow_experiment_name = "chicago-taxi-experiment"

mlflow.set_experiment(mlflow_experiment_name)

<Experiment: artifact_location='s3://mlflow-artifact-remote-chicago-taxi-prediction/1', creation_time=1691888942267, experiment_id='1', last_update_time=1691888942267, lifecycle_stage='active', name='chicago-taxi-experiment', tags={}>

In [7]:
# TRAIN DATASET:
Train_parquet_file_path = 'data/chicago_taxi_train_dataset_2023-01.parquet'  # Replace with the desired Parquet file path
# VALIDATION DATASET:
Val_parquet_file_path = 'data/chicago_taxi_Val_dataset_2023-02.parquet'
# TEST DATASET:
Test_parquet_file_path = 'data/chicago_taxi_test_dataset_2023-03.parquet'

In [8]:
df_train_r = pd.read_parquet(Train_parquet_file_path)
df_val_r = pd.read_parquet(Val_parquet_file_path)
df_test_r = pd.read_parquet(Test_parquet_file_path)

In [9]:
df_train_r.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 414478 entries, 0 to 414477
Data columns (total 23 columns):
 #   Column                      Non-Null Count   Dtype         
---  ------                      --------------   -----         
 0   trip_id                     414478 non-null  object        
 1   taxi_id                     414457 non-null  object        
 2   trip_start_timestamp        414478 non-null  datetime64[ns]
 3   trip_end_timestamp          414461 non-null  datetime64[ns]
 4   trip_seconds                414378 non-null  float64       
 5   trip_miles                  414478 non-null  float64       
 6   pickup_census_tract         136206 non-null  float64       
 7   dropoff_census_tract        135485 non-null  float64       
 8   pickup_community_area       414478 non-null  object        
 9   dropoff_community_area      414478 non-null  object        
 10  fare                        414073 non-null  float64       
 11  tips                        414073 non-

In [10]:
import pandas as pd
from typing import List

def preprocess_data(df: pd.DataFrame, categorical_features: List[str], numerical_features: List[str], verbose: bool = False):

    df = df[df.trip_seconds.notnull()]
    df = df[df.trip_start_timestamp.notnull()]
    
    df = df[(df['trip_seconds'] > 60) & (df['trip_seconds'] < 3600)]

    # Create the 'duration' column in minutes as the target variable
    df['duration'] = df['trip_seconds'] / 60

    # Preprocess categorical features
    for column in categorical_features:
        df[column].fillna(-1, inplace=True)
        df[column] = df[column].astype('str')
        df[column] = df[column].str.lower().str.replace(' ', '_')

    # Select only the relevant columns
    target = ['duration']
    selected_columns = categorical_features + numerical_features + target
    df = df[selected_columns]

    if verbose:
        print(f"\nshape of the data: {df.shape}")
        print(f"\ndata types:\n{df.dtypes}")
        print(f"\ncategorical_features: {categorical_features}")
        print(f"\nnumerical_features: {numerical_features}")
        print(f"\ntarget variable: {target}")

    return df[categorical_features + numerical_features], df[target]


In [11]:
df_test, y_test = preprocess_data(
    df_test_r,
    categorical_features=['pickup_community_area','dropoff_community_area'],
    numerical_features=[],
    verbose=True
)


shape of the data: (542815, 3)

data types:
pickup_community_area      object
dropoff_community_area     object
duration                  float64
dtype: object

categorical_features: ['pickup_community_area', 'dropoff_community_area']

numerical_features: []

target variable: ['duration']


In [18]:
df_val, y_val = preprocess_data(
    df_val_r,
    categorical_features=['pickup_community_area','dropoff_community_area'],
    numerical_features=[],
    verbose=True
)


shape of the data: (417382, 3)

data types:
pickup_community_area      object
dropoff_community_area     object
duration                  float64
dtype: object

categorical_features: ['pickup_community_area', 'dropoff_community_area']

numerical_features: []

target variable: ['duration']


In [15]:
df_train, y_train = preprocess_data(
    df_train_r,
    categorical_features=['pickup_community_area','dropoff_community_area'],
    numerical_features=[],
    verbose=True
)


shape of the data: (387579, 3)

data types:
pickup_community_area      object
dropoff_community_area     object
duration                  float64
dtype: object

categorical_features: ['pickup_community_area', 'dropoff_community_area']

numerical_features: []

target variable: ['duration']


In [16]:
def prepare_dictionaries(df:pd.DataFrame, categorical_features:List[str], numerical_features:List[str]):
    dicts = df[categorical_features + numerical_features].to_dict(orient='records')
    return dicts

In [19]:
train_dict = prepare_dictionaries(
    df_train,
    categorical_features=['pickup_community_area','dropoff_community_area'],
    numerical_features=[]
)
val_dict = prepare_dictionaries(
    df_val,
    categorical_features=['pickup_community_area','dropoff_community_area'],
    numerical_features=[]
)
test_dict = prepare_dictionaries(
    df_test,
    categorical_features=['pickup_community_area','dropoff_community_area'],
    numerical_features=[]
)

In [20]:
y_train = y_train.values
y_val = y_val.values
y_test = y_test.values

In [76]:
# dv = DictVectorizer()
# X_train = dv.fit_transform(train_dict)
# X_val = dv.transform(val_dict)
# X_test = dv.transform(test_dict)

 ### Evaluating the model

In [24]:
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.feature_extraction import DictVectorizer
with mlflow.start_run(run_name='Random_forest'):
    params = {
        'n_estimators': 100, 
        'max_depth': 10, 
        'min_samples_split': 10,
        'min_samples_leaf': 4
    }

    pipeline = make_pipeline(
        DictVectorizer(),
        RandomForestRegressor(random_state=42)
    )

    pipeline.fit(train_dict, y_train)
    y_pred = pipeline.predict(val_dict)
    rmse = mean_squared_error(y_val, y_pred, squared=False)
    rmse_test = mean_squared_error(y_test, pipeline.predict(test_dict), squared=False)
    mlflow.set_tag('developer', 'Arun Dhanapalan')
    mlflow.set_tag('project', 'chicago-taxi-prediction')
    mlflow.log_param('train-data_path', Train_parquet_file_path)
    mlflow.log_param('Validation-data_path', Val_parquet_file_path)
    mlflow.log_metric('val_rmse', rmse)
    mlflow.log_metric('test_rmse', rmse_test)
    mlflow.set_tag("model_name", 'Random_forest')
    mlflow.sklearn.log_model(pipeline, artifact_path="models")
    print(f'Hyperparameters: {params}')
    print(f'Validation RMSE: {rmse}') 
    print(f'Test RMSE: {rmse_test}')

  return fit_method(estimator, *args, **kwargs)


Hyperparameters: {'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 10, 'min_samples_leaf': 4}
Validation RMSE: 8.11854983176106
Test RMSE: 8.658390359289022


In [25]:
from mlflow.tracking import MlflowClient
import mlflow

ML_FLOW_TRACKING_URI = "http://ec2-3-145-52-96.us-east-2.compute.amazonaws.com:5000"
client = MlflowClient(tracking_uri=ML_FLOW_TRACKING_URI)

In [26]:
from mlflow.entities import ViewType
runs = client.search_runs(
    experiment_ids='1',
    filter_string='metrics.test_rmse < 9',
    run_view_type=ViewType.ACTIVE_ONLY
    # max_results=5,
    # order_by=["metrics.test_rmse ASC"]
)

In [27]:
for run in runs:
    print(f"run id: {run.info.run_id}, \nVal_rmse: {run.data.metrics['val_rmse']:.3f} \nTest rmse: {run.data.metrics['test_rmse']:.3f}")

run id: a6203da436864c7ea7d2ce768f2ec697, 
Val_rmse: 8.119 
Test rmse: 8.658
run id: 5c5419efd6f64a9bb9f168860160a8dd, 
Val_rmse: 8.106 
Test rmse: 8.645
run id: 15c1ae822734476d96e118a9e7e7034a, 
Val_rmse: 8.127 
Test rmse: 8.666
run id: e5467e29da6f46a9ac9731515a9fa195, 
Val_rmse: 8.119 
Test rmse: 8.658


In [28]:
mlflow.set_tracking_uri(ML_FLOW_TRACKING_URI)

In [36]:
run_id = "a6203da436864c7ea7d2ce768f2ec697"
model_uri = f"runs:/{run_id}/models"
print(model_uri)
mlflow.register_model(model_uri=model_uri, name="chicago-taxi-prediction-model")

Registered model 'chicago-taxi-prediction-model' already exists. Creating a new version of this model...
2023/08/18 17:00:33 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: chicago-taxi-prediction-model, version 4


runs:/a6203da436864c7ea7d2ce768f2ec697/models


Created version '4' of model 'chicago-taxi-prediction-model'.


<ModelVersion: aliases=[], creation_timestamp=1692392433395, current_stage='None', description='', last_updated_timestamp=1692392433395, name='chicago-taxi-prediction-model', run_id='a6203da436864c7ea7d2ce768f2ec697', run_link='', source='s3://mlflow-artifact-remote-chicago-taxi-prediction/1/a6203da436864c7ea7d2ce768f2ec697/artifacts/models', status='READY', status_message='', tags={}, user_id='', version='4'>

In [40]:
model_name = 'chicago-taxi-prediction-model'
latest_version = client.get_latest_versions(name=model_name)
for version in latest_version:
    print(f"version: {version.version}, status: {version.current_stage}")

version: 1, status: Staging
version: 3, status: None
version: 4, status: Production


In [38]:
client.transition_model_version_stage(
    name=model_name,
    version=4,
    stage="production",
    archive_existing_versions=False
)

<ModelVersion: aliases=[], creation_timestamp=1692392433395, current_stage='Production', description='', last_updated_timestamp=1692392470411, name='chicago-taxi-prediction-model', run_id='a6203da436864c7ea7d2ce768f2ec697', run_link='', source='s3://mlflow-artifact-remote-chicago-taxi-prediction/1/a6203da436864c7ea7d2ce768f2ec697/artifacts/models', status='READY', status_message='', tags={}, user_id='', version='4'>

In [39]:
from datetime import datetime
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
client.update_model_version(
    name=model_name,
    version=4,
    description=f"Moved version 4 is selected for production for this {model_name} at {date}",
)

<ModelVersion: aliases=[], creation_timestamp=1692392433395, current_stage='Production', description=('Moved version 4 is selected for production for this '
 'chicago-taxi-prediction-model at 2023-08-18 17:02:11'), last_updated_timestamp=1692392531600, name='chicago-taxi-prediction-model', run_id='a6203da436864c7ea7d2ce768f2ec697', run_link='', source='s3://mlflow-artifact-remote-chicago-taxi-prediction/1/a6203da436864c7ea7d2ce768f2ec697/artifacts/models', status='READY', status_message='', tags={}, user_id='', version='4'>