In [1]:
!python -V

Python 3.9.12


In [73]:
import os
import mlflow
import pickle
import zipfile
import datetime
import requests
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from typing import List
from hyperopt.pyll import scope
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression, Lasso, Ridge
from sklearn.metrics import mean_squared_error, mean_absolute_error

In [3]:
!pip freeze | grep scikit-learn

scikit-learn==1.1.1


In [80]:
# Notebook configuration. Each value can be overridden through an environment
# variable of the same name; the literal is the fallback used otherwise.

# MLflow tracking server that stores runs and the model registry.
MLFLOW_TRACKING_URI = os.environ.get(
    'MLFLOW_TRACKING_URI',
    'http://ec2-54-79-228-176.ap-southeast-2.compute.amazonaws.com:5000/',
)
# Experiment the training runs are logged under.
EXPERIMENT_NAME = os.environ.get('EXPERIMENT_NAME', 'bikeshare-ride-duration-prediction')
# Name of the registered model in the MLflow model registry.
MODEL_NAME = os.environ.get('MODEL_NAME', 'bikeshare-ride-duration-regressor')
# Zipped trip data used for training / validation.
TRAINING_DATA_PATH = os.environ.get(
    'TRAINING_DATA_PATH',
    'https://s3.amazonaws.com/capitalbikeshare-data/202204-capitalbikeshare-tripdata.zip',
)
# Zipped trip data of the following month, used as the test set.
TEST_DATA_PATH = os.environ.get(
    'TEST_DATA_PATH',
    'https://s3.amazonaws.com/capitalbikeshare-data/202205-capitalbikeshare-tripdata.zip',
)

In [4]:
def read_data(url: str):
    """
    Download a zipped Capital Bikeshare trip file and prepare it for modelling.

    The archive is saved to ``./datasets/<archive name>`` and the CSV member
    that shares the archive's base name is loaded. The frame then gets:

    * string-typed ``rideable_type`` / station id columns,
    * parsed ``started_at`` / ``ended_at`` timestamps,
    * ``duration``: ride length in minutes, rounded to the nearest minute,
    * ``start_end``: start and end station ids joined with ``_``.

    Rides longer than 120 minutes are dropped.

    Args:
        url: Location of the zipped CSV.

    Returns:
        Tuple ``(df, categorical_cols, target)`` where ``categorical_cols`` is
        ``['rideable_type', 'start_end']`` and ``target`` is ``'duration'``.

    Raises:
        requests.HTTPError: If the download returns an error status.
    """
    zip_path = url.split('/')[-1]
    file_name = zip_path.split('.')[0] + '.csv'
    save_dir = './datasets'
    save_path = f'./datasets/{zip_path}'

    # A fresh checkout has no datasets folder; open() below would fail without it.
    os.makedirs(save_dir, exist_ok=True)

    # Fail here on a bad response instead of saving an error page as the
    # "zip" and hitting a confusing BadZipFile later. The timeout keeps a
    # stalled connection from hanging the notebook forever.
    req = requests.get(url, timeout=60)
    req.raise_for_status()

    with open(save_path, 'wb') as f_out:
        f_out.write(req.content)

    with zipfile.ZipFile(save_path) as z:
        with z.open(file_name) as f:
            # Dates are parsed explicitly below (parse_dates=True would only
            # have looked at the index).
            df = pd.read_csv(f)

    categorical_cols = ['rideable_type', 'start_station_id', 'end_station_id']
    date_cols = ['started_at', 'ended_at']

    # NOTE(review): astype(str) keeps float formatting ('31646.0') and turns
    # missing station ids into the literal string 'nan'.
    df[categorical_cols] = df[categorical_cols].astype(str)

    # The trip timestamps appear to be dash separated (2022-04-15 10:09:53).
    # The earlier slash-separated format string only worked where pandas
    # treats ISO-like formats leniently; stricter versions reject it.
    df[date_cols] = df[date_cols].apply(pd.to_datetime, format='%Y-%m-%d %H:%M:%S')

    # Ride length in whole minutes, computed column-wise instead of per row.
    elapsed = df['ended_at'] - df['started_at']
    df['duration'] = elapsed.dt.total_seconds().div(60).round(0)
    df['start_end'] = df['start_station_id'] + '_' + df['end_station_id']

    # NOTE(review): only the upper bound is enforced; zero or negative
    # durations are kept.
    df = df[df['duration'] <= 120]

    categorical_cols = ['rideable_type', 'start_end']
    target = 'duration'

    return df, categorical_cols, target

