### Train 2 models (Hyperopt and SparkTrials), train them in parallel, and obtain the best
### [Data](https://archive.ics.uci.edu/ml/datasets/Wine)

### [Banafsheh Hassani](https://www.linkedin.com/in/banafsheh-hassani-7b063a129/)

### [More Projects](https://github.com/BanafshehHassani)

[Reference](https://docs.microsoft.com/en-us/azure/databricks/_static/notebooks/mlflow/ml-quickstart-training.html)

# ML Hyperopt & SparkTrials: Model Training

This notebook is an example of training machine learning models on Databricks. It uses the scikit-learn library, which is preinstalled on the Databricks Runtime for Machine Learning, together with MLflow to track the trained models and Hyperopt with SparkTrials to scale hyperparameter tuning.

# Steps:

* 1. Train a simple classification model with MLflow tracking
* 2. Tune hyperparameters with Hyperopt to obtain the best-performing model

### Import libraries

In [0]:
%pip install mlflow

In [0]:
%pip install hyperopt

In [0]:
import mlflow
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble
from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope

### Load and preprocess [data](https://archive.ics.uci.edu/ml/datasets/Wine)

In [0]:
#Load data
data = spark.table("winequality_white_csv")
white_wine = data.toPandas()
red_wine = spark.table("winequality_red_1_csv")
red_wine = red_wine.toPandas()

#Fix column names: the first row of each table holds the real header
white_wine.columns = white_wine.iloc[0]
white_wine = white_wine[1:].copy()
red_wine.columns = red_wine.iloc[0]
red_wine = red_wine[1:].copy()

In [0]:
import pandas as pd

#Distinguish red and white wine
red_wine['is_red'] = 1
white_wine['is_red'] = 0

#Combine the datasets
data = pd.concat([red_wine, white_wine])
data['is_red'] = data['is_red'].astype(int)

# Replace spaces in column names with underscores
data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)
# Drop any leftover generic columns (_c0 ... _c11) if they are still present
data.drop([f'_c{i}' for i in range(12)], axis=1, inplace=True, errors='ignore')
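
The header promotion used above can be checked on a toy frame. The following is a minimal sketch, assuming a table whose first row holds the column names, as happens when a CSV is loaded without header detection. The `raw` frame, its values, and the helper `promote_first_row` are made up for illustration and are not part of the notebook's data.

```python
import pandas as pd

# Hypothetical stand-in for a table loaded without header detection:
# generic column names, with the real header stored in the first row.
raw = pd.DataFrame(
    [["fixed acidity", "quality"], ["7.4", "5"], ["7.8", "6"]],
    columns=["_c0", "_c1"],
)

def promote_first_row(df):
    """Return a copy that uses the first row as column names and drops that row."""
    out = df.iloc[1:].copy()
    out.columns = [str(name).replace(" ", "_") for name in df.iloc[0]]
    # Values arrive as strings, so convert each column to a numeric dtype
    return out.apply(pd.to_numeric).reset_index(drop=True)

clean = promote_first_row(raw)
print(clean.dtypes)
```

Building the new column list explicitly avoids relying on `rename`, which returns a new frame and has no effect unless its result is assigned.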

Drop null values

In [0]:
data = data.dropna()
data.head()

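| | fixed_acidity | volatile_acidity | citric_acid | residual_sugar | chlorides | free_sulfur_dioxide | total_sulfur_dioxide | density | pH | sulphates | alcohol | quality | is_red |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 7.4 | 0.7 | 0.0 | 1.9 | 0.076 | 11 | 34 | 0.9978 | 3.51 | 0.56 | 9.4 | 5 | 1 |
| 2 | 7.8 | 0.88 | 0.0 | 2.6 | 0.098 | 25 | 67 | 0.9968 | 3.2 | 0.68 | 9.8 | 5 | 1 |
| 3 | 7.8 | 0.76 | 0.04 | 2.3 | 0.092 | 15 | 54 | 0.997 | 3.26 | 0.65 | 9.8 | 5 | 1 |
| 4 | 11.2 | 0.28 | 0.56 | 1.9 | 0.075 | 17 | 60 | 0.998 | 3.16 | 0.58 | 9.8 | 6 | 1 |
| 5 | 7.4 | 0.7 | 0.0 | 1.9 | 0.076 | 11 | 34 | 0.9978 | 3.51 | 0.56 | 9.4 | 5 | 1 |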


Define classification labels based on the wine quality

In [0]:
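# The columns were read as text, so convert them to numbers first
data_df = data.apply(pd.to_numeric)
# A wine counts as high quality when its score is 7 or above
data_labels = data_df['quality'] >= 7
data_df = data_df.drop(['quality'], axis=1)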
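
Because the table was loaded as text, it matters whether the quality threshold is applied to strings or to numbers. The sketch below uses made-up scores to show the difference; the score of 10 is included only to expose the failure mode and is not claimed to occur in this dataset.

```python
# Quality scores as they appear when every column was read as text
qualities_as_text = ["5", "7", "9", "10"]

# String comparison is lexicographic: "10" sorts before "7"
text_labels = [q >= "7" for q in qualities_as_text]

# Converting to integers first gives the intended threshold behaviour
numeric_labels = [int(q) >= 7 for q in qualities_as_text]

print(text_labels)     # [False, True, True, False]
print(numeric_labels)  # [False, True, True, True]
```

For single-digit scores both approaches agree, which is why a string threshold can appear to work until a two-digit value shows up.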

Split 80/20 train-test

In [0]:
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
  data_df,
  data_labels,
  test_size=0.2,
  random_state=1
)
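
To make the role of `test_size` and `random_state` concrete, here is a rough plain-Python sketch of a seeded hold-out split. It is not scikit-learn's implementation (the shuffling and rounding details differ); the helper `simple_train_test_split` is hypothetical and only mirrors the order of the returned values.

```python
import random

def simple_train_test_split(rows, labels, test_size=0.2, seed=1):
    """Shuffle indices with a fixed seed and hold out a fraction for testing."""
    indices = list(range(len(rows)))
    random.Random(seed).shuffle(indices)
    n_test = int(round(len(rows) * test_size))
    test_idx, train_idx = indices[:n_test], indices[n_test:]

    def pick(seq, idx):
        return [seq[i] for i in idx]

    return (pick(rows, train_idx), pick(rows, test_idx),
            pick(labels, train_idx), pick(labels, test_idx))

rows = list(range(10))
labels = [r % 2 == 0 for r in rows]
X_tr, X_te, y_tr, y_te = simple_train_test_split(rows, labels)
print(len(X_tr), len(X_te))  # 8 2
```

Fixing the seed means the same rows land in the test set on every run, which keeps the AUC values of different runs comparable.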

## 1. Train a simple classification model with MLflow tracking

### MLflow Tracking
MLflow Tracking organizes the machine learning training code, parameters, and models.

Enable MLflow autologging

In [0]:
mlflow.autolog()

Train a classifier within the context of an MLflow run, which automatically logs the trained model along with many associated metrics and parameters.

In [0]:
with mlflow.start_run(run_name='gradient_boost') as run:
  model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)
  
  # Models, parameters, and training metrics are tracked automatically
  model.fit(X_train, y_train)

  predicted_probs = model.predict_proba(X_test)
  roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
  
  # The AUC score on test data is not automatically logged, so log it manually
  mlflow.log_metric("test_auc", roc_auc)
  print("Test AUC of: {}".format(roc_auc))
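
The AUC is computed from the predicted probability of the positive class (column 1 of `predict_proba`), not from hard labels. As a minimal sketch of what the metric measures, the plain-Python function below counts how often a positive example is scored above a negative one. It is an illustration of the definition, not scikit-learn's implementation, and the labels and scores are made up.

```python
def roc_auc(y_true, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(y_true, scores) if y]
    negatives = [s for y, s in zip(y_true, scores) if not y]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

y_true = [False, False, True, True]
scores = [0.1, 0.4, 0.35, 0.8]
auc = roc_auc(y_true, scores)
print(auc)  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so higher is better.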

Train a second model with different hyperparameters.

In [0]:
# Start a new run and assign a run_name for future reference.
with mlflow.start_run(run_name='gradient_boost') as run:
  model_2 = sklearn.ensemble.GradientBoostingClassifier(
    random_state=0, 
    
    # Try a new parameter setting for n_estimators
    n_estimators=200,
  )
  model_2.fit(X_train, y_train)

  predicted_probs = model_2.predict_proba(X_test)
  roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
  mlflow.log_metric("test_auc", roc_auc)
  print("Test AUC of: {}".format(roc_auc))

### Load models
The cell below loads the model trained in a given MLflow run and uses it for predictions.

In [0]:
model_loaded = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=run.info.run_id
  )
)
predictions_loaded = model_loaded.predict(X_test)
predictions_original = model_2.predict(X_test)
assert np.array_equal(predictions_loaded, predictions_original)

## 2. Hyperparameter tuning with Hyperopt to obtain the best-performing model
The simple models have been trained, and the MLflow tracking service organizes the work.
The next step is more sophisticated tuning with Hyperopt.

### Parallel training with Hyperopt and SparkTrials

Hyperopt with SparkTrials can run hyperparameter sweeps and train multiple models in parallel.
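
As a rough, Spark-free analogy, the idea of a parallel sweep can be sketched with a thread pool: several candidate settings are evaluated at the same time and the one with the lowest loss wins. This is only an illustration under simplifying assumptions. SparkTrials sends trials to Spark workers and Hyperopt proposes candidates adaptively, whereas the sketch uses a fixed grid and a made-up `toy_objective`.

```python
from concurrent.futures import ThreadPoolExecutor

def toy_objective(params):
    """Hypothetical loss: smallest at learning_rate = 0.1 and max_depth = 3."""
    return (params["learning_rate"] - 0.1) ** 2 + (params["max_depth"] - 3) ** 2

candidates = [
    {"learning_rate": lr, "max_depth": depth}
    for lr in (0.05, 0.1, 0.5)
    for depth in (2, 3, 4)
]

# Evaluate up to 4 candidates at a time; map() returns results in input order
with ThreadPoolExecutor(max_workers=4) as pool:
    losses = list(pool.map(toy_objective, candidates))

best_loss, best_candidate = min(zip(losses, candidates), key=lambda pair: pair[0])
print(best_candidate)
```

With an adaptive algorithm there is a trade-off: more parallelism finishes sooner, but each new candidate is proposed with fewer completed results to learn from.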

In [0]:
# Search space to explore
search_space = {
  'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
  'learning_rate': hp.loguniform('learning_rate', -3, 0),
  'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
}

def train_model(params):
  # Enable autologging on each worker
  mlflow.autolog()
  with mlflow.start_run(nested=True):
    model_hp = sklearn.ensemble.GradientBoostingClassifier(
      random_state=0,
      **params
    )
    model_hp.fit(X_train, y_train)
    predicted_probs = model_hp.predict_proba(X_test)
    # Tune based on the test AUC
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    mlflow.log_metric('test_auc', roc_auc)
    # fmin minimizes the loss, so return the negative AUC
    return {'status': STATUS_OK, 'loss': -1*roc_auc}

# Run up to 8 trials at the same time
spark_trials = SparkTrials(
  parallelism=8
)

with mlflow.start_run(run_name='gb_hyperopt') as run:
  best_params = fmin(
    fn=train_model, 
    space=search_space, 
    algo=tpe.suggest, 
    max_evals=32,
    trials=spark_trials)
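
The ranges in `search_space` are easier to read once the sampling rules are spelled out. Following Hyperopt's documented definitions, `hp.quniform(label, low, high, q)` yields `round(uniform(low, high) / q) * q`, and `hp.loguniform(label, low, high)` yields `exp(uniform(low, high))`. The sketch below imitates those rules with the standard library. The helper names are hypothetical and the draws are plain random samples, not TPE suggestions.

```python
import math
import random

rng = random.Random(0)

def sample_quniform(low, high, q):
    """Uniform draw rounded to a multiple of q, then cast to int (like scope.int)."""
    return int(round(rng.uniform(low, high) / q) * q)

def sample_loguniform(low, high):
    """exp of a uniform draw, so the result lies in [exp(low), exp(high)]."""
    return math.exp(rng.uniform(low, high))

samples = [
    {
        "n_estimators": sample_quniform(20, 1000, 1),
        "learning_rate": sample_loguniform(-3, 0),
        "max_depth": sample_quniform(2, 5, 1),
    }
    for _ in range(5)
]
for s in samples:
    print(s)

# Lower bound of learning_rate: exp(-3) is roughly 0.0498; the upper bound is exp(0) = 1
print(math.exp(-3))
```

Sampling the learning rate on a log scale spends as many draws between 0.05 and 0.1 as between 0.5 and 1, which suits a parameter whose useful values span more than one order of magnitude.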

### Search runs to obtain the best model
This tuned model should perform better than the simpler models trained in step 1.

In [0]:
# Sort runs by their test auc; in case of ties, use the most recent run
best_run = mlflow.search_runs(
  order_by=['metrics.test_auc DESC', 'start_time DESC'],
  max_results=10,
).iloc[0]
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))

best_model_pyfunc = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )
)
best_model_predictions = best_model_pyfunc.predict(X_test[:5])
print("Test Predictions: {}".format(best_model_predictions))
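
The ordering passed to `search_runs` (highest test AUC first, most recent run first on ties) can be illustrated in plain Python. The run records below are hypothetical: the IDs, scores, and timestamps are made up and do not come from the tracking server.

```python
# Hypothetical run records; the run IDs, scores, and timestamps are made up
runs = [
    {"run_id": "a", "test_auc": 0.88, "start_time": 100},
    {"run_id": "b", "test_auc": 0.91, "start_time": 200},
    {"run_id": "c", "test_auc": 0.91, "start_time": 300},
]

# Highest AUC first; among equal AUCs, the most recent start time first
ranked = sorted(runs, key=lambda r: (r["test_auc"], r["start_time"]), reverse=True)
best = ranked[0]
print(best["run_id"])  # c
```

Runs `b` and `c` tie on AUC, so the later start time decides and `c` is selected.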