In [1]:
def init():
    """
    Point MLflow at the local tracking server and create a new experiment.

    The experiment is named with a freshly generated UUID4 string.

    Returns:
        The value returned by ``mlflow.create_experiment`` for the new
        experiment (its experiment ID).
    """
    import mlflow
    import uuid

    tracking_uri = 'http://localhost:5000/'
    mlflow.set_tracking_uri(tracking_uri)

    experiment_name = str(uuid.uuid4())
    experiment_id = mlflow.create_experiment(experiment_name)
    return experiment_id

def load_data(experiment_id):
    """
    Load the scikit-learn digits dataset and log it as an MLflow artifact.

    The features and the target are combined into one DataFrame (target in the
    ``label`` column), written to ``data/raw/mnist_raw.parquet``, and the whole
    ``data/raw`` directory is logged to a new run of the experiment.

    Args:
        experiment_id: MLFlow Experiment ID.

    Returns:
        The Pandas DataFrame produced by ``mlflow.search_runs`` for this
        experiment (not a path). Its ``run_id`` and ``artifact_uri`` columns
        identify the run and the location of the logged data.
    """
    import os
    from sklearn import datasets
    import mlflow

    # to_parquet does not create missing parent directories.
    os.makedirs('data/raw', exist_ok=True)

    with mlflow.start_run(experiment_id=experiment_id, nested=True):
        X, y = datasets.load_digits(return_X_y=True, as_frame=True)
        X['label'] = y
        X.to_parquet('data/raw/mnist_raw.parquet')
        mlflow.log_artifacts('data/raw')
        return mlflow.search_runs(experiment_ids=[experiment_id])

def preprocess_data(experiment_id, run_id, data_path, omit_digits=None, train_size=0.8):
    """
    Filter the raw digits data and split it into training and testing sets.

    Rows whose ``label`` is in ``omit_digits`` are dropped. The remaining rows
    are split with a fixed ``random_state`` of 42, written to
    ``data/preprocessed/train.parquet`` and ``data/preprocessed/test.parquet``,
    and that directory is logged to the given MLflow run.

    Args:
        experiment_id: MLFlow Experiment ID
        run_id: ID of the MLflow run to resume and log the artifacts to.
        data_path: location containing ``mnist_raw.parquet``.
        omit_digits: an iterable of digits to be omitted from the dataset.
            ``None`` (the default) omits nothing.
        train_size: passed to ``train_test_split``. The special value ``0``
            skips the split entirely (see Returns).

    Returns:
        The Pandas DataFrame produced by ``mlflow.search_runs`` for this
        experiment. When ``train_size == 0`` nothing is written or logged and
        the tuple ``(None, filtered_data)`` is returned instead.
    """
    import os
    import mlflow
    import pandas as pd
    from sklearn import model_selection

    # Deduplicate the digits; the None default avoids a shared mutable default.
    omit_digits = set() if omit_digits is None else set(omit_digits)

    with mlflow.start_run(experiment_id=experiment_id, run_id=run_id):
        data = pd.read_parquet(data_path + '/mnist_raw.parquet')
        # Filter once and reuse the result for both the early return and the split.
        kept = data[~data['label'].isin(omit_digits)]

        if train_size == 0:
            return (None, kept)

        train, test = model_selection.train_test_split(
            kept, train_size=train_size, random_state=42)

        # to_parquet does not create missing parent directories.
        os.makedirs('data/preprocessed', exist_ok=True)
        train.to_parquet('data/preprocessed/train.parquet')
        test.to_parquet('data/preprocessed/test.parquet')
        mlflow.log_artifacts('data/preprocessed')
        return mlflow.search_runs(experiment_ids=[experiment_id])

