## Part 1: data preprocessing and feature engineering

In [0]:
# Input location (an ADLS Gen2 container addressed via abfss://) and the two wine-quality CSV files.
source = "abfss://raw@cloudinfrastg.dfs.core.windows.net/"
red_wine, white_wine = "winequality-red.csv", "winequality-white.csv"

In [0]:
import pandas as pd

# Both files are semicolon-separated CSVs with a header row; Spark infers the column types.
csv_options = {"header": True, "inferSchema": True, "sep": ";"}
red_wine_df = spark.read.csv(source + red_wine, **csv_options)
white_wine_df = spark.read.csv(source + white_wine, **csv_options)

red_wine_df.display()

# Convert both Spark DataFrames to pandas for the rest of the notebook.
red_wine_pd, white_wine_pd = red_wine_df.toPandas(), white_wine_df.toPandas()

In [0]:
# Tag each row with its wine type before the frames are combined (1 = red, 0 = white).
red_wine_pd = red_wine_pd.assign(is_red=1)
white_wine_pd = white_wine_pd.assign(is_red=0)

In [0]:
# Inspect the red-wine pandas DataFrame.
red_wine_pd

In [0]:
# Inspect the white-wine pandas DataFrame.
white_wine_pd

In [0]:
# Stack red and white wines into one frame. ignore_index=True gives a fresh 0..n-1 row index.
# Without it, each input keeps its own 0-based index and the combined frame has duplicate row
# labels, which breaks label-based lookups (.loc) and index alignment in later cells.
data = pd.concat([red_wine_pd, white_wine_pd], axis=0, ignore_index=True)
data

In [0]:
# Replace spaces in column names with underscores so columns can be used as identifiers
# (e.g. data.fixed_acidity). Rebind to the renamed copy rather than mutating in place.
data = data.rename(columns=lambda name: name.replace(" ", "_"))
data

In [0]:
# List the column names of the combined frame.
data.columns

In [0]:
# Check the column dtypes (originating from Spark's inferSchema, carried over by toPandas()).
data.dtypes

In [0]:
# Number of missing values per column; all zeros means there is no missing data.
# (data.count() reports NON-null counts, which only reveal gaps by comparison with len(data).)
data.isna().sum()

In [0]:
# Pairwise Pearson correlation between all columns (pandas' default correlation method).
data.corr(method="pearson")

In [0]:
# Summary statistics (count, mean, std, min, quartiles, max) for each numeric column.
data.describe()

In [0]:
import seaborn as sns

# Distribution of the raw quality score, one bar per distinct value (discrete=True).
# Quality is assumed to be integer-valued (TODO confirm via data.dtypes). Ten equal-width
# bins over an integer range leave empty gap bins and give bars misleading widths.
quality_plot = sns.displot(data.quality, kde=False, discrete=True, color='red', alpha=0.5)
quality_plot.set(title="Wine quality score distribution").set_axis_labels("quality", "count");

In [0]:
# Binarize the target: quality becomes 1 if the score is >= 7 ("high quality") and 0 otherwise.
HIGH_QUALITY_THRESHOLD = 7
# Guard against re-running this cell. Applying `>= 7` to labels that are already 0/1 would
# silently turn every label into 0. Raw scores are assumed to exceed 1 (TODO confirm with
# data.describe()), so a maximum of at most 1 means the column was already binarized.
if data["quality"].max() > 1:
    data["quality"] = (data["quality"] >= HIGH_QUALITY_THRESHOLD).astype(int)
# Binary labels, kept as a separate Series for the class-balance and boxplot cells below.
high_quality = data["quality"].copy()


In [0]:
# Class balance of the binarized target: number of rows labelled 0 and 1.
label_counts = high_quality.value_counts()
print(label_counts)

In [0]:
# Inspect the frame after the quality column has been binarized.
data

In [0]:
# List the column names again.
# NOTE(review): duplicates the earlier data.columns cell; consider removing one of them.
data.columns

In [0]:
# Per column: True if the column contains at least one missing (NaN/None) value.
data.isna().any()

In [0]:
# Compare each feature's distribution between low-quality (0) and high-quality (1) wines,
# one boxplot panel per feature.
import math

import matplotlib.pyplot as plt

feature_cols = [col for col in data.columns if col not in ('is_red', 'quality')]
n_grid_cols = 4
# Derive the number of rows from the feature count so the grid never runs out of panels.
n_grid_rows = max(1, math.ceil(len(feature_cols) / n_grid_cols))
f, axes = plt.subplots(n_grid_rows, n_grid_cols, figsize=(25, 20), squeeze=False)
for ax, col in zip(axes.flat, feature_cols):
    sns.boxplot(x=high_quality, y=data[col], ax=ax)
# Hide any panels left unused in the last row.
for ax in axes.flat[len(feature_cols):]:
    ax.set_visible(False)

## Splitting the dataset into training, validation, and test sets

