# Training on MNIST Dataset using Spark Operator and Horovod
    

To package the trainer in a container image, we need two files on our cluster: one that contains the training code and one with the resource definition of the job for the Kubernetes cluster:

In [1]:
TRAINER_FILE = "spark_mnist.py"
KUBERNETES_FILE = "sparkapp-mnist.yaml"

We also want to capture the output of a cell with the [`%%capture`](https://ipython.readthedocs.io/en/stable/interactive/magics.html#cellmagic-capture) magic; for `kubectl` commands that output usually looks like `some-resource created`.
To that end, let's define a helper function:

In [2]:
import re

from IPython.utils.capture import CapturedIO


def get_resource(captured_io: CapturedIO) -> str:
    """
    Gets a resource name from the output of `kubectl create -f <configuration.yaml>`.

    :param CapturedIO captured_io: Output captured by using `%%capture` cell magic
    :return: Name of the Kubernetes resource
    :rtype: str
    :raises Exception: if the resource could not be created (e.g. already exists)
    """
    out = captured_io.stdout
    matches = re.search(r"^(.+)\s+created", out)
    if matches is not None:
        return matches.group(1)
    else:
        raise Exception(f"Cannot get resource as its creation failed: {out}")
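
To illustrate what the helper extracts, here is a minimal, self-contained sketch that applies the same regular expression to a sample line. The function name `parse_created_resource` and the sample text are illustrative assumptions, not part of the notebook; in the notebook itself the text would come from the `stdout` attribute of the captured output.

```python
import re


def parse_created_resource(stdout: str) -> str:
    """Return the resource name from a line such as '<resource> created'."""
    match = re.search(r"^(.+)\s+created", stdout)
    if match is None:
        raise ValueError(f"No created resource found in: {stdout!r}")
    return match.group(1)


# Illustrative sample of what `kubectl create -f <file>` prints on success.
sample = "sparkapplication.sparkoperator.k8s.io/horovod-mnist-0 created\n"
resource = parse_created_resource(sample)
print(resource)
```

The returned string has the form `<kind>/<name>`, which is why it can be passed directly to commands such as `kubectl describe`.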

## How to Train the Model in the Notebook
Since we ultimately want to train the model in a distributed fashion (potentially on GPUs), we put all the code in a single cell.
That way we can save the file and include it in a container image:

In [3]:
%%writefile $TRAINER_FILE
import argparse
import os
import tempfile

import numpy as np
import horovod.spark  # https://github.com/horovod/horovod
import horovod.tensorflow.keras as hvd
import tensorflow as tf
from pyspark.sql import SparkSession


def get_dataset(rank=0, size=1):
    with np.load('datasets/mnist.npz', allow_pickle=True) as f:
        x_train = f['x_train'][rank::size]
        y_train = f['y_train'][rank::size]
        x_test = f['x_test'][rank::size]
        y_test = f['y_test'][rank::size]
        x_train, x_test = x_train / 255.0, x_test / 255.0  # Scale grayscale pixel values from [0, 255] to [0, 1]
        return (x_train, y_train), (x_test, y_test)


def get_model():
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    return model


def deserialize(model_bytes):
    import horovod.tensorflow.keras as hvd
    import h5py
    import io
    bio = io.BytesIO(model_bytes)
    with h5py.File(bio, 'a') as f:
        return hvd.load_model(f)


def predict_number(model, x_test, image_index):
    pred = model.predict(x_test[image_index:image_index + 1])
    print(f"Model prediction for index {image_index}: {pred.argmax()}")


def train_hvd(learning_rate, batch_size, epochs):
    # 1 - Initialize Horovod
    hvd.init()

    # 2 - Pin GPUs
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
    
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())
    model = get_model()

    # 3 - Wrap optimizer
    optimizer = hvd.DistributedOptimizer(
        # 4- Scale learning rate
        tf.optimizers.Adam(learning_rate=learning_rate * hvd.size())
    )

    model.compile(optimizer=optimizer,
                  loss='sparse_categorical_crossentropy',
                  experimental_run_tf_function=False,
                  metrics=['accuracy'])

    callbacks = [
        # 5 - Broadcast initial variables
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
        hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=3, verbose=1),
    ]

    # 6 - Save checkpoints
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    if hvd.rank() == 0:
        callbacks.append(
            tf.keras.callbacks.ModelCheckpoint(ckpt_file, monitor='accuracy', mode='max',
                                               save_best_only=True))

    history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        callbacks=callbacks,
                        epochs=epochs,
                        verbose=2,
                        validation_data=(x_test, y_test))

    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return history.history, f.read()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Horovod-on-Spark MNIST Training Job")

    parser.add_argument(
        "--learning_rate",
        type=float,  # a learning rate such as 0.001 cannot be parsed as an int
        default=0.001,
        metavar="LR",
        help="Learning rate (default: 0.001)",
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        default=64,
        metavar="N",
        help="Batch size for training (default: 64)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=5,
        metavar="N",
        help="Number of epochs to train (default: 5)",
    )

    args, _ = parser.parse_known_args()
    spark = SparkSession.builder.appName("HorovodOnSpark").getOrCreate()

    image_index = 100
    (x_train, y_train), (x_test, y_test) = get_dataset()
    
    print(f"Expected prediction for index {image_index}: {y_test[image_index]}")
    
    # Train model with Horovod on Spark
    model_bytes = horovod.spark.run(train_hvd, args=(args.learning_rate,
                                                     args.batch_size,
                                                     args.epochs))[0][1]

    model = deserialize(model_bytes)
    model.evaluate(x_test, y_test, verbose=2)

    predict_number(model, x_test, image_index)
    spark.stop()

Overwriting spark_mnist.py
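
The `[rank::size]` slicing in `get_dataset` and the `learning_rate * hvd.size()` scaling in `train_hvd` can be tried out without Horovod. The sketch below uses a plain Python list in place of the MNIST arrays; `shard` and `scaled_learning_rate` are hypothetical helper names, and the worker count of 3 is an arbitrary example value.

```python
def shard(items, rank, size):
    """Return every `size`-th item starting at `rank` (strided slicing)."""
    return items[rank::size]


def scaled_learning_rate(base_lr, size):
    """Scale the base learning rate linearly with the number of workers."""
    return base_lr * size


samples = list(range(10))  # stand-in for the training examples
num_workers = 3            # arbitrary example value

shards = [shard(samples, rank, num_workers) for rank in range(num_workers)]
for rank, part in enumerate(shards):
    print(f"rank {rank}: {part}")

lr = scaled_learning_rate(0.001, num_workers)
print(lr)
```

Each example lands in exactly one shard, so the workers together cover the data without overlap.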


In [4]:
%env HOROVOD_JOB=$TRAINER_FILE

env: HOROVOD_JOB=spark_mnist.py


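To verify the training job, let's first run it on Spark in local mode (this requires `SPARK_HOME` to point to a Spark installation; if it is unset, `spark-submit` is not found):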

In [5]:
%env PYSPARK_DRIVER_PYTHON=/opt/conda/bin/python

env: PYSPARK_DRIVER_PYTHON=/opt/conda/bin/python


In [6]:
! ${SPARK_HOME}/bin/spark-submit --master local[1] $HOROVOD_JOB --epochs=1

/bin/sh: 1: /bin/spark-submit: not found
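
The `--epochs=1` flag in the command above is handled by the `argparse` setup in the trainer. As a minimal sketch of that behavior (the `build_parser` helper and the `--unknown-flag` argument are hypothetical, and the learning rate is parsed as a float here), `parse_known_args` returns the recognized options and passes over anything else:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with the three options the trainer accepts."""
    parser = argparse.ArgumentParser(description="Horovod-on-Spark MNIST Training Job")
    parser.add_argument("--learning_rate", type=float, default=0.001)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--epochs", type=int, default=5)
    return parser


# `--unknown-flag` is a made-up argument to show that unrecognized options are collected, not rejected.
args, unknown = build_parser().parse_known_args(["--epochs=1", "--unknown-flag"])
print(args.epochs, args.batch_size, args.learning_rate)
print(unknown)
```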




For those interested, the Docker image for this training job can be built locally from the following Dockerfile:

```
FROM mavencodev/sparkjob:1.0
ADD mnist.py /
ADD datasets /datasets

WORKDIR /
```

If the base image you use carries a `-gpu` suffix and GPU support is not needed, you can leave off the suffix.
`mnist.py` is the trainer code, which you have to download to your local machine.

Then build the image and push it to your container registry:

```bash
docker build -t <docker_image_name_with_tag> .
docker push <docker_image_name_with_tag>
```

The image is available as `mavencodev/sparkjob:1.0` in case you want to skip the build for now.


In [7]:
%%writefile $KUBERNETES_FILE
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: horovod-mnist-0
spec:
  type: Python
  mode: cluster
  pythonVersion: "3"
  image: mavencodev/sparkjob:1.0
  imagePullPolicy: Always  
  mainApplicationFile: "local:///mnist.py"
  sparkVersion: "3.0.0"
  restartPolicy:
    type: Never
  arguments:
    - --epochs
    - "10"
  driver:
    env:
    - name: PYTHONUNBUFFERED
      value: "1"
    cores: 1
    memory: "1G"
    labels:
      version: 3.0.0
      metrics-exposed: "true"  
    annotations:
      sidecar.istio.io/inject: "false"
    serviceAccount: default-editor
  executor:
    cores: 1
    instances: 5
    memory: "512m"
    labels:
      version: 3.0.0
      metrics-exposed: "true"  
    annotations:
      sidecar.istio.io/inject: "false"
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      jmxExporterJar: "/prometheus/jmx_prometheus_javaagent-0.11.0.jar"
      port: 8090

Overwriting sparkapp-mnist.yaml
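
Because the name of a `SparkApplication` must be unique within a namespace, it can be convenient to generate the manifest for each run. The following is a sketch only: `build_spark_app` is a hypothetical helper, the name `horovod-mnist-1` is an example value, the labels, annotations, and monitoring settings from the YAML above are omitted, and the result is emitted as JSON, which `kubectl` accepts in place of YAML.

```python
import json


def build_spark_app(name: str, epochs: int, executors: int = 5) -> dict:
    """Build an abbreviated SparkApplication manifest as a plain dict."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name},
        "spec": {
            "type": "Python",
            "mode": "cluster",
            "pythonVersion": "3",
            "image": "mavencodev/sparkjob:1.0",
            "imagePullPolicy": "Always",
            "mainApplicationFile": "local:///mnist.py",
            "sparkVersion": "3.0.0",
            "restartPolicy": {"type": "Never"},
            # Arguments are passed to the trainer as strings.
            "arguments": ["--epochs", str(epochs)],
            "driver": {"cores": 1, "memory": "1G", "serviceAccount": "default-editor"},
            "executor": {"cores": 1, "instances": executors, "memory": "512m"},
        },
    }


# Example values only; pick a name that is not already in use in your namespace.
manifest = build_spark_app("horovod-mnist-1", epochs=10)
print(json.dumps(manifest, indent=2))
```

The printed JSON could be saved to a file and passed to `kubectl create -f` in the same way as the YAML file.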




Let's deploy the distributed training job:

In [8]:
%%capture hvd_output --no-stderr
! kubectl create -f $KUBERNETES_FILE

In [9]:
HVD_JOB = get_resource(hvd_output)

Let's verify the pods are being created according to our specification (the label value has to match the `metadata.name` of the `SparkApplication`):

In [10]:
! kubectl get pods -l sparkoperator.k8s.io/app-name=horovod-mnist

NAME                   READY   STATUS      RESTARTS   AGE
horovod-mnist-driver   0/1     Completed   0          15h
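
If you prefer to inspect the pod status from Python instead of reading the table, the output can be split into columns. This is a sketch that assumes the simple whitespace-separated layout shown above; `parse_pod_table` is a hypothetical helper, and for anything beyond a quick check `kubectl get pods -o json` provides structured output.

```python
def parse_pod_table(text: str) -> list:
    """Parse `kubectl get pods` tabular output into a list of dicts."""
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return []
    header = lines[0].split()
    # Assumes that no column value contains whitespace.
    return [dict(zip(header, line.split())) for line in lines[1:]]


output = """\
NAME                   READY   STATUS      RESTARTS   AGE
horovod-mnist-driver   0/1     Completed   0          15h
"""
pods = parse_pod_table(output)
print(pods[0]["NAME"], pods[0]["STATUS"])
```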


We can check the model prediction (as before) by looking at the logs of the driver:

In [11]:
! kubectl logs horovod-mnist-driver | grep 'Model prediction'

Model prediction for index 100: 6
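
The driver also prints the expected label before training, so the two lines can be compared automatically. A minimal sketch, assuming the log text has already been fetched into a string; the `extract` helper is hypothetical, and the `Expected prediction` value in the sample is illustrative rather than taken from an actual run:

```python
import re


def extract(prefix: str, log_text: str):
    """Return (index, value) from a line like '<prefix> for index 100: 6'."""
    match = re.search(rf"{re.escape(prefix)} for index (\d+): (\d+)", log_text)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


# Sample log text: the first line is illustrative, the second matches the output above.
logs = (
    "Expected prediction for index 100: 6\n"
    "Model prediction for index 100: 6\n"
)
expected = extract("Expected prediction", logs)
predicted = extract("Model prediction", logs)
print(expected == predicted)
```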


Likewise, we can see the status of the `SparkApplication`:

In [12]:
! kubectl describe $HVD_JOB

Name:         horovod-mnist-0
Namespace:    demo01
Labels:       <none>
Annotations:  <none>
API Version:  sparkoperator.k8s.io/v1beta2
Kind:         SparkApplication
Metadata:
  Creation Timestamp:  2021-03-17T12:32:24Z
  Generation:          1
  Managed Fields:
    API Version:  sparkoperator.k8s.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        .:
        f:arguments:
        f:driver:
          .:
          f:annotations:
            .:
            f:sidecar.istio.io/inject:
          f:cores:
          f:env:
          f:labels:
            .:
            f:metrics-exposed:
            f:version:
          f:memory:
          f:serviceAccount:
        f:executor:
          .:
          f:annotations:
            .:
            f:sidecar.istio.io/inject:
          f:cores:
          f:instances:
          f:labels:
            .:
            f:metrics-exposed:
            f:version:
          f:memory:
        f:image:
        f:imagePullPolicy:
        f:ma

In [13]:
! kubectl delete $HVD_JOB

sparkapplication.sparkoperator.k8s.io "horovod-mnist-0" deleted


Check whether the pod is still up and running:

In [14]:
! kubectl -n demo01 logs -f horovod-mnist

Error from server (NotFound): pods "horovod-mnist" not found