def create_features(experiment_id, run_id, data_path, is_train):
    """
    Generate features for the train or test split and log the augmented dataset.

    For the training split, a scikit-learn Pipeline is fitted (a union of
    StandardScaler and RobustScaler, followed by a union of PCA and
    TruncatedSVD) and persisted with joblib to
    ``data/feature_generators/feature_generator``. For the test split, the
    previously persisted pipeline is loaded from ``data_path`` and applied.

    The generated features are appended to the original feature columns and
    written, together with the ``label`` column, to
    ``data/processed/train.parquet`` or ``data/processed/test.parquet``.

    Args:
        experiment_id: MLFlow Experiment ID
        run_id: ID of the MLflow run to resume and log the artifacts to.
        data_path: location of the input data. ``train.parquet`` is read
            directly from it, while the test split is read from its
            ``preprocessed/`` subfolder and the feature generator from
            ``feature_generator``.
        is_train: a boolean that indicates whether the training set is
                  processed. This also decides whether a feature generator is
                  fitted (True) or loaded (False).

    Returns:
        The Pandas DataFrame produced by ``mlflow.search_runs`` for this
        experiment.
    """
    import os
    import numpy as np
    import pandas as pd
    import joblib
    # Imported locally, like in the sibling functions, so this function does
    # not depend on a module-level ``import mlflow``.
    import mlflow
    from sklearn.pipeline import Pipeline, FeatureUnion
    from sklearn.preprocessing import StandardScaler, RobustScaler
    from sklearn.decomposition import PCA, TruncatedSVD

    # Neither joblib.dump nor to_parquet creates missing parent directories.
    os.makedirs('data/feature_generators', exist_ok=True)
    os.makedirs('data/processed', exist_ok=True)

    with mlflow.start_run(experiment_id=experiment_id, run_id=run_id):

        if is_train:
            data = pd.read_parquet(data_path + '/train.parquet')

            scaler = FeatureUnion([
                ('standard_scaler', StandardScaler()),
                ('robust_scaler', RobustScaler()),
            ])
            decomposer = FeatureUnion([
                ('PCA', PCA(n_components=10, random_state=42)),
                ('SVD', TruncatedSVD(random_state=42))
            ])

            feature_generator = Pipeline([
                ('scaler', scaler),
                ('decomposer', decomposer)
            ])

            features = feature_generator.fit_transform(data.drop('label', axis=1))
            joblib.dump(feature_generator, 'data/feature_generators/feature_generator')
            filename = 'train'

        else:
            # NOTE: unlike the training split, the test split is read from the
            # 'preprocessed/' subfolder, which only exists under data_path
            # once the whole 'data' directory has been logged (done below).
            data = pd.read_parquet(data_path + '/preprocessed/test.parquet')
            feature_generator = joblib.load(data_path + '/feature_generator')
            features = feature_generator.transform(data.drop('label', axis=1))
            filename = 'test'

        # Original feature columns first, generated features after them.
        augmented_data = np.concatenate([
            data.drop('label', axis=1).values,
            features,
        ], axis=1)

        augmented_data = pd.DataFrame(augmented_data)
        # Column labels are converted to strings before writing to parquet.
        augmented_data.columns = augmented_data.columns.astype(str)
        augmented_data['label'] = data['label'].values

        augmented_data.to_parquet(f'data/processed/{filename}.parquet')

        mlflow.log_artifacts('data/feature_generators/')
        mlflow.log_artifacts('data/processed/')
        # Logging the whole 'data' directory keeps the subfolder layout
        # ('preprocessed/', 'processed/', ...) that later reads rely on.
        mlflow.log_artifacts('data')
        return mlflow.search_runs(experiment_ids=[experiment_id])


