# Ray with TensorFlow Test Notebook

The purpose of this notebook is to confirm that Ray Train with TensorFlow works in ODH.

This notebook primarily consists of an implementation of the TensorFlow example from the Ray docs on [Ray Train](https://docs.ray.io/en/latest/train/train.html).

However, it has been modified to test that the Ray Train features for TensorFlow work in an Open Data Hub environment. We have also increased the number of samples and epochs so that the speedup from Ray's distributed training can be seen clearly.

In [1]:
import numpy as np
import tensorflow as tf 
import json
import os

import ray
from ray.train import Trainer

2022-07-26 18:02:17.433073: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-07-26 18:02:17.433109: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


## Setup
We're going to connect to the Ray cluster that was spun up for us as part of the [Ray notebook image](https://github.com/thoth-station/ray-ml-notebook) we selected through the ODH spawner page.

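This cell reads the address of the Ray head node from the `RAY_CLUSTER` environment variable. As written, it raises a `KeyError` if that variable is not set, so running locally without a Ray cluster requires a fallback to a plain `ray.init()`.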

In [2]:
ray.init('ray://{ray_head}:10001'.format(ray_head=os.environ['RAY_CLUSTER']))

ClientContext(dashboard_url='10.128.3.96:8265', python_version='3.8.12', ray_version='1.12.1', ray_commit='4863e33856b54ccf8add5cbe75e41558850a1b75', protocol_version='2022-03-16', _num_clients=1, _context_to_restore=<ray.util.client._ClientContext object at 0x7f01f1a36370>)
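
A minimal sketch of an environment check that would let the setup cell fall back to a local Ray instance when `RAY_CLUSTER` is not set. The helper name `ray_address` and the host name `ray-head.example` are hypothetical and used only for illustration; the port matches the one used in the cell above.

```python
import os


def ray_address(env=os.environ, port=10001):
    """Return a Ray Client address, or None when RAY_CLUSTER is unset.

    `ray_address` is a hypothetical helper, not part of Ray.
    """
    ray_head = env.get('RAY_CLUSTER')
    if not ray_head:
        return None
    return 'ray://{ray_head}:{port}'.format(ray_head=ray_head, port=port)


# Usage in the notebook (requires `import ray`):
# ray.init(address=ray_address())

print(ray_address({'RAY_CLUSTER': 'ray-head.example'}))
print(ray_address({}))
```

Passing `address=None` leaves `ray.init()` to start a local instance, so the same cell could serve both the cluster and the local case.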

## Define our data and model architecture

For this example we will use the well-known MNIST handwritten digit recognition dataset to train a classification model with a small convolutional neural network (2 hidden layers). However, for this particular notebook, we don't really care about the particular dataset or machine learning task. The goal here is to prove that our Open Data Hub deployment can run Ray Train jobs with TensorFlow.

The first thing we will do is create a function that returns a TensorFlow `Dataset` object built from the MNIST dataset provided by TensorFlow. We can use this `Dataset` object to iterate over batches of data during training.

Next we'll define our TensorFlow model. This will be a convolutional neural network with 2 hidden layers: a 2D convolutional layer and a Dense layer. The model takes in a 28 x 28 array and outputs a 1 x 10 array representing the 10 possible categories of digits our model will choose between.

In [3]:
def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(6000).repeat().batch(batch_size)
    return train_dataset


def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model
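
The layer shapes in the model above can be checked by hand. The sketch below assumes the Keras defaults for `Conv2D` (stride 1, no padding); the helper names `conv2d_valid` and `dense` are hypothetical and exist only for this arithmetic.

```python
def conv2d_valid(height, width, in_channels, filters, kernel):
    """Output shape and parameter count of a stride-1, unpadded Conv2D."""
    out_h = height - kernel + 1
    out_w = width - kernel + 1
    # One kernel x kernel x in_channels weight block plus one bias per filter.
    params = kernel * kernel * in_channels * filters + filters
    return (out_h, out_w, filters), params


def dense(in_units, out_units):
    """Output width and parameter count (weights plus biases) of a Dense layer."""
    return out_units, in_units * out_units + out_units


conv_shape, conv_params = conv2d_valid(28, 28, 1, filters=32, kernel=3)
flat_units = conv_shape[0] * conv_shape[1] * conv_shape[2]
hidden_units, hidden_params = dense(flat_units, 128)
output_units, output_params = dense(hidden_units, 10)
total_params = conv_params + hidden_params + output_params

print(conv_shape)    # (26, 26, 32)
print(flat_units)    # 21632
print(total_params)  # 2770634
```

Almost all of the parameters sit in the first Dense layer, because the flattened convolution output is large.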

## Non-Distributed Training

Before we look at Ray and its distributed training, let's start by training a single-worker model to give us a baseline to compare our distributed model against in a minute.

In [4]:
def train_func():
    batch_size = 64
    single_worker_dataset = mnist_dataset(batch_size)
    single_worker_model = build_and_compile_cnn_model()
    single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

In [5]:
%%time
train_func()

2022-07-26 18:02:22.511309: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-07-26 18:02:22.513926: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-07-26 18:02:22.513990: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcublas.so.11'; dlerror: libcublas.so.11: cannot open shared object file: No such file or directory
2022-07-26 18:02:22.514039: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcublasLt.so.11'; dlerror: libcublasLt.so.11: cannot open shared object file: No such file or directory
2022-07-26 18:02:22.514088: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Co

Epoch 1/3
 6/70 [=>............................] - ETA: 0s - loss: 2.3094 - accuracy: 0.0807 

2022-07-26 18:02:22.972882: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


Epoch 2/3
Epoch 3/3
CPU times: user 12.8 s, sys: 1.4 s, total: 14.2 s
Wall time: 3.52 s


## Distributed Training

Great, now let's rewrite our training function a bit so that it's compatible with Ray Train. To update this function we need to:
* Read the `TF_CONFIG` environment variable, which Ray Train sets for us
* Add `num_workers`, defined as the number of workers listed in `tf_config`
* Set the distributed learning strategy to `MultiWorkerMirroredStrategy`
* Add a `global_batch_size`
* Add a `with strategy.scope()` block around our model-building step
* Finally, add a `ray.train.save_checkpoint` call so that we can use our trained model for inference later on.
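
To make the first two steps concrete, here is a small sketch of what a `TF_CONFIG` value looks like and how the worker count and global batch size follow from it. The worker addresses are made up for illustration; in the notebook the real value is set by Ray Train and read from `os.environ`.

```python
import json

# Hypothetical TF_CONFIG for worker 0 of a two-worker job; the addresses are made up.
example_tf_config = json.dumps({
    'cluster': {'worker': ['10.0.0.1:12345', '10.0.0.2:12345']},
    'task': {'type': 'worker', 'index': 0},
})

tf_config = json.loads(example_tf_config)
num_workers = len(tf_config['cluster']['worker'])

per_worker_batch_size = 64
global_batch_size = per_worker_batch_size * num_workers

print(num_workers)        # 2
print(global_batch_size)  # 128
```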

In [6]:
def train_func_distributed():
    per_worker_batch_size = 64
    # This environment variable will be set by Ray Train.
    tf_config = json.loads(os.environ['TF_CONFIG'])
    num_workers = len(tf_config['cluster']['worker'])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model()

    multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70, verbose=0)
    ray.train.save_checkpoint(epoch=2, model_weights=multi_worker_model.get_weights()) 

Here we instantiate the Ray `Trainer`, which we use to select the backend (PyTorch, TensorFlow, or Horovod) and the number of workers. Below we use 2 workers. We can also specify whether to use GPUs for training.

In [7]:
trainer = Trainer(backend='tensorflow', num_workers=2, use_gpu=True, max_retries=10)

2022-07-26 18:02:25,451	INFO trainer.py:223 -- Trainer logs will be logged in: /opt/app-root/src/ray_results/train_2022-07-26_18-02-25


Alright! Let's run our training function and see how long it takes with Ray Train's distribution functionality. Please note, there is an overhead cost associated with starting the Trainer. So let's time that separately from our actual training function. 

In [8]:
%%time
trainer.start()

(BackendExecutor pid=1668) 2022-07-26 18:02:28.723785: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
(BackendExecutor pid=1668) 2022-07-26 18:02:28.723820: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
(BaseWorkerMixin pid=390, ip=10.131.0.36) 2022-07-26 18:02:31.896774: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
(BaseWorkerMixin pid=390, ip=10.131.0.36) 2022-07-26 18:02:31.896818: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
(BaseWorkerMixin pid=1706

CPU times: user 16.9 ms, sys: 3.17 ms, total: 20.1 ms
Wall time: 6.89 s


In [None]:
%%time
results = trainer.run(train_func_distributed)

Times will vary depending on where you are running this notebook, the sample size you selected above, the number of epochs, and the number of workers. If everything worked correctly and you are using a distributed Ray cluster, the wall time for the `trainer.run()` call above should be significantly less than that of the non-distributed training run.
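
When comparing the two runs, it may help to look at throughput as well as wall time. With 2 workers the distributed function uses a global batch of 64 × 2 for the same number of steps, so it should process twice as many samples as the baseline. The sketch below is a rough calculation under that assumption; the helper names are hypothetical, and the baseline wall time of 3.52 s is the one reported for the single-worker run, which also includes data loading and model construction.

```python
def samples_processed(batch_size, steps_per_epoch, epochs):
    """Total number of training examples consumed by a `fit` call."""
    return batch_size * steps_per_epoch * epochs


def throughput(samples, wall_seconds):
    """Samples processed per second of wall time."""
    return samples / wall_seconds


single = samples_processed(batch_size=64, steps_per_epoch=70, epochs=3)
distributed = samples_processed(batch_size=64 * 2, steps_per_epoch=70, epochs=3)

print(single)       # 13440
print(distributed)  # 26880

# Baseline throughput from the 3.52 s wall time of the single-worker run.
print(round(throughput(single, 3.52)))  # 3818
```

To compare, call `throughput(distributed, wall_seconds)` with the wall time you observe for `trainer.run()`.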

In [10]:
%%time
trainer.shutdown()

CPU times: user 2.67 ms, sys: 129 µs, total: 2.8 ms
Wall time: 35.9 ms


## Evaluate

We've trained a model! Now we need to make sure we can use it for inference. Below we'll run a quick example of using the trained model. We'll build a new dataset object the same way we did above (so the batch comes from the MNIST training data), and use accuracy and sparse categorical cross-entropy loss (the same metrics used in training) to evaluate the model's performance on a batch of 64 inputs.

We are not particularly concerned about the values here, but are simply illustrating that we can perform inference with our newly trained model. 

In [11]:
results = trainer.latest_checkpoint

In [12]:
inference = build_and_compile_cnn_model()

In [13]:
inference.set_weights(results["model_weights"])

In [14]:
# Draw a single batch of 64 examples to evaluate on.
X, y = next(iter(mnist_dataset(64)))

In [15]:
inference.evaluate(X,y)



[2.0986456871032715, 0.546875]
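
The two values returned by `evaluate` are the loss followed by the accuracy, in the order given to `model.compile`. As a rough illustration of what they mean, the pure-Python sketch below computes sparse categorical cross-entropy from raw logits and accuracy from the highest-scoring class. The function names and toy logits are hypothetical; this is not how Keras implements these metrics internally.

```python
import math


def sparse_categorical_crossentropy(logits, label):
    """Cross-entropy for one example, computed directly from raw logits."""
    peak = max(logits)  # subtract the max for numerical stability
    log_sum_exp = peak + math.log(sum(math.exp(z - peak) for z in logits))
    return log_sum_exp - logits[label]


def accuracy(batch_logits, labels):
    """Fraction of examples whose highest-scoring class equals the label."""
    correct = 0
    for logits, label in zip(batch_logits, labels):
        predicted = max(range(len(logits)), key=lambda i: logits[i])
        correct += int(predicted == label)
    return correct / len(labels)


# A model with no preference among 10 classes has loss ln(10).
print(round(sparse_categorical_crossentropy([0.0] * 10, 3), 4))  # 2.3026

# Toy batch of two examples with three classes; only the first is classified correctly.
toy_logits = [[2.0, 0.5, 0.1], [0.2, 0.1, 3.0]]
print(accuracy(toy_logits, [0, 1]))  # 0.5

# An accuracy of 0.546875 on a batch of 64 corresponds to 35 correct predictions.
print(35 / 64)  # 0.546875
```

The ln(10) ≈ 2.30 figure is a useful reference point: the loss of about 2.31 at the start of the first epoch is what an untrained 10-class model would be expected to produce.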

If you are reading this cell and there are no errors above, then you have successfully run a Ray Train TensorFlow notebook in a distributed environment with Open Data Hub!  Yeahh!! :) 

In [17]:
ray.util.disconnect()