In [0]:
from sklearn.model_selection import train_test_split

# Features are every column except the binary target `quality`.
X = data.drop(['quality'], axis=1)
y = data['quality']

# Keep 60% of the rows for training and hold out 40% for evaluation. Stratify on the target
# so the 0/1 class balance is preserved in every split.
X_train, X_remain, y_train, y_remain = train_test_split(
    X, y, train_size=0.6, stratify=y, random_state=42)

# Split the held-out rows equally into validation and test sets (20% / 20% of all rows).
X_val, X_test, y_val, y_test = train_test_split(
    X_remain, y_remain, test_size=0.5, stratify=y_remain, random_state=42)

assert len(X_train) + len(X_val) + len(X_test) == len(X), "splits must partition the data"

## Part 2: building the model

In [0]:
import mlflow
import mlflow.sklearn
import mlflow.pyfunc
import numpy as np
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
import time

In [0]:
class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
    """Expose a fitted scikit-learn classifier as an MLflow pyfunc model.

    The pyfunc `predict` returns only the second column of `predict_proba`,
    i.e. the probability of the positive class, instead of hard labels.
    """

    def __init__(self, model):
        # Fitted estimator providing predict_proba.
        self.model = model

    def predict(self, context, model_input):
        """Return the positive-class probability for each row of `model_input`.

        `context` is supplied by MLflow and is not used here.
        """
        class_probabilities = self.model.predict_proba(model_input)
        return class_probabilities[:, 1]

In [0]:
with mlflow.start_run(run_name='untuned_random_forest') as run:
    # Baseline model: a small random forest with a fixed seed so the run is reproducible.
    n_estimators = 10
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=123)
    model.fit(X_train, y_train)

    # predict_proba returns one column per class, [P(y=0), P(y=1)]; slice with [:, 1] to keep
    # the probability of the positive (high-quality) class.
    predictions_test = model.predict_proba(X_test)[:, 1]
    auc_score = roc_auc_score(y_test, predictions_test)
    mlflow.log_param('n_estimators', n_estimators)

    # Area under the ROC curve is the metric used to compare runs.
    mlflow.log_metric('auc', auc_score)
    wrappedModel = SklearnModelWrapper(model)

    # Log the model with a signature describing its input and output schema; when the model
    # is deployed, this signature is used to validate inputs.
    signature = infer_signature(X_train, wrappedModel.predict(None, X_train))

    # Conda environment for serving the model: pin the cloudpickle and scikit-learn versions
    # used here; MLflow writes them to conda.yaml and logs that file with the model.
    # NOTE(review): _mlflow_conda_env is a private MLflow helper and may change between releases.
    pinned_pip_deps = [
        "cloudpickle=={}".format(cloudpickle.__version__),
        "scikit-learn=={}".format(sklearn.__version__),
    ]
    conda_env = _mlflow_conda_env(
        additional_conda_deps=None,
        additional_pip_deps=pinned_pip_deps,
        additional_conda_channels=None,
    )
    mlflow.pyfunc.log_model('random_forest', python_model=wrappedModel, signature=signature, conda_env=conda_env)

In [0]:
# Impurity-based importance of each feature in the fitted random forest, highest first.
feature_importances = pd.DataFrame(
    {'importance': model.feature_importances_},
    index=list(X_train.columns),
)
feature_importances.sort_values(by='importance', ascending=False)

## Part 3: register the model

In [0]:
# Find the run named 'untuned_random_forest' with the highest logged AUC in the active experiment.
runs = mlflow.search_runs(
    filter_string="tags.mlflow.runName='untuned_random_forest'",
    order_by=['metrics.auc DESC'],
    max_results=1,
)
# Fail with an actionable message instead of an opaque IndexError from iloc[0].
if runs.empty:
    raise ValueError(
        "No MLflow run named 'untuned_random_forest' found in the active experiment; "
        "run the random-forest training cell first."
    )
run_id = runs.iloc[0].run_id
print(run_id)


In [0]:

from mlflow.entities.model_registry.model_version_status import ModelVersionStatus
from mlflow.tracking import MlflowClient

# Register the selected run's model under `model_name`.
model_name = "wine_quality"
model_version = mlflow.register_model(f"runs:/{run_id}/random_forest", model_name)

# Registration can complete asynchronously. Poll the registry until the new version is READY
# rather than sleeping a fixed 15 s, which is a race on a slow registry and wasted time on a fast one.
REGISTRATION_TIMEOUT_S = 60  # upper bound on the wait, in seconds
registry_client = MlflowClient()
deadline = time.monotonic() + REGISTRATION_TIMEOUT_S
while True:
    version_details = registry_client.get_model_version(name=model_name, version=model_version.version)
    status = ModelVersionStatus.from_string(version_details.status)
    if status == ModelVersionStatus.READY:
        break
    if status == ModelVersionStatus.FAILED_REGISTRATION:
        raise RuntimeError(f"Registration of {model_name} version {model_version.version} failed")
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"{model_name} version {model_version.version} not READY after {REGISTRATION_TIMEOUT_S} s"
        )
    time.sleep(1)