def generate_model(experiment_id, data_path, params, run_id=None):
    """
    Train models on the processed training set and log the best H2O model.

    The data is read from ``data_path + '/processed/train.parquet'``; the last
    column is used as the label and all other columns as features. A
    scikit-learn RandomForestClassifier is fitted with ``params``, then an
    H2OAutoML run (``max_models=2``) is trained. The log loss of the first
    leaderboard row and the AutoML leader model are logged to MLflow.

    Args:
        experiment_id: MLFlow Experiment ID
        data_path: location whose ``processed/`` subfolder holds ``train.parquet``.
        params: keyword arguments for ``RandomForestClassifier``. Its optional
            ``random_state`` entry is also used as the H2OAutoML seed.
        run_id: ID of an existing MLflow run to resume. ``None`` (the default)
            is passed through to ``mlflow.start_run``.
    Returns:
        The Pandas DataFrame produced by ``mlflow.search_runs`` for this
        experiment (the trained model itself is logged, not returned).
    """
    import pandas as pd
    import mlflow
    from mlflow import h2o as mlflow_h2o
    from mlflow import sklearn as mlflow_sklearn
    from sklearn import ensemble
    import h2o
    from h2o.automl import H2OAutoML

    data = pd.read_parquet(data_path + '/processed/train.parquet')

    with mlflow.start_run(experiment_id=experiment_id, run_id=run_id):
        h2o.init()

        columns = data.columns.tolist()
        features = columns[:-1]
        label = columns[-1]
        mlflow.log_params(params)
        sklearn_model = ensemble.RandomForestClassifier(**params)
        sklearn_model.fit(data[features], data[label])

        # Need to convert to H2O-compatible DataFrame
        frame = h2o.H2OFrame(data, column_names=columns)
        # Treat the label as categorical.
        frame[label] = frame[label].asfactor()
        # .get() keeps params without 'random_state' usable (seed=None).
        model = H2OAutoML(max_models=2, balance_classes=True,
                          seed=params.get('random_state'))
        model.train(features, label, frame)

        lb = h2o.automl.get_leaderboard(model, extra_columns = 'ALL')
        mlflow.log_metric("logloss", lb.as_data_frame()['logloss'][0])
        mlflow_h2o.log_model(model.leader, f"model-{params}")
        # NOTE: the scikit-learn model is fitted above but currently not logged.
#         mlflow_sklearn.log_model(sklearn_model, f"sklearn_model-{params}")
        mlflow.log_artifacts('data')

        return mlflow.search_runs(experiment_ids=[experiment_id])

def generate_prediction(experiment_id, run_id, data_path, model):
    """
    Generate predictions from a provided model and dataset.

    Args:
        experiment_id: MLFlow Experiment ID (currently unused).
        run_id: MLflow run ID (currently unused).
        data_path: location whose ``processed/`` subfolder holds the
            ``test.parquet`` file that predictions are generated from.
        model: an H2O model to generate predictions with.

    Returns:
        A NumPy array holding the values of the model's ``predict`` column.
    """
    import h2o
    import pandas as pd

    # Load the dataset from data_path; without this, ``data`` would be read
    # before assignment below.
    # NOTE(review): assumes the processed test split written by
    # create_features(is_train=False) is the intended input — confirm.
    data = pd.read_parquet(data_path + '/processed/test.parquet')

    columns = data.columns.tolist()
    label = columns[-1]
    # Need to convert to H2O-compatible DataFrame
    data = h2o.H2OFrame(data, column_names=columns)
    data[label] = data[label].asfactor()
    preds = model.predict(data)['predict'].as_data_frame().values

    return preds

def produce_reports(experiment_id, real, preds):
    """
    Print model performance reports for a set of predictions.

    Two reports are written to standard output: the Matthews Correlation
    Coefficient and scikit-learn's classification report (4 digits).

    Args:
        experiment_id: MLFlow Experiment ID (not used by this function).
        real: A NumPy array of the reference values.
        preds: A NumPy array of the predicted values.
    Returns:
        None
    """
    from sklearn import metrics

    mcc = metrics.matthews_corrcoef(real, preds)
    print(f'Matthews Correlation Coefficient: {mcc}')

    report = metrics.classification_report(real, preds, digits=4)
    print(report)