In [5]:
# Download and prepare the training month; also returns the feature columns
# and the name of the target column used by the cells below.
df, categorical_cols, target = read_data(TRAINING_DATA_PATH)

In [6]:
# Preview the first rows of the prepared training frame.
df.head()

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,duration,start_end
0,8F5ADBABCB4EBE01,classic_bike,2022-04-15 10:09:53,2022-04-15 10:16:12,Maine Ave & 9th St SW,31646.0,Smithsonian-National Mall / Jefferson Dr & 12t...,31248.0,38.88044,-77.025236,38.888774,-77.028694,member,6.0,31646.0_31248.0
2,01BF0E3746A32678,classic_bike,2022-04-20 19:35:59,2022-04-20 19:41:04,11th & V st NW,31332.0,14th & Belmont St NW,31119.0,38.918199,-77.027171,38.921074,-77.031887,member,5.0,31332.0_31119.0
3,94BD7902E9889076,docked_bike,2022-04-15 17:23:21,2022-04-15 17:48:35,14th & D St NW / Ronald Reagan Building,31231.0,15th & W St NW,31125.0,38.894514,-77.031617,38.919019,-77.034449,casual,25.0,31231.0_31125.0
4,2CA1C29600E5F00A,classic_bike,2022-04-18 09:04:07,2022-04-18 09:05:12,11th & V st NW,31332.0,11th & V st NW,31332.0,38.918199,-77.027171,38.918199,-77.027171,member,1.0,31332.0_31332.0
5,25DFD58A2EE108E1,classic_bike,2022-04-20 10:29:37,2022-04-20 10:54:07,11th & V st NW,31332.0,Montello Ave & Holbrook Terr NE,31524.0,38.918199,-77.027171,38.907444,-76.986813,member,24.0,31332.0_31524.0


In [7]:
def create_train_val_sets(df: pd.DataFrame, categorical_cols: List, target: str):
    """
    Vectorise the categorical features and split them into train / validation.

    A ``DictVectorizer`` is fitted on every row of ``df`` (before the split),
    then features and target are split 80/20 with a fixed seed. The shapes of
    the four resulting arrays and the vectorizer are printed.

    Args:
        df: Prepared trip data.
        categorical_cols: Columns fed to the vectorizer.
        target: Name of the column to predict.

    Returns:
        ``(x_train, x_val, y_train, y_val, dv)`` where ``dv`` is the fitted
        ``DictVectorizer``.
    """
    records = df[categorical_cols].to_dict(orient='records')

    dv = DictVectorizer()
    features = dv.fit_transform(records)
    labels = df[target].values

    splits = train_test_split(
        features,
        labels,
        test_size=0.2,
        shuffle=True,
        random_state=42,
    )
    x_train, x_val, y_train, y_val = splits

    print(*(part.shape for part in (x_train, x_val, y_train, y_val)))
    print(dv)

    return x_train, x_val, y_train, y_val, dv

In [8]:
# Vectorise the features and create the train / validation split; `dv` is the
# fitted DictVectorizer.
x_train, x_val, y_train, y_val, dv = create_train_val_sets(df, categorical_cols, target)

(243756, 54362) (60939, 54362) (243756,) (60939,)
DictVectorizer()


In [9]:
# Baseline: Ridge with default hyperparameters, scored on the validation split.
lr = Ridge().fit(x_train, y_train)
y_pred = lr.predict(x_val)
# squared=False makes mean_squared_error return the RMSE.
mean_squared_error(y_val, y_pred, squared=False)

14.405431269954295

