In [None]:
import os

import pandas as pd

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

import xgboost as xgb

import mlflow.sklearn
import mlflow.xgboost
import mlflow.pyfunc

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct

# Settings

In [None]:
# Connection and naming settings for the MLflow demo.
# The tracking URI and experiment name may be overridden through the
# MLFLOW_TRACKING_URI / MLFLOW_EXPERIMENT_NAME environment variables, so the
# notebook can target another server without editing this cell. When the
# variables are unset, the original hard-coded values are used.
TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://registry:5000/")
EXPERIMENT = os.environ.get("MLFLOW_EXPERIMENT_NAME", "iSeeFlowers")
MODELS_DIRECTORY = "/home/jovyan/notebooks"

# Name under which the training cells register their models.
REGISTERED_MODEL_NAME = "iSeeFlowers"

# Point the MLflow client at the tracking server and select the experiment
# that the runs are recorded under.
mlflow.set_tracking_uri(TRACKING_URI)
mlflow.set_experiment(EXPERIMENT)

# Dataset

In [None]:
# Load the Iris dataset once and unpack the pieces the following cells use:
# the feature matrix, the class labels and the feature (column) names.
iris = load_iris()
X = iris["data"]
Y = iris["target"]
feature_names = iris["feature_names"]

# 1. OK Model

In [None]:
with mlflow.start_run(run_name="logistic-regression"):

    # Split by class label instead of at random: rows whose target is < 2 are
    # used for fitting, every remaining row is held out for scoring.
    # NOTE(review): the held-out rows therefore only carry labels that never
    # appear in the training rows — confirm this is intended for the demo.
    train_mask = iris["target"] < 2
    holdout_mask = ~train_mask
    x_train, y_train = X[train_mask], Y[train_mask]
    x_test, y_test = X[holdout_mask], Y[holdout_mask]

    # Hyperparameters, kept in a dict so the same values are both passed to
    # the estimator and logged to the run.
    params = dict(penalty="l2", tol=0.0001, C=1.0)

    lr = LogisticRegression(**params).fit(x_train, y_train)
    score = lr.score(x_test, y_test)

    # Log Metadata
    mlflow.log_metric("score", score)
    mlflow.log_params(params)
    mlflow.set_tags({"Team": "FlowerTeam", "Project": "Classification"})

    # Log Model (also registers it under REGISTERED_MODEL_NAME)
    mlflow.sklearn.log_model(
        sk_model=lr,
        artifact_path="sklearn-model",
        registered_model_name=REGISTERED_MODEL_NAME,
    )

# 2. Better Model

In [None]:
with mlflow.start_run(run_name="xgb-classifier"):

    # Random, seeded 80/20 split over all rows of the dataset.
    x_train, x_test, y_train, y_test = train_test_split(
        X, Y, test_size=0.2, random_state=42, shuffle=True
    )

    # The run is named "xgb-classifier" and logs to "xgb-model", so train an
    # actual XGBoost classifier here (previously a second LogisticRegression
    # was fitted and logged with the sklearn flavor).
    params = {
        "n_estimators": 100,
        "max_depth": 3,
        "learning_rate": 0.1,
        "random_state": 42,
    }

    xgb_clf = xgb.XGBClassifier(**params)
    xgb_clf.fit(x_train, y_train)

    # Log Metadata
    score = xgb_clf.score(x_test, y_test)  # mean accuracy on the held-out split
    mlflow.log_metric("score", score)
    mlflow.log_params(params)
    mlflow.set_tags({
        "Team": "FlowerTeam",
        "Project": "Classification"
    })

    # Log Model with the XGBoost flavor and register it under the shared name.
    # NOTE(review): passing a scikit-learn API model (XGBClassifier) to
    # mlflow.xgboost.log_model needs an MLflow release whose xgboost flavor
    # accepts such models — confirm against the installed version.
    mlflow.xgboost.log_model(
        xgb_model=xgb_clf,
        artifact_path="xgb-model",
        registered_model_name=REGISTERED_MODEL_NAME,
    )

# Retrieving the Model

## Original Model

In [None]:
# Load the registered model through its `models:/<name>/<stage>` URI as a
# generic pyfunc model.
stage = "Staging"
staged_model_uri = f"models:/{REGISTERED_MODEL_NAME}/{stage}"

model = mlflow.pyfunc.load_model(model_uri=staged_model_uri)

# Element-wise check of the predictions against the labels; x_test / y_test
# are the arrays left behind by the most recently executed training cell.
model.predict(x_test) == y_test  # Predictions

## Spark Model

In [None]:
stage = "Production"

# Create Spark Session
spark = (
    SparkSession.builder
    .appName("MLFlow Demo")
    .getOrCreate()
)

# Generate Model UDF from the registered model in the chosen stage
production_model_uri = f"models:/{REGISTERED_MODEL_NAME}/{stage}"
predict_udf = mlflow.pyfunc.spark_udf(spark, production_model_uri)

# Spark DataFrame built from the held-out features, with one extra column
# holding the UDF's prediction computed over all feature columns.
test_features_pdf = pd.DataFrame(x_test, columns=feature_names)
df = spark.createDataFrame(test_features_pdf).withColumn(
    "prediction", predict_udf(struct("*"))
)

# Collect
df.collect()