In [None]:
## With Hadoop hdfs
# remote (on docker) mlflow-server
# remote(on docker) postgresql for backend-store
# remote(on docker) hdfs for artifact-store. But Hadoop will be installed on locally because pyarrow needs to some hadoop file.

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
import pandas as pd

def read_dataframe(filename):
    df = pd.read_parquet(filename)

    df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
    df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])

    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df

In [None]:
df_train = read_dataframe("./data/green_tripdata_2022-01.parquet")
df_val = read_dataframe("./data/green_tripdata_2022-02.parquet")

In [None]:
from sklearn.feature_extraction import DictVectorizer

df_train['PU_DO'] = df_train['PULocationID'] + '_' + df_train['DOLocationID']
df_val['PU_DO'] = df_val['PULocationID'] + '_' + df_val['DOLocationID']


categorical = ['PU_DO'] #'PULocationID', 'DOLocationID']
numerical = ['trip_distance']

dv = DictVectorizer()

train_dicts = df_train[categorical + numerical].to_dict(orient='records')
X_train = dv.fit_transform(train_dicts)

val_dicts = df_val[categorical + numerical].to_dict(orient='records')
X_val = dv.transform(val_dicts)

In [None]:
target = 'duration'
y_train = df_train[target].values
y_val = df_val[target].values

In [None]:
import os

os.environ['HADOOP_HOME'] = '/home/hdoop/hadoop'
os.environ['ARROW_LIBHDFS_DIR'] = '/home/hdoop/hadoop/lib/native'
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

import mlflow

mlflow.set_tracking_uri("http://localhost:5000")

print(mlflow.list_experiments())

mlflow.set_experiment("Mlflow-Xgboost")

In [None]:
import xgboost as xgb

from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from hyperopt.pyll import scope

from sklearn.metrics import mean_squared_error

train = xgb.DMatrix(X_train, label = y_train)
valid = xgb.DMatrix(X_val, label = y_val)

def objective(params):
    with mlflow.start_run():
        mlflow.set_tag("model","xgboost") ## Set Tag
        mlflow.log_params(params) ## Log Param
        booster = xgb.train(
            params = params,
            dtrain = train,
            num_boost_round = 10,
            evals = [(valid, "validation")],
            early_stopping_rounds = 3
        )
        y_pred = booster.predict(valid)
        rmse = mean_squared_error(y_val, y_pred, squared = False)
        mlflow.log_metric("rmse", rmse) ## Log Metric

        mlflow.xgboost.log_model(booster, artifact_path='Models') ## Log Model

    return {"loss": rmse, "status": STATUS_OK}

search_space = {
    "max_depth": scope.int(hp.quniform("max_depth", 4, 100, 1)),
    "learning_rate": hp.loguniform("learning_rate", -3, 0),
    "reg_alpha": hp.loguniform("reg_alpha", -5, -1),
    "reg_lambda": hp.loguniform("reg_lambda", -6, -1),
    "min_child_weight": hp.loguniform("min_child_weight", -1, 3),
    "objective": "reg:linear",
    "seed": 55,
}

best_result = fmin(
    fn= objective,
    space=search_space,
    algo= tpe.suggest,
    max_evals= 20,
    trials= Trials()
)

In [41]:
## Load Model

model_id = "81f4efd12151425189ebc9653d8bdeb9"
logged_model = f"runs:/{model_id}/Models"

# There are two ways for load model

# loaded_model = mlflow.pyfunc.load_model(logged_model)
## There is an error because of Xgboost.DMatrix
# print(loaded_model.predict(valid)[:10]) 

xgboost_model = mlflow.xgboost.load_model(logged_model)
print(xgboost_model.predict(valid)[:10]) ## Object of model

2023-02-22 15:27:48,806 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false


[ 6.578524   3.3540823 24.625639  37.48502   27.982061   9.45252
 22.10602    3.8870816 14.812214   5.626942 ]


In [None]:
## Autolog

mlflow.xgboost.autolog()

train = xgb.DMatrix(X_train, label = y_train)
valid = xgb.DMatrix(X_val, label = y_val)

params = {
    'learning_rate': 0.5708688941546396,
    'max_depth': 16,
    'min_child_weight': 0.3907249417416627,
    'objective': 'reg:linear',
    'reg_alpha': 0.016943269004007114,
    'reg_lambda': 0.1796292504048719,
    'seed':	55
}

booster = xgb.train(
            params = params,
            dtrain = train,
            num_boost_round = 500,
            evals = [(valid, "validation")],
            early_stopping_rounds = 50
        )

In [None]:
## AWS

# Remote ec2 server for mlflow-server
# Remote Postgresql for backends-store
# Remote S3 for artifacts-store

In [None]:
import mlflow

TRACKING_SERVER_HOST = "ec2-44-203-192-141.compute-1.amazonaws.com"
mlflow.set_tracking_uri(f'http://{TRACKING_SERVER_HOST}:5000')

f'tracking URI: {mlflow.get_tracking_uri()}'

In [None]:
mlflow.list_experiments()

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score

mlflow.set_experiment("S3-Postgresql-Remote-AWS")

with mlflow.start_run():
    print(mlflow.get_artifact_uri())

    x, y = load_iris(return_X_y=True)

    params = {"C": 0.1, "random_state": 42}
    # mlflow.log_param(params)

    lr = LogisticRegression(**params)
    lr = lr.fit(x, y)	
	
    y_pred = lr.predict(x)
    
    mlflow.log_metric("accuracy", accuracy_score(y, y_pred))
	
    mlflow.sklearn.log_model(lr, artifact_path='Models')
    print(f'default artifact URI: {mlflow.get_artifact_uri()}')

In [None]:
mlflow.list_experiments()