In [39]:
def model_search(x_train, y_train, x_val, y_val, num_trials=10):
    """
    Tune Ridge's ``alpha`` with hyperopt, logging every trial to MLflow.

    Each trial runs in its own MLflow run tagged ``model=Ridge`` and records
    the sampled parameters and the validation RMSE.

    Args:
        x_train, y_train: Training features and target.
        x_val, y_val: Validation features and target used for the loss.
        num_trials: Number of hyperopt evaluations.

    Returns:
        The best parameter dict as returned by ``hyperopt.fmin``.
    """

    def objective(params):
        # One MLflow run per sampled parameter set.
        with mlflow.start_run():
            mlflow.set_tag('model', 'Ridge')
            mlflow.log_params(params)

            candidate = Ridge(**params).fit(x_train, y_train)
            rmse = mean_squared_error(y_val, candidate.predict(x_val), squared=False)
            mlflow.log_metric('RMSE', rmse)

        return {'loss': rmse, 'status': STATUS_OK}

    return fmin(
        fn=objective,
        space={'alpha': hp.uniform('alpha', 0.01, 1)},
        algo=tpe.suggest,
        max_evals=num_trials,
        trials=Trials(),
        # Fixed seed so the sequence of sampled alphas is repeatable.
        rstate=np.random.default_rng(0),
    )

In [75]:
def train_best_model(x_train, y_train, x_val, y_val, dv, best_result):
    """
    Fit Ridge with the chosen parameters, log it to MLflow and register it.

    Inside a single MLflow run this logs the parameters and validation RMSE,
    pickles ``dv`` to ``preprocessor.bin`` in the working directory and logs it
    under the ``preprocessor`` artifact path, logs the model under ``model``
    and registers that model as a new version of ``MODEL_NAME``.

    Args:
        x_train, y_train: Training features and target.
        x_val, y_val: Validation features and target used for the RMSE.
        dv: Fitted vectorizer to store next to the model.
        best_result: Keyword arguments for ``Ridge``.

    Returns:
        The ID of the MLflow run that holds the model and preprocessor.
    """
    preprocessor_file = 'preprocessor.bin'

    with mlflow.start_run() as active_run:

        print(f"Best params: {best_result}")
        mlflow.log_params(best_result)

        final_model = Ridge(**best_result).fit(x_train, y_train)
        rmse = mean_squared_error(y_val, final_model.predict(x_val), squared=False)
        print(f"RMSE: {rmse}")
        mlflow.log_metric('RMSE', rmse)

        # The vectorizer is needed at inference time, so it is stored with the run.
        with open(preprocessor_file, 'wb') as f_out:
            pickle.dump(dv, f_out)
        mlflow.log_artifact(preprocessor_file, artifact_path='preprocessor')
        mlflow.sklearn.log_model(final_model, artifact_path='model')

        run_id = active_run.info.run_id
        print(f'Run ID: {run_id}')
        mlflow.register_model(model_uri=f'runs:/{run_id}/model', name=MODEL_NAME)

    return run_id

In [63]:
def transition_model_stage(client, model_name, model_version, stage):
    """
    Move a registered model version to ``stage`` and note it in the description.

    Versions already in the target stage are left where they are (they are not
    archived). The version's description is then overwritten with a sentence
    stating the version, model name, stage and today's date.

    Args:
        client: MLflow client used for both registry calls.
        model_name: Name of the registered model.
        model_version: Version to transition.
        stage: Target stage.
    """
    client.transition_model_version_stage(
        name=model_name,
        version=model_version,
        stage=stage,
        archive_existing_versions=False,
    )

    today = datetime.date.today()
    note = f'Version {model_version} of {model_name} was transitioned to {stage} on {today}.'
    client.update_model_version(name=model_name, version=model_version, description=note)