In [0]:
from mlflow.tracking import MlflowClient

# Move the newly registered version to the "Production" stage of the model registry.
# NOTE(review): registry stages are deprecated in recent MLflow releases in favour of
# model-version aliases; confirm against the installed MLflow version.
client = MlflowClient()
target_stage = "Production"
client.transition_model_version_stage(name=model_name, version=model_version.version, stage=target_stage)

In [0]:
# Load the registered model version back through the generic pyfunc interface. A distinct name
# keeps the fitted RandomForestClassifier in `model` available to the feature-importance cell,
# instead of overwriting it with the pyfunc wrapper.
production_model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_version.version}")

# This should match the auc logged by mlflow for the registered run.
print(f"auc: { roc_auc_score(y_test, production_model.predict(X_test))}")

## Part 4: experiment with a new model

In [0]:
from hyperopt  import fmin, tpe, hp, SparkTrials, STATUS_OK, Trials
from hyperopt.pyll import scope
from math import exp
import mlflow.xgboost
import numpy as np
import xgboost as xgb

In [0]:
# Hyperopt search space for the XGBoost trials. The loguniform parameters are sampled as
# exp(uniform(low, high)). max_depth is quantized to whole numbers and cast to int. The
# objective and seed are fixed values passed unchanged to every trial.
search_space = dict(
    learning_rate=hp.loguniform('learning_rate', -3, 0),
    max_depth=scope.int(hp.quniform('max_depth', 4, 100, 1)),
    reg_alpha=hp.loguniform('reg_alpha', -5, -1),
    reg_lambda=hp.loguniform('reg_lambda', -6, -1),
    min_child_weight=hp.loguniform('min_child_weight', -1, 3),
    objective='binary:logistic',
    seed=123,
)

In [0]:
def train_model(params):
    """Train one XGBoost model for a hyperopt trial and score it on the validation set.

    Args:
        params: Hyperparameters sampled by hyperopt from ``search_space``; passed
            directly to ``xgb.train``.

    Returns:
        A hyperopt result dict with these keys:
        ``loss``: the negated validation AUC (hyperopt minimizes loss, so this
        maximizes AUC).
        ``status``: ``STATUS_OK``.
        ``booster``: the attributes of the trained booster.
    """
    mlflow.xgboost.autolog()
    with mlflow.start_run(nested=True):
        train = xgb.DMatrix(X_train, label=y_train)
        validation = xgb.DMatrix(X_val, label=y_val)
        # Pass the validation set so XGBoost tracks its evaluation metric every round. Training
        # stops once that metric has not improved for `early_stopping_rounds` consecutive rounds.
        booster = xgb.train(params, dtrain=train, num_boost_round=100,
                            evals=[(validation, 'validation')],
                            early_stopping_rounds=50)
        # Capture the attributes (e.g. best_iteration / best_score) before slicing.
        booster_attributes = booster.attributes()
        # xgb.train returns the model from the LAST round, including up to
        # `early_stopping_rounds` trees added after the best round. Slice it so the scored and
        # logged model is the best-scoring one.
        best_iteration = getattr(booster, 'best_iteration', None)
        if best_iteration is not None:
            booster = booster[: best_iteration + 1]

        validation_predictions = booster.predict(validation)
        auc_score = roc_auc_score(y_val, validation_predictions)
        mlflow.log_metric('auc', auc_score)

        signature = infer_signature(X_train, booster.predict(train))
        mlflow.xgboost.log_model(booster, 'model', signature=signature)

        # Set the loss to -1 * auc_score so that fmin (a minimizer) maximizes the AUC.
        return {'loss': -1*auc_score, 'status': STATUS_OK, 'booster': booster_attributes}

In [0]:
# NOTE(review): leftover inspection cell; it only displays the hyperopt `tpe` module object
# and can be removed.
tpe

In [0]:
# Search algorithm for fmin: hyperopt's Tree-structured Parzen Estimator (TPE) proposes
# each new set of hyperparameters based on the results of earlier trials.
algo = tpe.suggest

In [0]:
from hyperopt import SparkTrials

# SparkTrials runs trials in parallel on the Spark cluster. Greater parallelism leads to
# speedups, but a less optimal hyperparameter sweep: fewer completed trials are available
# to guide each new suggestion.
trial_parallelism = 10
spark_trials = SparkTrials(parallelism=trial_parallelism)

In [0]:
MAX_EVALS = 12  # total number of hyperopt trials to run

# Parent MLflow run for the sweep; train_model opens a nested run for each trial.
# best_params receives the best hyperparameter values found by fmin.
with mlflow.start_run(run_name="tuned_xgboost"):
    best_params = fmin(fn=train_model, space=search_space, algo=algo,
                       max_evals=MAX_EVALS, trials=spark_trials)