# Train a Neural Network loading data from and to SmartSim

In this tutorial, we will see how to set up a workflow in which one or multiple processes produce data (e.g. in a simulation) and one or multiple processes consume that data to train a neural network. The key to achieving this behavior is to use SmartSim to upload data to, and download data from, the database.

This tutorial works on Slurm, but can easily be adapted to any other supported Workload Manager (PBS, Cobalt, or LSF), with one difference: in Slurm, we allocate the resources *from* this Notebook, whereas with other WLMs, this Notebook has to be started *within* an existing allocation.

Note: this tutorial requires the Python packages `tensorflow`, `mpi4py` and `horovod`.

## 1. First scenario: an ensemble of parallel producers and a single trainer

The first use case is similar to a common workflow:
- several copies of a simulation (possibly with different initializations) are running, each one consisting of multiple MPI ranks. Each rank produces samples (e.g. data points computed by the simulation) at regular intervals (e.g. at each time step);
- a neural network has to be trained on the data produced by the simulation and, as new data is produced, it needs to add that data to its training data set.


### 1.1 Workflow components
We will use SmartSim to allow the exchange of data between the data-producing processes and the training service. Thus, the first component which we will need to launch is the `Orchestrator`. This is a fairly small example, thus we will run a single-node DB.

In [None]:
from smartsim import Experiment
from smartsim.database import SlurmOrchestrator
from smartsim.settings import SrunSettings
from smartsim import slurm

def launch_cluster_orc(experiment, port, alloc):
    """Start a single-node Orchestrator (DB) on the given allocation"""
    db = SlurmOrchestrator(port=port,
                            db_nodes=1,
                            batch=False,
                            alloc=alloc,
                            interface="ib0")
    

    # generate directories for output files
    # pass in objects to make dirs for
    experiment.generate(db, overwrite=True)

    # start the database on interactive allocation
    experiment.start(db, block=True)

    return db


Our data will be very simple: each rank will produce random samples, drawing values from a Gaussian distribution centered at the rank id, and each sample will be labeled with the rank id. The neural network will be trained to infer which rank produced a data sample. Thus, the number of labels in our data set will be equal to the number of ranks in each replica.
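To make the data layout concrete, here is a minimal sketch of what a single rank produces. It uses only the Python standard library and a much smaller sample shape than the real script; the helper name `make_batch` and its parameters are illustrative assumptions, not part of SmartSim.

```python
import random

def make_batch(rank, num_samples=4, sample_size=3, scale=5.0, seed=None):
    """Return (samples, labels) for one rank.

    Each sample is a list of values drawn from a Gaussian centered
    at the rank id; each label is the rank id itself.
    """
    rng = random.Random(seed)
    samples = [[rng.gauss(float(rank), scale) for _ in range(sample_size)]
               for _ in range(num_samples)]
    labels = [rank] * num_samples
    return samples, labels

# Rank 2 produces four samples, all labeled with 2
samples, labels = make_batch(rank=2, seed=0)
print(len(samples), len(samples[0]), labels)
```

Since the distributions of neighboring ranks overlap (the standard deviation is larger than the distance between their centers), the classification task is not trivial.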

The data-producing processes will be started as part of an ensemble of two replicas (this mimics the execution of two copies of the simulation). The actual script producing the data is called `data_uploader.py` and is contained in the `tf` directory. Its content is

```python

from smartsim.ml import TrainingDataUploader
from os import environ
from time import sleep
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
mpi_rank = comm.Get_rank()
mpi_size = comm.Get_size()

batches_per_loop = 10

data_uploader = TrainingDataUploader(num_classes=mpi_size,
                                     cluster=False, 
                                     producer_prefixes="uploader",
                                     num_ranks=mpi_size,
                                     rank=mpi_rank)
                                     
if environ["SSKEYOUT"] == "uploader_0":
    data_uploader.publish_info()

# Start "simulation", produce data every two minutes, for thirty minutes
for _ in range(15):
    new_batch = np.random.normal(loc=float(mpi_rank), scale=5.0, size=(32*batches_per_loop, 224, 224, 3)).astype(float)
    new_labels = np.ones(shape=(32*batches_per_loop,)).astype(int) * mpi_rank

    data_uploader.put_batch(new_batch, new_labels)
    print(f"{mpi_rank}: New data pushed to DB")
    sleep(120)


```

Since the script is written in Python, we can use the `TrainingDataUploader` class, which streamlines the upload of data samples from the simulation. Here is an explanation of the arguments:
- `num_classes` is the number of classes of our training data set
- `cluster=False` specifies that the DB is not a cluster (it is a single shard)
- `producer_prefixes` identifies the SmartSim entity names of the processes producing data: this is useful when the training service has several incoming entities (as defined in SmartSim through the environment variable `SSKEYIN`) but should expect data only from a subset of them
- `num_ranks` is the number of concurrent data loaders within this application; in this case it is simply the number of MPI ranks, as each process uploads its own data
- `rank` is the index of the calling process among those data loaders; in this case it is the MPI rank.
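The role of `producer_prefixes` can be illustrated with a small sketch. This is not SmartSim's implementation: the function `select_producers` and the entity name `postprocessor` are hypothetical, and we simply assume that the consumer keeps the incoming entities whose name starts with one of the given prefixes.

```python
def select_producers(incoming_entities, producer_prefixes):
    """Keep only the incoming entity names that start with one of the prefixes."""
    if isinstance(producer_prefixes, str):
        producer_prefixes = [producer_prefixes]
    return [name for name in incoming_entities
            if any(name.startswith(prefix) for prefix in producer_prefixes)]

# Hypothetical list of incoming entities of the training service
incoming = ["uploader_0", "uploader_1", "postprocessor"]
producers = select_producers(incoming, "uploader")
print(producers)
```

With the prefix `uploader`, only the two ensemble members are retained, and the unrelated entity is ignored.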

At each iteration, each rank of the "simulation" calls `put_batch` to upload a batch of samples and the corresponding labels. The batch is uploaded to the DB and stored under a key composed of a prefix (defaulting to `samples`), the sub-index, and the iteration number, which increases monotonically every time a batch is uploaded.
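The naming scheme just described can be sketched as follows. The exact key format used by SmartSim is not shown in this tutorial, so the separator, the ordering of the components, and the class name `KeyNamer` are assumptions made only for illustration; we also assume that the sub-index identifies the uploading rank.

```python
class KeyNamer:
    """Build batch keys from a prefix, a sub-index, and an iteration counter."""

    def __init__(self, sub_index, prefix="samples"):
        self.sub_index = sub_index
        self.prefix = prefix
        self.iteration = 0  # increases every time a key is generated

    def next_key(self):
        # Assumed format: <prefix>_<sub_index>_<iteration>
        key = f"{self.prefix}_{self.sub_index}_{self.iteration}"
        self.iteration += 1
        return key

namer = KeyNamer(sub_index=3)
keys = [namer.next_key() for _ in range(3)]
print(keys)
```

Because the iteration counter never decreases, a consumer that knows the prefix and the sub-index can poll for the next expected key without any further coordination with the producer.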

Now that we know the content of `data_uploader.py`, we can create an entity representing the uploader ensemble.

In [None]:
def create_uploader(experiment, alloc, nodes=1, tasks_per_node=1):
   """Start an ensemble of two processes producing sample batches at
      regular intervals.
   """
   srun = SrunSettings(exe="python",
                     exe_args="data_uploader.py",
                     env_vars={"PYTHONUNBUFFERED": "1"},
                     alloc=alloc)
   srun.set_nodes(nodes)
   srun.set_tasks_per_node(tasks_per_node)

   uploader = experiment.create_ensemble("uploader", replicas=2, run_settings=srun)

   # create directories for the output files and copy
   # scripts to execution location inside newly created dir
   # only necessary if it's not an executable (python is executable here)
   uploader.attach_generator_files(to_copy=["./tf/data_uploader.py"])
   experiment.generate(uploader, overwrite=True)
   return uploader

The last component of our workflow is the trainer. This process will keep downloading samples as they are produced, and use them to train a neural network.

Here is the content of the `training_service.py` script, stored in the `tf` directory

```python
import tensorflow.keras as keras

from smartsim.ml.tf import DynamicDataGenerator

training_generator = DynamicDataGenerator(cluster=False, verbose=True)
model = keras.applications.MobileNetV2(weights=None, classes=training_generator.num_classes)
model.compile(optimizer="Adam", loss="mse", metrics=["mae"])

print("Starting training")

for epoch in range(100):
    print(f"Epoch {epoch}")
    model.fit(training_generator, steps_per_epoch=None, 
              epochs=epoch+1, initial_epoch=epoch, batch_size=training_generator.batch_size,
              verbose=2)
```
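The loop above calls `fit` once per epoch instead of passing `epochs=100` to a single call. In Keras, `fit` trains from `initial_epoch` until the epoch of index `epochs` is reached, so each call runs exactly one epoch. A plausible reason for this pattern (an assumption, as the script does not state it) is that each new call lets Keras pick up the current length of the generator, which grows as new batches are downloaded. The helper `epoch_windows` below is hypothetical and only illustrates the arithmetic.

```python
def epoch_windows(total_epochs):
    """Yield the (initial_epoch, epochs) pair passed to each fit call."""
    for epoch in range(total_epochs):
        yield epoch, epoch + 1

windows = list(epoch_windows(3))
for initial_epoch, epochs in windows:
    # Number of epochs actually trained by this call
    print(initial_epoch, epochs, epochs - initial_epoch)
```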

The key component is the `DynamicDataGenerator` object. The `TrainingDataUploader` stores its metadata in the DB under a default key (this is what the call to `publish_info` in the uploader script does), so the `DynamicDataGenerator` can retrieve that metadata and use it to determine the keys of the uploaded batches of samples and labels. We create a SmartSim entity representing the trainer and we are ready to go!


In [None]:
def create_trainer(experiment, alloc):
    """Start a process running a training service which will
       download batches from the DB.
    """
    srun = SrunSettings(exe="python",
                        exe_args="training_service.py",
                        env_vars={"PYTHONUNBUFFERED": "1"},
                        alloc=alloc)
    srun.set_tasks(1)

    trainer = experiment.create_model("trainer", srun)

    # create directories for the output files and copy
    # scripts to execution location inside newly created dir
    # only necessary if it's not an executable (python is executable here)
    trainer.attach_generator_files(to_copy="./tf/training_service.py")
    experiment.generate(trainer, overwrite=True)
    return trainer

### 1.2 Request an allocation

We need one node for the DB, two for the producer ensemble, and one for the trainer, thus we request 4 nodes to be allocated.
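The node count can be written out explicitly. The sketch below assumes that no two components share a node; the function name `nodes_needed` is hypothetical.

```python
def nodes_needed(db_nodes, uploader_replicas, nodes_per_replica, trainer_nodes):
    """Total nodes when every component runs on its own node(s)."""
    return db_nodes + uploader_replicas * nodes_per_replica + trainer_nodes

# One DB node, two uploader replicas on one node each, one trainer node
total_nodes = nodes_needed(db_nodes=1, uploader_replicas=2,
                           nodes_per_replica=1, trainer_nodes=1)
print(total_nodes)
```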

In [None]:
alloc = slurm.get_allocation(nodes=4, time="03:00:00", options={"constraint": "V100", "partition": "spider"})

### 1.3 Run the workflow

Now that all components are available, we create the SmartSim experiment representing our workflow. 
Notice that the line
```python
uploader_model.enable_key_prefixing()
```
configures the `uploader` ensemble so that the keys of the tensors produced by each of its members are prefixed with the member's
SmartSim entity name, and the lines
```python
for uploader in uploader_model.entities:
    trainer_model.register_incoming_entity(uploader)
```
make sure that each uploader replica (within the ensemble) is set as an incoming entity of the `trainer_model`. This will allow the `trainer_model` to know which processes are producing batches it will need to download.
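To see why key prefixing matters when two replicas run the same script, consider the following sketch, in which a plain dictionary stands in for the DB. The `put` helper is hypothetical, and both the `.` separator between entity name and key and the key `samples_0_0` are assumptions made for illustration.

```python
def put(db, key, value, entity_name=None):
    """Store value in db, optionally prefixing the key with the entity name."""
    full_key = f"{entity_name}.{key}" if entity_name else key
    db[full_key] = value
    return full_key

# Without prefixing, both replicas write to the same key: one batch is overwritten
unprefixed_db = {}
put(unprefixed_db, "samples_0_0", "batch from uploader_0")
put(unprefixed_db, "samples_0_0", "batch from uploader_1")
print(len(unprefixed_db))

# With prefixing, each replica writes to its own key
prefixed_db = {}
for name in ["uploader_0", "uploader_1"]:
    put(prefixed_db, "samples_0_0", f"batch from {name}", entity_name=name)
print(sorted(prefixed_db))
```

Registering the uploaders as incoming entities is the consumer-side counterpart: it tells the trainer which entity names to expect in front of the keys.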

We call `exp.start` and the workflow is launched! As the trainer's `DynamicDataGenerator` was created with `verbose=True`, we can look at the trainer output in the `launch_streaming` experiment directory: we will see that it keeps downloading batches at the end of each epoch, as expected.

In [None]:
exp = Experiment("launch_streaming", launcher="slurm")

db_port = 6780

# start the database
db = launch_cluster_orc(exp, db_port, alloc)
uploader_model = create_uploader(exp, alloc, 1, 2)
uploader_model.enable_key_prefixing()
exp.start(uploader_model, block=False, summary=False)
trainer_model = create_trainer(exp, alloc)
for uploader in uploader_model.entities:
    trainer_model.register_incoming_entity(uploader)

exp.start(trainer_model, block=True, summary=False)

print(exp.summary())

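If we interrupted the previous cell instead of waiting for its completion, we need to stop the SmartSim entities.

In [None]: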
exp.stop(db, uploader_model, trainer_model)

Finally, we release the allocation.

In [None]:
slurm.release_allocation(alloc)

## 2. Second scenario: an ensemble of parallel producers and a distributed trainer

In the second scenario, we use Horovod to distribute the training and use multiple ranks. Each rank will download only a portion of the dataset, thus speeding up the download and training process.
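The idea of giving each rank only a portion of the data set can be sketched with a round-robin partition of batch indices. This is only an illustration: the partitioning scheme actually used by `DynamicDataGenerator` may differ, and the function name `shard_batches` is hypothetical.

```python
def shard_batches(batch_ids, replica_rank, num_replicas):
    """Return the batch ids assigned to one replica (round-robin)."""
    if not 0 <= replica_rank < num_replicas:
        raise ValueError("replica_rank must be in [0, num_replicas)")
    return batch_ids[replica_rank::num_replicas]

all_batches = list(range(10))
shards = [shard_batches(all_batches, rank, 4) for rank in range(4)]
for rank, ids in enumerate(shards):
    print(rank, ids)
```

Every batch is assigned to exactly one replica, so each rank downloads roughly `1/num_replicas` of the data.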


### 2.1 Workflow components
The only component we change with respect to the previous example is the training service, which is now defined in `training_service_hvd.py`:

```python
import numpy as np
import tensorflow.keras as keras
import tensorflow as tf

from smartsim.ml.tf import DynamicDataGenerator

import horovod.tensorflow.keras as hvd

# HVD initialization
hvd.init()
hvd_rank = hvd.rank()
hvd_size = hvd.size()

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')


training_generator = DynamicDataGenerator(cluster=False,
                                          replica_rank=hvd_rank, 
                                          num_replicas=hvd_size,
                                          verbose=True)
model = keras.applications.MobileNetV2(weights=None, classes=training_generator.num_classes)

opt = keras.optimizers.Adam(0.0001*hvd_size)
# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)
model.compile(optimizer=opt, loss="mse", metrics=["mae"])
callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

print("Starting training")

for epoch in range(100):
    model.fit(training_generator, steps_per_epoch=None, callbacks=callbacks,
              epochs=epoch+1, initial_epoch=epoch, batch_size=training_generator.batch_size,
              verbose=2 if hvd_rank==0 else 0)

```

Compared to the previous training service, the only change for the `DynamicDataGenerator` is that we need to specify the total number of replicas (`num_replicas`), which is equal to the total number of Horovod ranks, and the rank of each replica (`replica_rank`), which corresponds to the Horovod rank. The rest of the training service script is modified according to the standard Horovod rules for distributed training.
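Two of these conventions are visible in the script above: the learning rate is multiplied by the number of workers, and only rank 0 prints training progress. The sketch below isolates them in plain Python; the function names are hypothetical, and no Horovod call is involved.

```python
def scaled_learning_rate(base_lr, num_workers):
    """Scale the base learning rate linearly with the number of workers."""
    return base_lr * num_workers

def fit_verbosity(rank):
    """Only rank 0 reports progress; all other ranks stay silent."""
    return 2 if rank == 0 else 0

# With 8 Horovod ranks, as in the experiment below
lr = scaled_learning_rate(0.0001, 8)
verbosity_per_rank = [fit_verbosity(rank) for rank in range(8)]
print(lr, verbosity_per_rank)
```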

Let's define the SmartSim entity representing the training service.

In [None]:
def create_trainer_hvd(experiment, alloc, nodes=1, tasks_per_node=1):
    """Start a process running a distributed training service which will
       download batches from the DB and use Horovod to distribute
       data and compute global weight updates.
    """
    srun = SrunSettings(exe="python",
                        exe_args="training_service_hvd.py",
                        env_vars={"PYTHONUNBUFFERED": "1"},
                        alloc=alloc)
    srun.set_nodes(nodes)
    srun.set_tasks_per_node(tasks_per_node)

    trainer = experiment.create_model("trainer", srun)

    # create directories for the output files and copy
    # scripts to execution location inside newly created dir
    # only necessary if it's not an executable (python is executable here)
    trainer.attach_generator_files(to_copy="./tf/training_service_hvd.py")
    experiment.generate(trainer, overwrite=True)
    return trainer

### 2.2 Request an allocation

We need one node for the DB, two for the producer ensemble, and one for the trainer, thus we request 4 nodes to be allocated.

In [None]:
alloc = slurm.get_allocation(nodes=4, time="03:00:00", options={"constraint": "V100", "partition": "spider"})

### 2.3 Run the workflow

Now that all components are available, we create the SmartSim experiment representing our workflow. Apart from the number of tasks per node (8 for both the uploaders and the trainer), the setup is identical to the previous example. We can thus start the experiment and look at the output files to see that each replica now handles a smaller portion of the dataset, so that each training epoch should complete faster.

In [None]:
exp = Experiment("launch_streaming_hvd", launcher="slurm")

db_port = 6780

# start the database
db = launch_cluster_orc(exp, db_port, alloc)
uploader_model = create_uploader(exp, alloc, 1, 8)
uploader_model.enable_key_prefixing()
exp.start(uploader_model, block=False, summary=False)
trainer_model = create_trainer_hvd(exp, alloc, 1, 8)
for uploader in uploader_model.entities:
    trainer_model.register_incoming_entity(uploader)

exp.start(trainer_model, block=True, summary=False)

print(exp.summary())

### 2.4 Stop the workflow (optional) and release the allocation
If we interrupted the previous cell instead of waiting for its completion, we need to stop the SmartSim entities.

In [None]:
exp.stop(db, uploader_model, trainer_model)

Finally, we release the allocation.

In [None]:
slurm.release_allocation(alloc)