In [67]:
def train_model(client, data_path=TRAINING_DATA_PATH):
    """
    Run the full training flow and move the new model version to Staging.

    Steps: load the data, build the train / validation split, search for the
    best ``alpha``, train and register the final model, then transition the
    version that was registered by this run to the ``Staging`` stage.

    Args:
        client: MLflow client used for the registry calls.
        data_path: Location of the zipped training data.

    Raises:
        RuntimeError: If no version of ``MODEL_NAME`` is found for the run
            that was just logged.
    """
    df, categorical_cols, target = read_data(data_path)
    x_train, x_val, y_train, y_val, dv = create_train_val_sets(df, categorical_cols, target)

    best_result = model_search(x_train, y_train, x_val, y_val)
    run_id = train_best_model(x_train, y_train, x_val, y_val, dv, best_result)

    # Identify the version by the run that produced it. get_latest_versions()
    # returns one entry per stage in no guaranteed order, so taking its last
    # element can pick an older version that sits in another stage.
    registered = [
        mv for mv in client.search_model_versions(f"run_id='{run_id}'")
        if mv.name == MODEL_NAME
    ]
    if not registered:
        raise RuntimeError(f'No version of {MODEL_NAME} was registered for run {run_id}.')

    # Versions are strings, so compare them numerically.
    model_version = max(registered, key=lambda mv: int(mv.version)).version
    transition_model_stage(client, MODEL_NAME, model_version, 'Staging')

In [76]:
def get_model_preprocessor_paths(client, run_id: str):
    """
    Return the artifact locations of the model and preprocessor of a run.

    The paths are derived from the run's own artifact URI, so they stay
    correct regardless of which experiment the run belongs to or how the
    tracking server lays out its artifact store.

    Args:
        client: MLflow client used to look up the run.
        run_id: ID of the run that logged the ``model`` and
            ``preprocessor/preprocessor.bin`` artifacts.

    Returns:
        Tuple ``(model_path, preprocessor_path)``.
    """
    run = client.get_run(run_id)
    # Strip a trailing slash so the joined paths never contain '//'.
    artifact_root = run.info.artifact_uri.rstrip('/')
    model_path = f'{artifact_root}/model'
    preprocessor_path = f'{artifact_root}/preprocessor/preprocessor.bin'

    return model_path, preprocessor_path

In [None]:
def load_model_preprocessor(client, run_id: str):
    """
    Load the model and the pickled preprocessor that a run logged.

    Args:
        client: MLflow client used to resolve the artifact paths.
        run_id: ID of the run holding the ``model`` and ``preprocessor``
            artifacts.

    Returns:
        Tuple ``(model, preprocessor)``: the model loaded through
        ``mlflow.pyfunc`` and the unpickled preprocessor object.
    """
    # The helper needs the client as well as the run id.
    model_path, preprocessor_path = get_model_preprocessor_paths(client, run_id)
    model = mlflow.pyfunc.load_model(model_path)
    preprocessor_artifact = mlflow.artifacts.download_artifacts(preprocessor_path)

    # SECURITY: pickle.load executes code embedded in the file. Only load
    # artifacts from a tracking server / artifact store you trust.
    with open(preprocessor_artifact, 'rb') as f_in:
        preprocessor = pickle.load(f_in)

    return model, preprocessor

In [None]:
def test_model(client, model_name, data_path=TEST_DATA_PATH):
    """
    Score the current Production version of a registered model on test data.

    The test data is prepared with ``read_data``, vectorised with the
    preprocessor logged by the Production version's run and scored with that
    run's model.

    Args:
        client: MLflow client used for the registry and run lookups.
        model_name: Name of the registered model to evaluate.
        data_path: Location of the zipped test data.

    Returns:
        RMSE of the Production model on the test data.

    Raises:
        ValueError: If the model has no version in the Production stage.
    """
    df, categorical_cols, target = read_data(data_path)

    production_versions = client.get_latest_versions(model_name, ['Production'])
    if not production_versions:
        raise ValueError(f'{model_name} has no version in the Production stage.')
    run_id = production_versions[0].run_id

    model_path, preprocessor_path = get_model_preprocessor_paths(client, run_id)
    production_model = mlflow.pyfunc.load_model(model_path)

    # SECURITY: pickle.load executes code embedded in the file. Only load
    # artifacts from a tracking server / artifact store you trust.
    with open(mlflow.artifacts.download_artifacts(preprocessor_path), 'rb') as f_in:
        dv = pickle.load(f_in)

    # transform (not fit_transform): the test data must be encoded with the
    # feature space the model was trained on.
    x_test = dv.transform(df[categorical_cols].to_dict(orient='records'))
    y_test = df[target].values

    y_pred = production_model.predict(x_test)
    return mean_squared_error(y_test, y_pred, squared=False)

In [81]:

