# Config

In [5]:
import warnings
# NOTE(review): this silences *every* warning for the whole kernel, including
# library deprecation notices (e.g. scikit-learn FutureWarnings). Consider
# narrowing it with `category=` / `module=` once the noisy source is known.
warnings.filterwarnings('ignore')

import os
import subprocess

# Root of the HDP installation. Override it with the HDP_HOME environment
# variable on a cluster with a different HDP version or install location.
HDP_HOME = os.environ.get('HDP_HOME', '/usr/hdp/3.1.4.0-315')

# Set HADOOP_CONF_DIR environment variable (Hadoop client configuration dir).
os.environ['HADOOP_CONF_DIR'] = f'{HDP_HOME}/hadoop/etc/hadoop'

# Set ARROW_LIBHDFS_DIR environment variable: the directory pyarrow searches
# for libhdfs when connecting to HDFS.
os.environ['ARROW_LIBHDFS_DIR'] = f'{HDP_HOME}/hadoop/lib/native/'

# Set CLASSPATH environment variable from `hadoop classpath --glob`.
# The command output ends with a newline; strip it so the last classpath
# entry does not carry a trailing '\n'.
classpath = subprocess.check_output(['hadoop', 'classpath', '--glob'])
os.environ['CLASSPATH'] = classpath.decode('utf-8').strip()

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import mlflow
from mlflow.models import infer_signature

from pathlib import Path

# Notebook-wide pandas display settings:
# - floats are rendered with 3 decimals;
# - up to 200 columns/rows are shown before truncation;
# - cell text is truncated at 50 characters.
# Full option names are used throughout. pandas also accepts partial names
# such as "max_colwidth" via pattern matching, but its docs warn that partial
# names may break if similarly named options are added later.
pd.set_option("display.float_format", "{:.3f}".format)
pd.set_option("display.max_columns", 200)
pd.set_option("display.max_rows", 200)
pd.set_option("display.max_colwidth", 50)

# Simple ML training flow

In [4]:
# Load the Iris dataset as (feature matrix, target labels)
X, y = datasets.load_iris(return_X_y=True)

# Split the data into training and test sets (fixed seed for a reproducible split)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Define the model hyperparameters; the MLflow cell also logs this dict.
# `multi_class` is intentionally not passed: "auto" is already the default,
# and the parameter is deprecated in recent scikit-learn releases. The
# FutureWarning it raises is hidden by the global warnings filter in the
# config cell.
# NOTE(review): scikit-learn only uses `random_state` for the
# 'sag'/'saga'/'liblinear' solvers, so it has no effect with "lbfgs". It is
# kept so the logged params remain comparable with earlier runs.
params = {
    "solver": "lbfgs",
    "max_iter": 1000,
    "random_state": 8888,
}

# Train the model
lr = LogisticRegression(**params)
lr.fit(X_train, y_train)

# Predict on the test set
y_pred = lr.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)

# MLflow tracking setup

In [15]:
# Set our tracking server uri for logging. The standard MLFLOW_TRACKING_URI
# environment variable, when set, takes precedence over this default server.
MLFLOW_PATH = os.environ.get("MLFLOW_TRACKING_URI", "http://10.21.47.72:9898/")
mlflow.set_tracking_uri(MLFLOW_PATH)

# Make "MLflow Sample" the active experiment (MLflow creates it if missing)
mlflow.set_experiment("MLflow Sample")

# Start an MLflow run; everything logged inside the `with` block goes to it
with mlflow.start_run() as run:
    # Log the hyperparameters plus dataset metadata. A new dict is built
    # rather than updating `params` in place, so `params` keeps holding only
    # valid LogisticRegression keyword arguments.
    run_params = {
        **params,
        "n_features": X_train.shape[1],
        "n_classes": len(np.unique(y_train)),
    }
    mlflow.log_params(run_params)

    # Log test-set evaluation metrics (precision/recall/F1 are class-weighted)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred, average='weighted'),
        "recall": recall_score(y_test, y_pred, average='weighted'),
        "f1": f1_score(y_test, y_pred, average='weighted'),
    }
    mlflow.log_metrics(metrics)

    # Set a tag that we can use to remind ourselves what this run was for
    mlflow.set_tag("Training Info", "Basic LR model for sample")

    # Infer the model's input/output signature from training features and predictions
    signature = infer_signature(X_train, lr.predict(X_train))

    # Log the model under the "iris_model" artifact path and register it;
    # if "Sample" already exists, a new model version is created.
    model_info = mlflow.sklearn.log_model(
        sk_model=lr,
        artifact_path="iris_model",
        signature=signature,
        registered_model_name="Sample",
    )

Registered model 'Sample' already exists. Creating a new version of this model...
2025/01/23 10:53:34 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: Sample, version 2
Created version '2' of model 'Sample'.
2025/01/23 10:53:34 INFO mlflow.tracking._tracking_service.client: 🏃 View run secretive-hare-384 at: http://10.21.47.72:9898/#/experiments/890253992700055295/runs/554f48695c704f7f8ca1ae34321b8fd7.
2025/01/23 10:53:34 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://10.21.47.72:9898/#/experiments/890253992700055295.


# Pull model from MLflow for inference

In [26]:
# Low-level client for querying runs and the model registry. With no explicit
# tracking_uri it uses the URI configured through mlflow.set_tracking_uri().
client = mlflow.tracking.MlflowClient(tracking_uri=None)

In [28]:
# Display the `runs:/<run_id>/<artifact_path>` URI of the model logged by
# `mlflow.sklearn.log_model` in the tracking cell.
# NOTE(review): `model_info` only exists if that cell ran in this kernel.
model_info.model_uri

'runs:/554f48695c704f7f8ca1ae34321b8fd7/iris_model'

In [14]:
# Run-artifact URI of the model to load (the value shown by `model_info.model_uri`)
logged_model = 'runs:/554f48695c704f7f8ca1ae34321b8fd7/iris_model'

# Load the model as a native scikit-learn estimator (not a PyFuncModel).
# Resolving the runs:/ URI through the tracking server replaces the previous
# machine-specific path under /home/hieulm8/..., so this works from any host.
# Requires the tracking URI set in the MLflow tracking setup cell.
loaded_model = mlflow.sklearn.load_model(logged_model)

In [15]:
# Display the loaded estimator's repr as a quick check that deserialization succeeded
loaded_model