In [2]:
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.preprocessing import LabelEncoder
import json
from pandas.io.json import json_normalize
from xgboost import XGBRegressor
from sklearn.model_selection import GridSearchCV
import xgboost as xgb
from sklearn.model_selection import train_test_split
import mlflow
import mlflow.xgboost
import zenml
# import databricks.koalas as ks
# from databricks.mlflow.client import MlflowClient
# from databricks.mlflow.types import Schema
from sklearn.metrics import mean_absolute_error, mean_squared_error


In [11]:
# Load data

with open('C:/Users/vijaya.sekhar/Downloads/download/download/EVRI/EVRI/data/evri-nonprod-dgw-firehose-tracking-5-2022-05-05-06 (early morning sample).json') as f:
        ps_data = json.load(f)
ps_df = pd.json_normalize(ps_data)
df = ps_df[['originalParcelCreationTime',
   'trackingEvent.eventCategory',
  'trackingEvent.client.clientId',
   'trackingEvent.preadviceDetail.deliveryDetails.parcelId',
   'trackingEvent.preadviceDetail.parcelDetails.numberOfItems',
   'trackingEvent.preadviceDetail.parcelType.parcelTypeId']].copy()
        
df["originalParcelCreationTime"]= pd.to_datetime(df["originalParcelCreationTime"])
df['hour'] = df["originalParcelCreationTime"].dt.hour
df['weekdayind'] = np.where(df['originalParcelCreationTime'].dt.dayofweek.isin([5,6]), 1, 0)
b = [0,4,8,12,16,20,24]
l = ['Late_Night', 'Early_Morning','Morning','Noon','Eve','Night']
df['sessionofday'] = pd.cut(df['hour'], bins=b, labels=l, include_lowest=True)
    
    
le = LabelEncoder()
df['trackingEvent.eventCategory'] = le.fit_transform(df['trackingEvent.eventCategory'])
df['sessionofday'] = le.fit_transform(df['sessionofday'])
    
features = df[['trackingEvent.eventCategory', 'trackingEvent.client.clientId',
       'trackingEvent.preadviceDetail.deliveryDetails.parcelId','trackingEvent.preadviceDetail.parcelType.parcelTypeId','hour','sessionofday','weekdayind']]
target = df[['trackingEvent.preadviceDetail.parcelDetails.numberOfItems']]
    
# Split data into train and test sets
train_size = int(len(df)*0.8)
X_train,y_train = features[:train_size],target[:train_size]
X_test,y_test = features[train_size:],target[train_size:]





In [12]:
with mlflow.start_run(run_name="xgboost-regressor"):
    # Log XGBoost parameters
    params = {
     'objective': 'reg:squarederror',
     'max_depth': 3,
    'learning_rate': 0.1,
     'subsample': 0.8,
    'colsample_bytree': 0.8,
    'n_estimators': 100
    }

    mlflow.log_params(params)

    # Train XGBoost model
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)
    model = xgb.train(
        params,
        dtrain,
        evals=[(dtest, "eval")],
        verbose_eval=False,
    )

    # Log XGBoost model
    mlflow.xgboost.log_model(model, "model")


Parameters: { "n_estimators" } are not used.





In [15]:
# Get MLflow experiment and run IDs
# experiment_id = mlflow.active_experiment_id()
# run_id = mlflow.active_run().info.run_id
# save trained model
model_path = "model"
mlflow.sklearn.save_model(model, model_path)
mlflow.log_artifact(model_path)

In [None]:
# # Import required libraries
# import mlflow

# # Define the model name and stage
# model_name = "XGBoost Regressor Model"
# model_stage = "Production"

# # Register the model in MLflow model registry
# with mlflow.start_run(run_name="MLflow Model Registry Run"):
#     model_uri = "runs:/{}/{}".format(mlflow.active_run().info.run_id, model_name)
#     mv = mlflow.register_model(model_uri=model_uri, name=model_name, stage=model_stage)
#     print("Model version: {}".format(mv.version))


In [None]:
# Define ZenML pipeline
@zenml.Pipeline(name="xgboost-regressor-pipeline")
def xgboost_regressor_pipeline():
    # Define input data
    input_data_step = zenml.steps.InputData(
    with open('C:/Users/vijaya.sekhar/Downloads/download/download/EVRI/EVRI/data/evri-nonprod-dgw-firehose-tracking-5-2022-05-05-06 (early morning sample).json') as f:
        ps_data = json.load(f)
    ps_df = pd.json_normalize(ps_data)
    )

    # Define split data step
    split_data_step = zenml.steps.SplitData(
        split_map={
            "train": {
                "split_expression": "lambda df: df.sample(frac=0.8, random_state=1234)",
                "is_training": True,
            },
            "eval": {
                "split_expression": "lambda df: df.drop(list(filter(lambda col: col != 'target', df.columns)), axis=1, errors='ignore')",
                "is_training": False,
            },
        }
    )

    # Define xgboost step
    xgboost_step = zenml.steps.XGBoostRegressor(
        max_depth=6, eta=0.1, num_round=100, mlflow=True
    )

    # Define evaluation step
    evaluation_step = zenml.steps.Evaluator(
        metrics=[mean_absolute_error, mean_squared_error],
        model_name=xgboost_step.name,
    )

    # Define output step
    output_step = zenml.steps.Output(
        mode="return",
        schema=Schema([("mae", float), ("mse", float)]),
    )

    # Define pipeline
    return zenml.Workflow(
        input_data=input_data_step,
        steps=[split_data_step, xgboost_step, evaluation_step, output_step],
    )


In [None]:
# Run ZenML pipeline
pipeline = xgboost_regressor_pipeline().with_mlflow_tracking(
    experiment_name="xgboost-regressor-pipeline"
)
pipeline.run()


In [None]:
# Deploy model to Databricks
dbutils.fs.mkdirs("/mnt/models/xgboost-regressor")
ks.to_csv(test, "dbfs:/mnt/models/xgboost-regressor/test")
dbutils.fs.mkdirs("/mnt/models/xgboost-regressor/model")
mlflow_xgboost = "runs:/{}/model".format(run_id)
model_dbfs_path = "/mnt/models/xgboost-regressor/model"
MlflowClient().download_artifacts(mlflow_xgboost, "xgb.model", model_dbfs_path)

# Load model from DBFS
model_path = "/dbfs/mnt/models/xgboost-regressor/model/xgb.model"
model = mlflow.xgboost.load_model(model_path)

# Predict with loaded model
test_data = xgb.DMatrix(test.drop("target", axis=1