## Model Exploration

In [2]:
# Create a fresh, uniquely named MLflow experiment for this session.
import mlflow
experiment_id = init()

In [3]:
# Despite its name, raw_path holds the DataFrame returned by
# mlflow.search_runs (one row per run), not a filesystem path.
raw_path = load_data(experiment_id)

In [4]:
# Split the raw data into train/test sets with digits 1 and 7 left out,
# resuming the run found in the first row of raw_path.
preprocessed_path = preprocess_data(experiment_id=experiment_id, 
                                    run_id=raw_path['run_id'][0], 
                                    data_path=raw_path['artifact_uri'][0], omit_digits=[1, 7])

In [5]:
# Fit the feature generator on the training split and log the augmented
# training set to the same run.
augmented_train_path = create_features(experiment_id=experiment_id, 
                                       run_id=raw_path['run_id'][0], 
                                       data_path=raw_path['artifact_uri'][0],
                                       is_train=True)

In [6]:
# Apply the previously persisted feature generator to the test split.
augmented_test_path = create_features(
                                    experiment_id=experiment_id, 
                                    run_id=raw_path['run_id'][0], 
                                    data_path=raw_path['artifact_uri'][0],
                                    is_train=False)

In [7]:
# Train inside the existing run, using random_state=1.
model_path = generate_model(experiment_id=experiment_id, 
                                    run_id=raw_path['run_id'][0], 
                                    data_path=raw_path['artifact_uri'][0], params={'random_state': 1})

Checking whether there is an H2O instance running at http://localhost:54321 ..... not found.
Attempting to start a local H2O server...
  Java Version: openjdk version "1.8.0_152-release"; OpenJDK Runtime Environment (build 1.8.0_152-release-1056-b12); OpenJDK 64-Bit Server VM (build 25.152-b12, mixed mode)
  Starting server from /home/hadrian/anaconda3/envs/py36/lib/python3.6/site-packages/h2o/backend/bin/h2o.jar
  Ice root: /tmp/tmpaeqqv7xj
  JVM stdout: /tmp/tmpaeqqv7xj/h2o_hadrian_started_from_python.out
  JVM stderr: /tmp/tmpaeqqv7xj/h2o_hadrian_started_from_python.err
  Server is running at http://127.0.0.1:54321
Connecting to H2O server at http://127.0.0.1:54321 ... successful.


0,1
H2O cluster uptime:,01 secs
H2O cluster timezone:,Asia/Manila
H2O data parsing timezone:,UTC
H2O cluster version:,3.28.1.2
H2O cluster version age:,4 months and 12 days !!!
H2O cluster name:,H2O_from_python_hadrian_lk2zjv
H2O cluster total nodes:,1
H2O cluster free memory:,1.672 Gb
H2O cluster total cores:,8
H2O cluster allowed cores:,8


Parse progress: |█████████████████████████████████████████████████████████| 100%
AutoML progress: |████████████████████████████████████████████████████████| 100%


In [8]:
# No run_id is passed here, so generate_model uses its default (None);
# this time random_state=42 is used.
model_path = generate_model(experiment_id=experiment_id, 
                                    data_path=raw_path['artifact_uri'][0], params={'random_state': 42})

Checking whether there is an H2O instance running at http://localhost:54321 . connected.


0,1
H2O cluster uptime:,40 secs
H2O cluster timezone:,Asia/Manila
H2O data parsing timezone:,UTC
H2O cluster version:,3.28.1.2
H2O cluster version age:,4 months and 12 days !!!
H2O cluster name:,H2O_from_python_hadrian_lk2zjv
H2O cluster total nodes:,1
H2O cluster free memory:,1.339 Gb
H2O cluster total cores:,8
H2O cluster allowed cores:,8


Parse progress: |█████████████████████████████████████████████████████████| 100%
AutoML progress: |████████████████████████████████████████████████████████| 100%