# Client bound to the configured tracking server; used for all registry and
# experiment lookups below.
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
# Show the experiments known to the tracking server.
client.list_experiments()

[<Experiment: artifact_location='s3://mlflow-artifacts-remote-2/0', experiment_id='0', lifecycle_stage='active', name='Default', tags={}>,
 <Experiment: artifact_location='s3://mlflow-artifacts-remote-2/1', experiment_id='1', lifecycle_stage='active', name='bikeshare-ride-duration-prediction', tags={}>]

In [82]:
# List the runs of the configured experiment. The ID is looked up by name
# instead of being hard-coded, so this keeps working on another tracking
# server or when EXPERIMENT_NAME is overridden.
client.search_runs([client.get_experiment_by_name(EXPERIMENT_NAME).experiment_id])

[<Run: data=<RunData: metrics={'RMSE': 14.390913317033252}, params={'alpha': '0.6854409845387625'}, tags={'mlflow.log-model.history': '[{"run_id": "b9ea23fdd14043a8827497b800da057f", '
                              '"artifact_path": "model", "utc_time_created": '
                              '"2022-09-08 11:47:20.501657", "flavors": '
                              '{"python_function": {"model_path": "model.pkl", '
                              '"loader_module": "mlflow.sklearn", '
                              '"python_version": "3.9.12", "env": '
                              '"conda.yaml"}, "sklearn": {"pickled_model": '
                              '"model.pkl", "sklearn_version": "1.0.2", '
                              '"serialization_format": "cloudpickle", "code": '
                              'null}}, "model_uuid": '
                              '"821d12e1330941298edfc564a4fdea6e", '
                              '"mlflow_version": "1.28.0"}]',
  'mlflow.source.git.commit'

In [72]:
# Look up the configured experiment and show where its artifacts are stored.
experiment = client.get_experiment_by_name(EXPERIMENT_NAME)
experiment.artifact_location

's3://mlflow-artifacts-remote-2/1'

In [31]:
# Point the fluent mlflow API (start_run, log_*, register_model) at the same
# server and experiment the client-based cells use. EXPERIMENT_NAME is used
# instead of a literal so an environment override applies everywhere.
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)

<Experiment: artifact_location='s3://mlflow-artifacts-remote-2/1', experiment_id='1', lifecycle_stage='active', name='bikeshare-ride-duration-prediction', tags={}>

In [54]:
# Run the full training flow. The function is defined as `train_model` in this
# notebook; the previous name `apply_model` no longer exists and fails on a
# fresh kernel.
train_model(client)

(243756, 54362) (60939, 54362) (243756,) (60939,)
DictVectorizer()
100%|██████████| 10/10 [00:11<00:00,  1.14s/trial, best loss: 14.390913317033252]
Best params: {'alpha': 0.6854409845387625}
RMSE: 14.390913317033252
Run ID: b9ea23fdd14043a8827497b800da057f


Registered model 'bikeshare-ride-duration-regressor' already exists. Creating a new version of this model...
2022/09/08 21:47:23 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: bikeshare-ride-duration-regressor, version 3
Created version '3' of model 'bikeshare-ride-duration-regressor'.


In [65]:
# Promote the newest registered version to Production. get_latest_versions()
# returns one entry per stage in no guaranteed order, so the highest version
# number is selected explicitly (versions are strings, hence the int()).
latest_model_version = max(
    client.get_latest_versions(MODEL_NAME),
    key=lambda mv: int(mv.version),
)
model_version = latest_model_version.version
transition_model_stage(client, MODEL_NAME, model_version, 'Production')

In [92]:
# Show the latest version of the registered model in the Production stage.
client.get_latest_versions(MODEL_NAME, ['Production'])[0]

<ModelVersion: creation_timestamp=1662982796229, current_stage='Production', description='', last_updated_timestamp=1662982975334, name='bikeshare-ride-duration-regressor', run_id='ea139ca8e97f457d911ffb831f69ab6a', run_link='', source='s3://mlflow-artifacts-remote-2/1/ea139ca8e97f457d911ffb831f69ab6a/artifacts/model', status='READY', status_message='', tags={'run_id': 'ea139ca8e97f457d911ffb831f69ab6a'}, user_id='', version='12'>