# Bonus: RaySGD + MLflow

## Distributed Deep Learning Made Simple: RaySGD

Distributed deep learning -- or optimization in general using TensorFlow/PyTorch -- has slowly been getting easier, even if it's not 100% transparent (or turn-key) quite yet.

Native solutions (official TF/PyTorch distributed code) are getting easier; Horovod has matured and is straightforward. But one of the simplest approaches of all comes via the Ray project.

In particular, RaySGD provides a zero-ops and minimal API approach: https://docs.ray.io/en/master/raysgd/raysgd.html

Let's take a look! 

First, we'll set up a toy example (adapted from https://docs.ray.io/en/master/raysgd/raysgd_tensorflow.html).

Later, we'll try a real dataset and integrate with additional tools.

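Here, we create trivial dummy data and a TensorFlow `tf.data.Dataset` loader. The `NUM_TRAIN_SAMPLES` and `NUM_TEST_SAMPLES` constants are defined in a later cell, before these functions are actually called.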

In [None]:
import numpy as np
import tensorflow as tf  # used by simple_dataset below

def linear_dataset(size=100):
    x = np.random.rand(size)
    y = 2 * x

    x = x.reshape((-1, 1))
    y = y.reshape((-1, 1))

    return x, y

def simple_dataset(config):
    batch_size = config["batch_size"]
    x_train, y_train = linear_dataset(size=NUM_TRAIN_SAMPLES)
    x_test, y_test = linear_dataset(size=NUM_TEST_SAMPLES)

    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    train_dataset = train_dataset.shuffle(NUM_TRAIN_SAMPLES).repeat().batch(
        batch_size)
    test_dataset = test_dataset.repeat().batch(batch_size)

    return train_dataset, test_dataset

Next, we define our model using regular `tf.keras` components. We define a model-creation function, as RaySGD uses a factory pattern (basically a pattern that takes creator functions rather than object instances) for dependencies.

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

def simple_model(config):
    model = Sequential([Dense(10, input_shape=(1, )), Dense(1)])

    model.compile(
        optimizer="sgd",
        loss="mean_squared_error",
        metrics=["mean_squared_error"])

    return model
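
To make the factory pattern concrete, here is a minimal sketch in plain Python. `ToyTrainer`, `ToyWorker`, `toy_model`, and `toy_data` are hypothetical names invented for illustration; they are not RaySGD classes and say nothing about how RaySGD is implemented internally. The point is only that each replica calls the creator functions itself, so no model instance has to be shared or shipped between processes.

```python
# Hypothetical illustration of the factory pattern; not RaySGD code.
class ToyWorker:
    """One replica: builds its own model and data from creator functions."""

    def __init__(self, model_creator, data_creator, config):
        # The worker calls the factories itself, so every replica gets
        # a fresh, independent object rather than a shared instance.
        self.model = model_creator(config)
        self.data = data_creator(config)


class ToyTrainer:
    """Holds the creator functions and builds one worker per replica."""

    def __init__(self, model_creator, data_creator, num_replicas, config):
        self.workers = [
            ToyWorker(model_creator, data_creator, config)
            for _ in range(num_replicas)
        ]


def toy_model(config):
    # Stand-in for a function such as simple_model(config).
    return {"weights": [0.0] * config["num_features"]}


def toy_data(config):
    # Stand-in for a function such as simple_dataset(config).
    return list(range(config["batch_size"]))


toy_trainer = ToyTrainer(toy_model, toy_data, num_replicas=3,
                         config={"num_features": 2, "batch_size": 4})
models = [worker.model for worker in toy_trainer.workers]
print(len(models), models[0] is models[1])
```

Each worker ends up with an equal but distinct model object, which is the property a distributed trainer needs.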

Now we get to the RaySGD code. After defining a helper that builds a minimal "config" dictionary, we define a training function that performs these steps:
* instantiate a TFTrainer instance to wrap the actual distributed training
* explicitly calculate starting model performance
* train multiple epochs
    * here we're explicitly calling `train` twice ... in a real example we would run more epochs with a control loop
* calculate final stats, change in the loss, and a "sanity check" that the loss actually went down

In [None]:
import ray
from ray.util.sgd.tf.tf_trainer import TFTrainer, TFTrainable

NUM_TRAIN_SAMPLES = 1000
NUM_TEST_SAMPLES = 400

def create_config(batch_size):

    return {
        "batch_size": batch_size,
        "fit_config": {
            "steps_per_epoch": NUM_TRAIN_SAMPLES // batch_size
        },
        "evaluate_config": {
            "steps": NUM_TEST_SAMPLES // batch_size,
        }
    }

def train_example(num_replicas=1, batch_size=128, use_gpu=False):
    trainer = TFTrainer(
        model_creator=simple_model,
        data_creator=simple_dataset,
        num_replicas=num_replicas,
        use_gpu=use_gpu,
        verbose=True,
        config=create_config(batch_size))

    # model baseline performance
    start_stats = trainer.validate()
    print(start_stats)

    # train for 2 epochs
    trainer.train()
    trainer.train()

    # model performance after training (should improve)
    end_stats = trainer.validate()
    print(end_stats)

    # sanity check that training worked
    dloss = end_stats["validation_loss"] - start_stats["validation_loss"]
    dmse = (end_stats["validation_mean_squared_error"] -
            start_stats["validation_mean_squared_error"])
    print(f"dLoss: {dloss}, dMSE: {dmse}")

    if dloss > 0 or dmse > 0:
        print("training sanity check failed. loss increased!")
    else:
        print("success!")
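
The "control loop" mentioned in the steps above could look roughly like the following sketch, which stops once validation loss has not improved for a few epochs. It assumes only that the trainer exposes `train()` and `validate()` and that `validate()` returns a dictionary with a `"validation_loss"` key, as in the example above. `StubTrainer` and `fit_with_early_stopping` are hypothetical helpers, and the loss values are made up so the sketch runs without Ray or TensorFlow.

```python
class StubTrainer:
    """Hypothetical stand-in that replays a fixed sequence of losses."""

    def __init__(self, losses):
        self._losses = list(losses)
        self._epoch = 0

    def _current_loss(self):
        return self._losses[min(self._epoch, len(self._losses) - 1)]

    def train(self):
        self._epoch += 1
        return {"train_loss": self._current_loss()}

    def validate(self):
        return {"validation_loss": self._current_loss()}


def fit_with_early_stopping(trainer, max_epochs=50, patience=3, min_delta=1e-4):
    """Train until validation loss fails to improve for `patience` epochs."""
    best_loss = trainer.validate()["validation_loss"]
    epochs_without_improvement = 0
    epochs_run = 0
    for _ in range(max_epochs):
        trainer.train()
        epochs_run += 1
        loss = trainer.validate()["validation_loss"]
        if loss < best_loss - min_delta:
            best_loss = loss
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                break
    return best_loss, epochs_run


stub = StubTrainer([1.0, 0.5, 0.3, 0.25, 0.25, 0.26, 0.25, 0.24])
best, epochs = fit_with_early_stopping(stub, max_epochs=20, patience=3)
print(best, epochs)
```

The same function could be handed a real trainer object in place of `stub`; the `patience` and `min_delta` defaults are arbitrary and would need tuning.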

Ok, now that we're all set up, let's start Ray and run the training!

In [None]:
ray.init()

train_example()

Note that the Ray Dashboard, which is served at `localhost:8265` by default, is a key part of the Ray system, but it may not be reachable through the Binder container proxy.

__How does this work?__

In a nutshell,
* `TFTrainer` wraps TensorFlow's `MultiWorkerMirroredStrategy` as described here: https://docs.ray.io/en/master/raysgd/raysgd_tensorflow.html
* `MultiWorkerMirroredStrategy` is a synchronous data-parallel strategy: each worker holds a full copy of the model, and gradients are combined across workers with collective operations (e.g., AllReduce) at every step: https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/MultiWorkerMirroredStrategy
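
As a rough illustration of the synchronous all-reduce idea, the sketch below runs data-parallel gradient descent on the same `y = 2 * x` toy problem in plain Python. It is not how TensorFlow implements it: `all_reduce_mean` is a hypothetical helper that simply averages in one process, whereas real implementations exchange data between workers with collective communication. The learning rate and worker count are arbitrary assumptions.

```python
import random


def local_gradient(w, shard):
    """Gradient of mean squared error for y_hat = w * x on one worker's shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def all_reduce_mean(values):
    """Toy all-reduce: every worker receives the mean of all workers' values."""
    mean = sum(values) / len(values)
    return [mean] * len(values)


rng = random.Random(0)
samples = [(x, 2 * x) for x in (rng.random() for _ in range(1000))]

num_workers = 4
# Each worker sees a disjoint, equally sized shard of the data.
shards = [samples[i::num_workers] for i in range(num_workers)]
# Every replica starts from the same weight.
replica_weights = [0.0] * num_workers
learning_rate = 0.5

for step in range(200):
    # 1. Each worker computes a gradient on its own shard.
    grads = [local_gradient(w, shard)
             for w, shard in zip(replica_weights, shards)]
    # 2. Gradients are combined so all workers hold the same value.
    reduced = all_reduce_mean(grads)
    # 3. Every replica applies the identical update, staying in sync.
    replica_weights = [w - learning_rate * g
                       for w, g in zip(replica_weights, reduced)]

print(replica_weights)
```

Because every replica applies the same averaged gradient, the copies never diverge, which is what "mirrored" refers to.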

But the "Hello World" isn't very impressive. Let's at least try a slightly more realistic, if not industrial strength, dataset and model.

We'll train a shallow dense feed-forward network (one hidden layer with ReLU activation) on the R/ggplot2 diamonds data (https://ggplot2.tidyverse.org/reference/diamonds.html).

Start with a data loader, a model, and a config builder.

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

def diamonds_dataset(config):
    batch_size = config["batch_size"]
    df = pd.read_csv('data/diamonds.csv')
    df.drop(df.columns[0], axis=1, inplace=True)
    df = pd.get_dummies(df, prefix=['cut_', 'color_', 'clarity_'])
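    # Cast explicitly so mixed numeric/dummy columns become one float array
    y = df["price"].to_numpy(dtype="float32")
    X = df.drop(columns=['price']).to_numpy(dtype="float32")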
    train_size = 40_000
    X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=train_size)
    
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
    train_dataset = train_dataset.shuffle(len(X_train)).repeat().batch(
        batch_size)
    test_dataset = test_dataset.repeat().batch(batch_size)

    return train_dataset, test_dataset

In [None]:
def diamonds_simple_model(config):
    model = Sequential([Dense(30, input_shape=(26, ), activation='relu'), Dense(1)])

    model.compile(
        optimizer="adam",
        loss="mean_squared_error",
        metrics=["mean_squared_error"])

    return model
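
The `input_shape=(26, )` above follows from the encoding: after dropping `price`, six numeric columns remain, and one-hot encoding `cut`, `color`, and `clarity` adds one column per category level. The sketch below reproduces that count in plain Python, using the level names documented for the ggplot2 diamonds data. `encode_row` and the example row are illustrative only, and the column order here need not match what `pd.get_dummies` produces.

```python
NUMERIC_COLUMNS = ["carat", "depth", "table", "x", "y", "z"]
CATEGORY_LEVELS = {
    "cut": ["Fair", "Good", "Very Good", "Premium", "Ideal"],
    "color": ["D", "E", "F", "G", "H", "I", "J"],
    "clarity": ["I1", "SI2", "SI1", "VS2", "VS1", "VVS2", "VVS1", "IF"],
}


def one_hot(value, levels):
    """Return a list with 1.0 at the position of `value` and 0.0 elsewhere."""
    return [1.0 if value == level else 0.0 for level in levels]


def encode_row(row):
    """Numeric features first, then one one-hot block per categorical column."""
    features = [float(row[name]) for name in NUMERIC_COLUMNS]
    for name, levels in CATEGORY_LEVELS.items():
        features.extend(one_hot(row[name], levels))
    return features


# Illustrative row (price omitted because it is the prediction target).
example_row = {"carat": 0.23, "cut": "Ideal", "color": "E", "clarity": "SI2",
               "depth": 61.5, "table": 55.0, "x": 3.95, "y": 3.98, "z": 2.43}
features = encode_row(example_row)
print(len(features))
```

The total is 6 numeric features plus 5 + 7 + 8 one-hot columns, which gives the 26 inputs the first `Dense` layer expects.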

In [None]:
def create_diamonds_config(batch_size):
    return {
        "batch_size": batch_size,
        "fit_config": {
            "steps_per_epoch": 40000 // batch_size
        },
        "evaluate_config": {
            "steps": 13940 // batch_size,
        }
    }
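
The hard-coded sizes in this config match the split in `diamonds_dataset`: 40,000 training rows, and 13,940 test rows if the CSV holds the full 53,940-row diamonds data (an assumption about the file). A small sketch of the integer-division arithmetic, with `epoch_steps` as a hypothetical helper:

```python
def epoch_steps(num_samples, batch_size):
    """Whole batches per pass, plus the samples that do not fill a batch."""
    steps = num_samples // batch_size
    leftover = num_samples - steps * batch_size
    return steps, leftover


train_steps, train_leftover = epoch_steps(40_000, 128)
test_steps, test_leftover = epoch_steps(13_940, 128)
print(train_steps, train_leftover)
print(test_steps, test_leftover)
```

With the default batch size of 128 this gives 312 training steps and 108 evaluation steps per pass, leaving 64 and 116 samples respectively that do not fit into a whole batch in a single pass.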

For clarity, we'll define a bare-bones training function.

In [None]:
def train_diamonds(num_replicas=1, batch_size=128, use_gpu=False):
    trainer = TFTrainer(
        model_creator=diamonds_simple_model,
        data_creator=diamonds_dataset,
        num_replicas=num_replicas,
        use_gpu=use_gpu,
        verbose=False,
        config=create_diamonds_config(batch_size))

    start_stats = trainer.validate()
    print(start_stats)

    for i in range(32):
        trainer.train()

    end_stats = trainer.validate()
    print(end_stats)
        
train_diamonds()

We won't get famous for these results, but we definitely made progress.

## MLflow

One of the top open source frameworks for managing machine learning, from experiment to deployment, is MLflow (https://mlflow.org/).

Created by Databricks and now hosted as an open source project under the Linux Foundation, MLflow has rapidly evolved to support a variety of key ML engineering tasks, including:
* Experiment tracking
    * Parameters, results, data, code, and model artefacts/assets
* Tracking project environments for reproducibility
* Deployment from a variety of model formats to various target (prediction/scoring) environments
* Model registry
    * Versioning, lineage/provenance

More features are planned for the future; today, we'll look at just the original experiment tracking features.

MLflow supports auto-instrumentation for a number of popular platforms (https://mlflow.org/docs/latest/tracking.html#automatic-logging), but these don't include RaySGD/Distributed TensorFlow yet. Besides, we want to see concretely how the pieces fit together.

Before proceeding, start the MLflow UI server. On Binder/JupyterLab, open a new Terminal and run

`mlflow ui`

The UI is served on port 5000 by default, which should be accessible via the JupyterLab proxy (open a new tab and replace `/lab...` in your URL with `/proxy/5000/`).

We'll start with the minimal code to save params and metrics to MLflow.

In [None]:
from mlflow import log_metric, log_param, end_run

log_param("foo_count", 42)
log_param("bar_count", 43)

for i in range(10):
    log_metric("score", i)
    
end_run()

This info should appear in the UI (though it may require a refresh).

We can also use a context manager for our runs and provide step indices, which enables additional views in the UI, such as plotting a metric against its step.

In [None]:
import mlflow

with mlflow.start_run():
    for i in range(10):
        log_metric("score", i*i, step=i)

Now let's look at a more realistic use of MLflow, including:
* Creating a named Experiment
* Recording data for runs to this Experiment
* Integrating with our RaySGD/TF model

In [None]:
experiment_id = mlflow.create_experiment("Diamonds RaySGD")
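
One practical wrinkle: `mlflow.create_experiment` raises an error if an experiment with that name already exists, so re-running the cell above fails the second time. A minimal sketch of a get-or-create helper follows. `get_or_create_experiment` is a hypothetical name; it relies only on `get_experiment_by_name` and `create_experiment`, which both the `mlflow` module and `MlflowClient` provide. `InMemoryTracking` is a made-up stand-in so the sketch runs without an MLflow installation.

```python
from types import SimpleNamespace


def get_or_create_experiment(tracking, name):
    """Return the ID of experiment `name`, creating it only if it is missing.

    `tracking` is anything exposing get_experiment_by_name() and
    create_experiment(), e.g. the `mlflow` module or an `MlflowClient`.
    """
    existing = tracking.get_experiment_by_name(name)
    if existing is not None:
        return existing.experiment_id
    return tracking.create_experiment(name)


class InMemoryTracking:
    """Hypothetical stand-in so the sketch runs without MLflow."""

    def __init__(self):
        self._experiments = {}

    def get_experiment_by_name(self, name):
        # Like MLflow, return None when no experiment has this name.
        return self._experiments.get(name)

    def create_experiment(self, name):
        if name in self._experiments:
            raise ValueError(f"Experiment {name!r} already exists")
        experiment_id = str(len(self._experiments) + 1)
        self._experiments[name] = SimpleNamespace(
            name=name, experiment_id=experiment_id)
        return experiment_id


demo_tracking = InMemoryTracking()
first_id = get_or_create_experiment(demo_tracking, "Diamonds RaySGD")
second_id = get_or_create_experiment(demo_tracking, "Diamonds RaySGD")
print(first_id == second_id)
```

In the notebook, the equivalent call would be `get_or_create_experiment(mlflow, "Diamonds RaySGD")`.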

In [None]:
from mlflow.tracking import MlflowClient
client = MlflowClient()

In [None]:
run = client.create_run(experiment_id) # returns mlflow.entities.Run
client.log_param(run.info.run_id, "hello", "world")
client.set_terminated(run.info.run_id)

In [None]:
def train_diamonds_mlflow(num_replicas=1, batch_size=128, use_gpu=False):
    trainer = TFTrainer(
        model_creator=diamonds_simple_model,
        data_creator=diamonds_dataset,
        num_replicas=num_replicas,
        use_gpu=use_gpu,
        verbose=False,
        config=create_diamonds_config(batch_size))

    start_stats = trainer.validate()
    print(start_stats)

    ml_run = client.create_run(experiment_id)

    for i in range(32):
        train_stats = trainer.train()
        if i % 2 == 0:
            val_stats = trainer.validate()            
            client.log_metric(ml_run.info.run_id, "validation_loss", val_stats["validation_loss"], step=i)
            client.log_metric(ml_run.info.run_id, "training_loss", train_stats["train_loss"], step=i)
        
    client.set_terminated(ml_run.info.run_id)

    end_stats = trainer.validate()
    print(end_stats)
        
train_diamonds_mlflow()
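
In the function above, the run is only marked as terminated if training completes; if `trainer.train()` raises, the run is left open. One possible refinement, sketched below under stated assumptions, wraps the run in a context manager and logs every entry of a stats dictionary. `tracked_run` and `log_stats` are hypothetical helpers that use only the client calls already shown (`create_run`, `log_metric`, `set_terminated`) plus the `status` argument of `set_terminated`. `RecordingClient` is a made-up stand-in for `MlflowClient` so the sketch runs on its own.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def tracked_run(client, experiment_id):
    """Create a run and always terminate it, recording success or failure."""
    run_id = client.create_run(experiment_id).info.run_id
    status = "FAILED"
    try:
        yield run_id
        status = "FINISHED"
    finally:
        client.set_terminated(run_id, status=status)


def log_stats(client, run_id, stats, step):
    """Log every numeric entry of a stats dictionary as a metric."""
    for key, value in stats.items():
        client.log_metric(run_id, key, float(value), step=step)


class RecordingClient:
    """Hypothetical stand-in for MlflowClient that records calls in memory."""

    def __init__(self):
        self.metrics = []
        self.statuses = {}

    def create_run(self, experiment_id):
        run_id = f"run-{len(self.statuses) + 1}"
        self.statuses[run_id] = "RUNNING"
        return SimpleNamespace(info=SimpleNamespace(run_id=run_id))

    def log_metric(self, run_id, key, value, step=None):
        self.metrics.append((run_id, key, value, step))

    def set_terminated(self, run_id, status=None):
        self.statuses[run_id] = status


demo_client = RecordingClient()
with tracked_run(demo_client, "0") as demo_run_id:
    log_stats(demo_client, demo_run_id,
              {"validation_loss": 1.5, "train_loss": 1.2}, step=0)

try:
    with tracked_run(demo_client, "0"):
        raise RuntimeError("simulated training failure")
except RuntimeError:
    pass

print(demo_client.statuses)
```

With a real `MlflowClient`, the training loop would sit inside the `with tracked_run(client, experiment_id) as run_id:` block.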

In [None]:
ray.shutdown()