# Databricks MLflow demo

The general premise of end-to-end machine learning is to have ordered, repeatable, and easy-to-use frameworks that generate results you can track over time. End-to-end ML flows usually solve all or some of the following challenges:

* Manage data
* Train models
* Evaluate models
* Deploy models
* Make predictions
* Monitor predictions
(From Michelangelo)

Frameworks already exist that large companies are using or experimenting with to solve the above challenges. To name a few that Alex asked me to look at:

* [FBLearner Flow](https://code.fb.com/core-data/introducing-fblearner-flow-facebook-s-ai-backbone/)
* [Uber's Michelangelo](https://eng.uber.com/michelangelo/)
* [Google TFX](http://www.kdd.org/kdd2017/papers/view/tfx-a-tensorflow-based-production-scale-machine-learning-platform)
* [Databricks MLflow](https://www.mlflow.org/)

Other frameworks that solve some specific challenges:
* [MIT's ModelDB](https://mitdbg.github.io/modeldb/) - Monitor Training/Evaluation/Predictions
* [Data Version Control (DVC)](https://dvc.org/) - Manage data

All of the above could show us some of the practices used for end-to-end ML.

This notebook runs some examples from [MLflow](https://www.mlflow.org/docs/latest/index.html). The three demos cover the three areas that `MLflow` aims to address for the data scientist:

* Tracking of projects
* Execution of projects
* Deployment of projects


### How to run the notebook
This notebook must be run by doing the following:
* `cd <root>/notebooks`
* `pipenv run jupyter notebook`

Then you can execute all of the commands in this notebook.

Remember to run `mlflow ui` in the same directory from which you are running the notebooks.


In [19]:
import os
import mlflow
import pandas as pd

In [20]:
# Used to check whether an experiment already exists; there is probably a dedicated get command.
list_experiments = mlflow.tracking.list_experiments()
experiments = {l.name: l.experiment_id for l in list_experiments}
experiments

{'Default': 0,
 'Test Basic': 1,
 'Test Tensorflow Basic': 3,
 'Test Sklearn Basic': 2}
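The dictionary above makes it possible to look up an experiment id by name and create the experiment only when it is missing. The sketch below is a minimal illustration of that lookup, not part of MLflow's API: `get_or_create_experiment_id`, its `create` argument, and `fake_create` are hypothetical names. In this notebook `create` could be something like `lambda name: mlflow.create_experiment(experiment_name=name)`. The helper compares against `None` instead of relying on truthiness, because the `Default` experiment has id `0`, which Python treats as false.

```python
def get_or_create_experiment_id(experiments, name, create):
    """Return the id for `name`, calling `create(name)` only if it is missing.

    `experiments` maps experiment names to ids (like the dict built above);
    `create` is a hypothetical callable that registers a new experiment
    and returns its id.
    """
    experiment_id = experiments.get(name)
    if experiment_id is None:  # explicit check: id 0 is a valid experiment
        experiment_id = create(name)
        experiments[name] = experiment_id
    return experiment_id


# Stand-in for the tracking server, for illustration only.
known = {"Default": 0, "Test Basic": 1}
created = []

def fake_create(name):
    created.append(name)
    return max(known.values()) + 1

default_id = get_or_create_experiment_id(known, "Default", fake_create)
new_id = get_or_create_experiment_id(known, "Test Custom Classifier", fake_create)
print(default_id, new_id, created)
```

Only the second call reaches `fake_create`; the lookup for `Default` returns its existing id even though that id is `0`.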

### Example of tracking via the MLflow UI
Let's set the experiment and start the run. These steps allow you to partition the results logically in the UI.

In [4]:
experiment_name = "Test Basic"
experiment_id = experiments.get(experiment_name)

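# Compare against None: an experiment id of 0 (the Default experiment) is valid but falsy.
experiment_id = experiment_id if experiment_id is not None else mlflow.create_experiment(experiment_name=experiment_name)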

mlflow.start_run(source_name="a really basic python script", experiment_id=experiment_id)

<mlflow.tracking.ActiveRun at 0x10b22b320>

In [5]:
# Log a parameter (key-value pair)
mlflow.log_param("param1", 5)

# Log a metric; metrics can be updated throughout the run
mlflow.log_metric("foo", 1)
mlflow.log_metric("foo", 2)
mlflow.log_metric("foo", 3)

# Log an artifact (output file)
with open("output.txt", "w") as f:
    f.write("Hello world!")
mlflow.log_artifact("output.txt")
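Since a metric can be logged repeatedly under the same key, the natural place for `log_metric` is inside a training loop. The following is a sketch under assumptions: `run_training` and the toy loss `1 / step` are invented for illustration, and the logger is passed in as a callable so the loop can be exercised without a tracking server.

```python
def run_training(n_steps, log_metric):
    """Toy training loop that reports one loss value per step.

    `log_metric(key, value)` mirrors the call shape used above;
    the loss formula is a placeholder, not a real model.
    """
    for step in range(1, n_steps + 1):
        loss = 1.0 / step  # placeholder for a real training step
        log_metric("loss", loss)


# Collect the values in a list instead of sending them to MLflow.
history = []
run_training(4, lambda key, value: history.append((key, value)))
print(history)
```

Inside an active run, `mlflow.log_metric` could be passed as the `log_metric` argument in place of the lambda.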

End the run with the following command:

In [6]:
# end the run
mlflow.end_run()

### Example of running sklearn + deploy prediction API
The simple logistic regression model below showcases training a model and saving the result. This allows fast iteration without losing track of the model.

`mlflow.log_metric` records the accuracy returned by the `score` method of the standard `LogisticRegression` class, and `mlflow.sklearn.log_model` saves the fitted model.

In [21]:
experiment_name = "Test Sklearn Basic"
experiment_id = experiments.get(experiment_name)
experiment_id = experiment_id if experiment_id is not None else mlflow.create_experiment(experiment_name=experiment_name)

mlflow.start_run(source_name="a really basic sklearn python script", experiment_id=experiment_id)

<mlflow.tracking.ActiveRun at 0x11a073e80>

In [22]:
import numpy as np
from sklearn.linear_model import LogisticRegression

import mlflow.sklearn
X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
y = np.array([0, 0, 1, 1, 1, 0])
lr = LogisticRegression()
lr.fit(X, y)
score = lr.score(X, y)
print("Score: %s" % score)
# Log the computed accuracy rather than a hard-coded value
mlflow.log_metric("score", score)
mlflow.sklearn.log_model(lr, "model")

run_id = mlflow.active_run().info.run_uuid
print("Model saved in run %s" % run_id)
mlflow.end_run()

Score: 0.6666666666666666
Model saved in run a1505c10fec94379b4d0cbcfea8ec71c


The run id is printed on the last line, after "Model saved in run". This id is used to spin up the Flask API so that you can get a prediction with a POST request.

Note: `info` only appears on `active_run` once at least one `mlflow` operation has been executed.

Please run the following command in a terminal with `mlflow` present.

In [9]:
"mlflow sklearn serve -r %s model" % run_id

'mlflow sklearn serve -r c965cd29a231439aad16a618523cdc76 model'

**Note:** Before running the command above, please ensure that `mlflow ui` is not in use, as both listen on port 5000 by default.
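If you would rather launch the server from Python than copy the command into a terminal, the same command can be built as an argument list for `subprocess`. This is a sketch only: `build_serve_command` is a hypothetical helper, and it assumes the `mlflow` executable is on the `PATH` of the environment that runs it. The launch line is left commented out because it starts a long-running process.

```python
import subprocess

def build_serve_command(run_id, model_path="model"):
    """Return the `mlflow sklearn serve` command as an argument list."""
    return ["mlflow", "sklearn", "serve", "-r", run_id, model_path]


command = build_serve_command("c965cd29a231439aad16a618523cdc76")
print(" ".join(command))

# Uncomment to start the server in the background; stop it with server.terminate().
# server = subprocess.Popen(command)
```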

Once you have run the above command in a terminal (any terminal), it starts a Flask app locally, which we can now hit with the following command:
`curl -d '[{"x": 1}, {"x": -1}]' -H 'Content-Type: application/json' -X POST localhost:5000/invocations`

The result should look something like this:
`{"predictions": [1, 0]}` 
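The same request can be sent from Python using only the standard library. The sketch below assumes the model server is listening on `localhost:5000`; `build_invocation_request` and `predict` are illustrative names, and the response shape is taken from the example output above rather than from the MLflow documentation.

```python
import json
import urllib.request

INVOCATIONS_URL = "http://localhost:5000/invocations"

def build_invocation_request(rows, url=INVOCATIONS_URL):
    """Build a JSON POST request equivalent to the curl command above."""
    body = json.dumps(rows).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def predict(rows, url=INVOCATIONS_URL):
    """Send the rows to the running model server and decode the JSON reply."""
    with urllib.request.urlopen(build_invocation_request(rows, url)) as response:
        return json.loads(response.read().decode("utf-8"))


# Building the request does not need a running server; sending it does.
request = build_invocation_request([{"x": 1}, {"x": -1}])
print(request.get_method(), request.full_url, request.data)
```

With the server running, `predict([{"x": 1}, {"x": -1}])` should return a dictionary similar to the output shown above.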

### Example of running a TensorFlow project
The beauty of `MLflow` is its integration with other frameworks, TensorFlow being one of them.

We will use the example found [here](https://github.com/databricks/mlflow/blob/master/example/tutorial/tensorflow_example.py).

Further details on TensorFlow with `MLflow` can be found in this [blog post](https://databricks.com/blog/2018/07/03/mlflow-0-2-released.html).

In [10]:
from mlflow import tensorflow, tracking
import shutil
import tempfile
import tensorflow as tf

In [12]:
# this will cause a download of the dataset if you do not have it already locally
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.boston_housing.load_data() 

In [13]:
feat_cols = [tf.feature_column.numeric_column(key="features", shape=(x_train.shape[1],))]
feat_spec = {"features": tf.placeholder("float", name="features", shape=[None, x_train.shape[1]])}
hidden_units = [50, 20]
steps = 1000
regressor = tf.estimator.DNNRegressor(hidden_units=hidden_units, feature_columns=feat_cols)
train_input_fn = tf.estimator.inputs.numpy_input_fn({"features": x_train}, y_train, num_epochs=None, shuffle=True)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': '/var/folders/54/64fmthqj1bj8wyzhctsp93v40000gp/T/tmpsn9zoseu', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x108328fd0>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


In [14]:
experiment_name = "Test Tensorflow Basic"
experiment_id = experiments.get(experiment_name)
experiment_id = experiment_id if experiment_id is not None else mlflow.create_experiment(experiment_name=experiment_name)

mlflow.start_run(source_name="simple TFlow example with boston housing", experiment_id=experiment_id)

<mlflow.tracking.ActiveRun at 0x11a073cc0>

In [15]:
# Reuse the run started in the previous cell instead of opening a second one
with mlflow.active_run() as tracked_run:
    mlflow.log_param("Hidden Units", hidden_units)
    mlflow.log_param("Steps", steps)
    regressor.train(train_input_fn, steps=steps)
    test_input_fn = tf.estimator.inputs.numpy_input_fn({"features": x_test}, y_test, num_epochs=None, shuffle=True)
    # Compute mean squared error
    mse = regressor.evaluate(test_input_fn, steps=steps)
    mlflow.log_metric("Mean Square Error", mse['average_loss'])
    # Building a receiver function for exporting
    receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(feat_spec)
    temp = tempfile.mkdtemp()
    try:
        saved_estimator_path = regressor.export_savedmodel(temp, receiver_fn).decode("utf-8")
        # Logging the saved model
        tensorflow.log_saved_model(saved_model_dir=saved_estimator_path, signature_def_key="predict", artifact_path="model")
        # Reloading the model
        pyfunc = tensorflow.load_pyfunc(saved_estimator_path)
        df = pd.DataFrame(data=x_test, columns=["features"] * x_train.shape[1])
        # Predicting on the loaded Python Function
        predict_df = pyfunc.predict(df)
        predict_df['original_labels'] = y_test
        print(predict_df)
    finally:
        shutil.rmtree(temp)


INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into /var/folders/54/64fmthqj1bj8wyzhctsp93v40000gp/T/tmpsn9zoseu/model.ckpt.
INFO:tensorflow:loss = 562730.94, step = 1
INFO:tensorflow:global_step/sec: 633.353
INFO:tensorflow:loss = 8117.486, step = 101 (0.159 sec)
INFO:tensorflow:global_step/sec: 922.575
INFO:tensorflow:loss = 3434.9106, step = 201 (0.108 sec)
INFO:tensorflow:global_step/sec: 807.801
INFO:tensorflow:loss = 6780.1943, step = 301 (0.124 sec)
INFO:tensorflow:global_step/sec: 849.113
INFO:tensorflow:loss = 4860.667, step = 401 (0.118 sec)
INFO:tensorflow:global_step/sec: 837.909
INFO:tensorflow:loss = 5708.8027, step = 501 (0.119 sec)
INFO:tensorflow:global_step/sec: 808.308
INFO:tensorflow:loss = 4513.7793, step = 601 (0.124 sec)
INFO:tensorflow
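For reference, the quantity logged above as `Mean Square Error` is the average of the squared differences between predictions and labels. A minimal sketch of that formula in plain Python follows. The function name and the toy numbers are illustrative and are not taken from the Boston housing run.

```python
def mean_squared_error(labels, predictions):
    """Average of squared differences between two equal-length sequences."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    squared_errors = [(p - t) ** 2 for t, p in zip(labels, predictions)]
    return sum(squared_errors) / len(squared_errors)


# Toy values chosen only to make the arithmetic easy to follow:
# the squared errors are 4, 9 and 0, so the mean is 13 / 3.
toy_labels = [20.0, 30.0, 25.0]
toy_predictions = [22.0, 27.0, 25.0]
toy_mse = mean_squared_error(toy_labels, toy_predictions)
print(toy_mse)
```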

We can run the TensorFlow serving app using the model built here, but a more generic way of deploying it, like the `sklearn serve` command above, still needs to be looked into.

In [None]:
# apulu run time.
# created custom classifier
# docker to docker

### An example of a custom classifier
A custom classifier opens up the ability to use more complex models and pipelines.

In [10]:
experiment_name = "Test Custom Classifier"
experiment_id = experiments.get(experiment_name)

experiment_id = experiment_id if experiment_id is not None else mlflow.create_experiment(experiment_name=experiment_name)

mlflow.start_run(source_name="customise classifier", experiment_id=experiment_id)

<mlflow.tracking.ActiveRun at 0x112c34978>

In [23]:
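from sklearn.base import BaseEstimator, ClassifierMixin
import pickle

class CustomClassifier(ClassifierMixin, BaseEstimator):
    """Wraps a probabilistic classifier and prints every prediction it makes."""

    def __init__(self, base_estimator=None, threshold=0.5):
        # Keep constructor arguments under their own names (scikit-learn convention).
        self.base_estimator = base_estimator
        # NOTE: threshold is stored but not applied yet; predict uses argmax.
        self.threshold = threshold

    def fit(self, X, y=None):
        # Build the default estimator per instance instead of sharing one default object.
        self.base_classifier_ = (
            LogisticRegression() if self.base_estimator is None else self.base_estimator
        )
        self.base_classifier_.fit(X, y)
        return self

    @staticmethod
    def _print(x, probabilities, pred):
        for i, prob in enumerate(probabilities):
            print("Input: {} probability: {} class: {}".format(x[i], prob, pred[i]))

    def predict(self, x):
        proba = self.base_classifier_.predict_proba(x)
        # Pick the most probable class for each row
        predictions = [np.argmax(p) for p in proba]
        self._print(x, proba, predictions)
        return predictions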

X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
y = np.array([0, 0, 1, 1, 1, 0])
lr = CustomClassifier()
lr.fit(X, y)
score = lr.score(X, y)
print("Score: %s" % score)

# see code in next block

mlflow.end_run()

Input: [-2] probability: [0.82530694 0.17469306] class: 0
Input: [-1] probability: [0.69342522 0.30657478] class: 0
Input: [0] probability: [0.51989912 0.48010088] class: 0
Input: [1] probability: [0.34143529 0.65856471] class: 1
Input: [2] probability: [0.19885791 0.80114209] class: 1
Input: [1] probability: [0.34143529 0.65856471] class: 1
Score: 0.6666666666666666
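The classifier stores a threshold, but `predict` picks the class by `argmax`, so the threshold never takes effect. As a sketch of what using it might look like (this is an assumption about the intent, not code from the notebook), the snippet below applies a cut-off to the positive-class probabilities printed above and recomputes the accuracy by hand. `apply_threshold` and `accuracy` are hypothetical helpers.

```python
def apply_threshold(positive_probabilities, threshold=0.5):
    """Label a sample 1 when its positive-class probability reaches the threshold."""
    return [1 if p >= threshold else 0 for p in positive_probabilities]

def accuracy(labels, predictions):
    """Fraction of predictions that match the labels."""
    matches = sum(1 for t, p in zip(labels, predictions) if t == p)
    return matches / len(labels)


# Second column of the probabilities printed above, and the training labels.
positive_probabilities = [0.17469306, 0.30657478, 0.48010088,
                          0.65856471, 0.80114209, 0.65856471]
labels = [0, 0, 1, 1, 1, 0]

default_predictions = apply_threshold(positive_probabilities)        # threshold 0.5
lenient_predictions = apply_threshold(positive_probabilities, 0.45)  # lower cut-off

print(default_predictions, accuracy(labels, default_predictions))
print(lenient_predictions, accuracy(labels, lenient_predictions))
```

With the default threshold, four of the six labels match, which agrees with the score reported above. Lowering the cut-off to 0.45 flips the third sample to class 1.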


I tried to log the custom classifier above as a generic Python function (see the cell below) so that it would appear in the `mlflow ui`, but this was not successful. The [custom flavors documentation](https://www.mlflow.org/docs/latest/models.html#custom-flavors) is probably the place to look.

In [None]:
# with open("model.pkl", "wb") as mf:
#     pickle.dump(lr, mf)
# mlflow.pyfunc.log_model(lr)
# #mlflow.pyfunc.log(lr, "model")

# run_id = mlflow.active_run().info.run_uuid
# print("Model saved in run %s" % run_id)
# mlflow.end_run()

### Next steps
* Run the code within a Docker environment to see whether we can run a remote server for the `mlflow ui`
* See whether multiple people can log models and runs to the same `server`, which would be good for audit purposes
* Serve TensorFlow models
* Check whether it can work on `GCP`