In [1]:
# Show the kernel's working directory: DATA_DIR below is a relative path resolved against it.
!pwd

/home/tungdao/tung/mlopsvn/code/mlops-crash-course-code/training_pipeline/nbs


In [2]:
import os
from pathlib import Path

import fastparquet
import numpy as np
import pandas as pd

# Single seed, also passed to train_test_split and ElasticNet below, for reproducible runs.
random_seed = 17
np.random.seed(random_seed)

## Load data

In [3]:
# Input files are resolved relative to the kernel's working directory.
DATA_DIR = Path("./data")
DATA_PATH = DATA_DIR / "exp_driver_stats.parquet"  # driver feature records (parquet)
LABEL_PATH = DATA_DIR / "exp_driver_orders.csv"  # labels; read below with sep="\t"
for required_path in (DATA_PATH, LABEL_PATH):
    if not required_path.is_file():
        # FileNotFoundError subclasses Exception, so existing `except Exception` handlers still
        # match; the message now names the exact path that is missing.
        raise FileNotFoundError(f"Required input not found: {required_path.resolve()}")

In [4]:
df_orig = pd.read_parquet(DATA_PATH, engine='fastparquet')
# Fail fast if columns used by the point-in-time join (driver_id, datetime) or by the
# model (selected features) are missing, instead of hitting a KeyError several cells later.
missing_feature_cols = (
    {"datetime", "driver_id", "conv_rate", "acc_rate", "avg_daily_trips"} - set(df_orig.columns)
)
assert not missing_feature_cols, f"feature table is missing columns: {sorted(missing_feature_cols)}"
df_orig

Unnamed: 0,datetime,driver_id,conv_rate,acc_rate,avg_daily_trips,created
0,2021-07-13 11:00:00+00:00,1005,0.373837,0.154890,498,2021-07-28 11:08:04.802
1,2021-07-13 12:00:00+00:00,1005,0.571627,0.643958,656,2021-07-28 11:08:04.802
2,2021-07-13 13:00:00+00:00,1005,0.399909,0.993888,722,2021-07-28 11:08:04.802
3,2021-07-13 14:00:00+00:00,1005,0.967468,0.788458,424,2021-07-28 11:08:04.802
4,2021-07-13 15:00:00+00:00,1005,0.024679,0.956064,569,2021-07-28 11:08:04.802
...,...,...,...,...,...,...
1802,2021-07-28 09:00:00+00:00,1001,0.089418,0.311234,485,2021-07-28 11:08:04.802
1803,2021-07-28 10:00:00+00:00,1001,0.222534,0.927691,114,2021-07-28 11:08:04.802
1804,2021-04-12 07:00:00+00:00,1001,0.175219,0.761434,385,2021-07-28 11:08:04.802
902,2021-07-20 23:00:00+00:00,1003,0.025968,0.109748,55,2021-07-28 11:08:04.802


In [5]:
label_orig = pd.read_csv(LABEL_PATH, sep="\t")
# The file has a .csv suffix but is read as tab-separated; if the separator is wrong,
# read_csv silently produces a single merged column. Check the expected columns exist.
missing_label_cols = {"event_timestamp", "driver_id", "trip_completed"} - set(label_orig.columns)
assert not missing_label_cols, f"label file is missing columns: {sorted(missing_label_cols)}"
label_orig

Unnamed: 0,event_timestamp,driver_id,trip_completed
0,2021-04-16 20:29:28+00:00,1001,1
1,2021-04-17 04:29:28+00:00,1002,0
2,2021-04-17 12:29:28+00:00,1003,0
3,2021-04-17 20:29:28+00:00,1001,1
4,2021-04-18 04:29:28+00:00,1002,0
5,2021-04-18 12:29:28+00:00,1003,0
6,2021-04-18 20:29:28+00:00,1001,1
7,2021-04-19 04:29:28+00:00,1002,0
8,2021-04-19 12:29:28+00:00,1003,0
9,2021-04-19 20:29:28+00:00,1004,1


### Parse label timestamps

In [6]:
# Parse to timezone-aware UTC timestamps. The feature table's `datetime` column is tz-aware
# (+00:00 in the output above), and comparing it with naive or mixed-offset values in the
# point-in-time join below would fail; utc=True guarantees a uniform tz-aware dtype.
label_orig["event_timestamp"] = pd.to_datetime(label_orig["event_timestamp"], utc=True)
label_orig

Unnamed: 0,event_timestamp,driver_id,trip_completed
0,2021-04-16 20:29:28+00:00,1001,1
1,2021-04-17 04:29:28+00:00,1002,0
2,2021-04-17 12:29:28+00:00,1003,0
3,2021-04-17 20:29:28+00:00,1001,1
4,2021-04-18 04:29:28+00:00,1002,0
5,2021-04-18 12:29:28+00:00,1003,0
6,2021-04-18 20:29:28+00:00,1001,1
7,2021-04-19 04:29:28+00:00,1002,0
8,2021-04-19 12:29:28+00:00,1003,0
9,2021-04-19 20:29:28+00:00,1004,1


## Training

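### Merge features with labels

This is a point-in-time join. Each label row is paired with its driver's most recent feature record whose `datetime` is at or before the label's `event_timestamp`. As a result, no feature observed after the event leaks into a training example.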

In [7]:
# Group feature records by driver once, so proc_row can fetch a driver's history via get_group.
groups = df_orig.groupby('driver_id')

def proc_row(row, grouped=None):
    """Point-in-time join of one label row with its driver's latest feature record.

    Picks the most recent feature record for ``row['driver_id']`` whose ``datetime``
    is at or before ``row['event_timestamp']``, so no feature observed after the
    label event leaks into the training example.

    Args:
        row: One label record with ``event_timestamp``, ``driver_id`` and
            ``trip_completed`` (as passed by ``DataFrame.apply(..., axis=1)``).
        grouped: Feature records grouped by ``driver_id``. Defaults to the
            notebook-level ``groups`` so ``label_orig.apply(proc_row, axis=1)``
            keeps working unchanged.

    Returns:
        The selected feature record as a Series, extended with ``event_timestamp``
        and ``trip_completed`` copied from ``row``.

    Raises:
        KeyError: if the driver has no feature records at all.
        ValueError: if the driver has no feature record at or before the event
            time (previously an opaque ``IndexError`` from ``iloc[-1]``).
    """
    if grouped is None:
        grouped = groups
    end_time = row['event_timestamp']
    driver_id = row['driver_id']
    try:
        driver_rows = grouped.get_group(driver_id)
    except KeyError:
        raise KeyError(f"no feature records for driver_id={driver_id}") from None

    # Only records observed at or before the label event are eligible.
    history = driver_rows[driver_rows['datetime'] <= end_time]
    if history.empty:
        raise ValueError(
            f"no feature record for driver_id={driver_id} at or before {end_time}"
        )
    # Stable sort: among records sharing the latest timestamp, the last one in
    # original order wins, making the choice deterministic.
    latest = history.sort_values('datetime', kind='stable').iloc[-1].copy()

    latest['event_timestamp'] = end_time
    latest['trip_completed'] = row['trip_completed']
    return latest

# One output row per label: its driver's feature snapshot plus event_timestamp and trip_completed.
data_df = label_orig.apply(proc_row, axis="columns")
data_df

Unnamed: 0,datetime,driver_id,conv_rate,acc_rate,avg_daily_trips,created,event_timestamp,trip_completed
0,2021-04-12 07:00:00+00:00,1001,0.175219,0.761434,385,2021-07-28 11:08:04.802,2021-04-16 20:29:28+00:00,1
1,2021-04-12 07:00:00+00:00,1002,0.312347,0.481786,810,2021-07-28 11:08:04.802,2021-04-17 04:29:28+00:00,0
2,2021-04-12 07:00:00+00:00,1003,0.736727,0.936667,939,2021-07-28 11:08:04.802,2021-04-17 12:29:28+00:00,0
3,2021-04-12 07:00:00+00:00,1001,0.175219,0.761434,385,2021-07-28 11:08:04.802,2021-04-17 20:29:28+00:00,1
4,2021-04-12 07:00:00+00:00,1002,0.312347,0.481786,810,2021-07-28 11:08:04.802,2021-04-18 04:29:28+00:00,0
5,2021-04-12 07:00:00+00:00,1003,0.736727,0.936667,939,2021-07-28 11:08:04.802,2021-04-18 12:29:28+00:00,0
6,2021-04-12 07:00:00+00:00,1001,0.175219,0.761434,385,2021-07-28 11:08:04.802,2021-04-18 20:29:28+00:00,1
7,2021-04-12 07:00:00+00:00,1002,0.312347,0.481786,810,2021-07-28 11:08:04.802,2021-04-19 04:29:28+00:00,0
8,2021-04-12 07:00:00+00:00,1003,0.736727,0.936667,939,2021-07-28 11:08:04.802,2021-04-19 12:29:28+00:00,0
9,2021-04-12 07:00:00+00:00,1004,0.094609,0.151163,166,2021-07-28 11:08:04.802,2021-04-19 20:29:28+00:00,1


In [8]:
# Keep only model inputs and the target; join keys and timestamps are not model features.
data_df = data_df.drop(columns=["datetime", "driver_id", "created", "event_timestamp"])
data_df

Unnamed: 0,conv_rate,acc_rate,avg_daily_trips,trip_completed
0,0.175219,0.761434,385,1
1,0.312347,0.481786,810,0
2,0.736727,0.936667,939,0
3,0.175219,0.761434,385,1
4,0.312347,0.481786,810,0
5,0.736727,0.936667,939,0
6,0.175219,0.761434,385,1
7,0.312347,0.481786,810,0
8,0.736727,0.936667,939,0
9,0.094609,0.151163,166,1


### Split data

In [9]:
from sklearn.linear_model import ElasticNet
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

In [10]:
def eval_metrics(actual, pred):
    """Score predictions against ground truth.

    Returns:
        Tuple ``(rmse, mae, r2)``: root-mean-squared error, mean absolute error
        and coefficient of determination, in that order.
    """
    mse = mean_squared_error(actual, pred)
    return np.sqrt(mse), mean_absolute_error(actual, pred), r2_score(actual, pred)

In [11]:
selected_ft = ["conv_rate", "acc_rate", "avg_daily_trips"]
TARGET_COL = "trip_completed"
TEST_SIZE = 0.2

train, test = train_test_split(data_df, test_size=TEST_SIZE, random_state=random_seed)
# Feature matrices hold just the selected columns; targets stay single-column DataFrames.
train_x, test_x = train[selected_ft], test[selected_ft]
train_y, test_y = train[[TARGET_COL]], test[[TARGET_COL]]
train_x.shape, train_y.shape, test_x.shape, test_y.shape

((8, 3), (8, 1), (2, 3), (2, 1))

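### Training and evaluation

Caveat: there are only 10 labelled rows, so the test split holds just 2. Treat the test metrics below as a smoke test, not a performance estimate. In particular, the reported R² of 0.0 is not meaningful: scikit-learn's `r2_score` returns 0.0 when the true values are constant, which is likely the case for these two test labels.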

In [12]:
from mlflow.tracking import MlflowClient
import mlflow

In [18]:
def yield_artifacts(run_id, path=None, client=None):
    """Recursively yield the paths of all file artifacts logged in a run.

    Args:
        run_id: MLflow run ID whose artifacts are listed.
        path: Artifact sub-directory to start from; None means the artifact root.
        client: Optional MlflowClient to reuse. A new one is created only when
            omitted, and it is then shared by the whole recursive traversal.

    Yields:
        Paths of every non-directory artifact, depth-first in listing order.
    """
    if client is None:
        client = MlflowClient()
    for item in client.list_artifacts(run_id, path):
        if item.is_dir:
            # Pass the client down instead of constructing a new one per directory level.
            yield from yield_artifacts(run_id, item.path, client)
        else:
            yield item.path

def fetch_logged_data(run_id):
    """Return a run's params, metrics, non-system tags and artifact paths as a dict.

    Tags whose key starts with "mlflow." are MLflow system tags and are left out:
    https://www.mlflow.org/docs/latest/tracking.html#system-tags
    """
    run_data = MlflowClient().get_run(run_id).data
    user_tags = {
        name: value
        for name, value in run_data.tags.items()
        if not name.startswith("mlflow.")
    }
    return dict(
        params=run_data.params,
        metrics=run_data.metrics,
        tags=user_tags,
        artifacts=list(yield_artifacts(run_id)),
    )

# Tracking server; override with the MLFLOW_TRACKING_URI environment variable.
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment("elastic-net")
# Start the run explicitly instead of relying on mlflow.get_artifact_uri() to create one
# as a hidden side effect. An already-active run is reused, so re-running this cell does
# not fail. The training and logging cells below write into this run until
# mlflow.end_run() is called.
active_run = mlflow.active_run() or mlflow.start_run()
print((mlflow.get_tracking_uri(), mlflow.get_artifact_uri()))
mlflow.sklearn.autolog()

('http://localhost:5000', 'mlflow-artifacts:/1/6c19bd6594594f1a88d0a5ab30076f58/artifacts')


In [19]:
ALPHA = 0.5
L1_RATIO = 0.1

model = ElasticNet(alpha=ALPHA, l1_ratio=L1_RATIO, random_state=random_seed)
model.fit(train_x, train_y)

# Score the fitted model on the held-out split.
predicted_qualities = model.predict(test_x)
rmse, mae, r2 = eval_metrics(test_y, predicted_qualities)

print(f"Elasticnet model (alpha={ALPHA:f}, l1_ratio={L1_RATIO:f}):")
print("RMSE:", rmse)
print("MAE:", mae)
print("R2:", r2)



Elasticnet model (alpha=0.500000, l1_ratio=0.100000):
RMSE: 0.11625695183915705
MAE: 0.10880469049979313
R2: 0.0


In [20]:
import uuid
from mlflow.models.signature import infer_signature

# Log extra params/metrics and the model into the currently active run (the outputs show
# the same run id as printed when tracking was configured), then close it.
mlflow.set_tag("mlflow.runName", uuid.uuid1())  # unique, time-based run name
mlflow.log_param("features", selected_ft)
# NOTE(review): alpha/l1_ratio also appear among the autologged estimator params in the
# output below; these calls log the same values again — confirm they are still needed.
mlflow.log_param("alpha", ALPHA)
mlflow.log_param("l1_ratio", L1_RATIO)
# Held-out metrics from the evaluation cell; the autologged metrics below are training_* only.
mlflow.log_metric("testing_rmse", rmse)
mlflow.log_metric("testing_r2", r2)
mlflow.log_metric("testing_mae", mae)
# Model signature inferred from the training features and the model's predictions on them.
# NOTE(review): the warning printed below originates from this schema inference —
# presumably about the integer avg_daily_trips column; confirm.
signature = infer_signature(train_x, model.predict(train_x))
# NOTE(review): mlflow.sklearn.autolog() may already have logged a model under "model";
# this call targets the same artifact path — confirm which copy is intended.
mlflow.sklearn.log_model(
    sk_model=model,
    artifact_path="model",
    signature=signature,
)
mlflow.end_run()

# No run is active after end_run(), so fetch the id of the run that was just closed.
run_id = mlflow.last_active_run().info.run_id
print("Logged data and model in run {}".format(run_id))
for key, data in fetch_logged_data(run_id).items():
    print("\n---------- logged {} ----------".format(key))
    print(data)

  inputs = _infer_schema(model_input)


Logged data and model in run 6c19bd6594594f1a88d0a5ab30076f58

---------- logged params ----------
{'alpha': '0.5', 'copy_X': 'True', 'fit_intercept': 'True', 'l1_ratio': '0.1', 'max_iter': '1000', 'normalize': 'deprecated', 'positive': 'False', 'precompute': 'False', 'random_state': '17', 'selection': 'cyclic', 'tol': '0.0001', 'warm_start': 'False', 'features': "['conv_rate', 'acc_rate', 'avg_daily_trips']"}

---------- logged metrics ----------
{'training_mse': 0.020391301867212934, 'training_mae': 0.1339089111891788, 'training_r2_score': 0.9184347925311482, 'training_rmse': 0.14279811576912677, 'training_score': 0.9184347925311482, 'testing_rmse': 0.11625695183915705, 'testing_r2': 0.0, 'testing_mae': 0.10880469049979313}

---------- logged tags ----------
{'estimator_name': 'ElasticNet', 'estimator_class': 'sklearn.linear_model._coordinate_descent.ElasticNet'}

---------- logged artifacts ----------
['model/MLmodel', 'model/conda.yaml', 'model/model.pkl', 'model/python_env.yaml', 

### Load the logged model and run inference

In [21]:
# Reload the model logged above through MLflow's generic pyfunc interface.
model_uri = f"runs:/{run_id}/model"
loaded_model = mlflow.pyfunc.load_model(model_uri)
loaded_model

mlflow.pyfunc.loaded_model:
  artifact_path: model
  flavor: mlflow.sklearn
  run_id: 6c19bd6594594f1a88d0a5ab30076f58

In [22]:
predictions = loaded_model.predict(test_x)
# Round-trip check: the reloaded model must reproduce the in-memory model's predictions.
np.testing.assert_allclose(predictions, model.predict(test_x))
predictions

array([ 0.14975854, -0.06785084])