# Train a Stacked LSTM via Keras with Horovod on DSVM


## Introduction

This recipe shows how to run a Keras training job on Batch AI, using the Data Science VM (DSVM) base image and Horovod to perform data-parallel training across multiple GPUs and/or nodes.

## Details

- The DSVM image has the Keras framework preinstalled.
- The job runs in a TensorFlow container (`tensorflow/tensorflow:1.1.0-gpu`).
- The training script is a simple stacked LSTM generative model trained on Jane Austen's wonderful book *Pride and Prejudice*.
- Standard output of the job will be stored on an Azure File Share.

## Instructions

### Install Dependencies and Create Configuration File
Follow the [instructions](https://github.com/Azure/BatchAI/tree/master/recipes) to install all dependencies and create the configuration file. A configuration file template is included in this repo.
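
Before creating the client, it can help to confirm that the configuration object exposes every setting this notebook reads. The following is a minimal sketch under stated assumptions: the attribute names are the ones accessed on `cfg` in the cells below, while `missing_settings` and the `SimpleNamespace` stand-in are hypothetical and not part of `utilities.py`.

```python
from types import SimpleNamespace

# Attributes this notebook reads from the configuration object.
REQUIRED_SETTINGS = (
    'resource_group',
    'location',
    'storage_account_name',
    'storage_account_key',
    'admin',
)
# Assumption: at least one of these credentials must be set for the admin user.
ADMIN_CREDENTIALS = ('admin_password', 'admin_ssh_key')


def missing_settings(cfg):
    """Return the names of settings that are absent or empty on cfg."""
    missing = [name for name in REQUIRED_SETTINGS if not getattr(cfg, name, None)]
    if not any(getattr(cfg, name, None) for name in ADMIN_CREDENTIALS):
        missing.append(' or '.join(ADMIN_CREDENTIALS))
    return missing


# Stand-in for utilities.Configuration('configuration.json'); values are placeholders.
example_cfg = SimpleNamespace(
    resource_group='my-resource-group',
    location='eastus',
    storage_account_name='mystorageaccount',
    storage_account_key='',  # deliberately left empty to show the check
    admin='demoUser',
    admin_password=None,
    admin_ssh_key='ssh-rsa AAAA...',
)
print(missing_settings(example_cfg))
```

An empty list means every setting used by this recipe is present; any name in the list should be filled in before continuing.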

### Read Configuration and Create Batch AI client

In [None]:
from __future__ import print_function

from datetime import datetime
import sys

from azure.storage.file import FileService
import azure.mgmt.batchai.models as models

# utilities.py contains helper functions used by different notebooks
import utilities

cfg = utilities.Configuration('configuration.json')
client = utilities.create_batchai_client(cfg)

### Create File Share

For this example we will create a new File Share named `batchaisamplehorovod` under your storage account.

**Note** You don't need to create a new file share for every cluster. We do so in this sample to simplify resource management for you.

In [None]:
azure_file_share_name = 'batchaisamplehorovod'
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.create_share(azure_file_share_name, fail_on_exist=False)
print('Done')

### Configure Compute Cluster

- For this example we will use a GPU cluster of `STANDARD_NC12` nodes. The number of nodes in the cluster is set with the `nodes_count` variable; we configure one node, so this will be a single machine with 2 GPUs. This is the lowest-cost (standard) configuration for testing Horovod on Batch AI.
- We will mount the file share at a folder named `external`. The full path of this folder on a compute node will be `$AZ_BATCHAI_MOUNT_ROOT/external`.
- We will call the cluster `nc12horovod`.


So, the cluster will have the following parameters:

In [None]:
azure_file_share_mount_path = 'external'
nodes_count = 1
cluster_name = 'nc12horovod'

volumes = models.MountVolumes(
    azure_file_shares=[
        models.AzureFileShareReference(
            account_name=cfg.storage_account_name,
            credentials=models.AzureStorageCredentialsInfo(
                account_key=cfg.storage_account_key),
            azure_file_url = 'https://{0}.file.core.windows.net/{1}'.format(
                cfg.storage_account_name, azure_file_share_name),
            relative_mount_path=azure_file_share_mount_path)
    ]
)

parameters = models.ClusterCreateParameters(
    location=cfg.location,
    vm_size="STANDARD_NC12",
    virtual_machine_configuration=models.VirtualMachineConfiguration(
        image_reference=models.ImageReference(
            publisher="microsoft-ads",
            offer="linux-data-science-vm-ubuntu",
            sku="linuxdsvmubuntu",
            version="latest")),
    scale_settings=models.ScaleSettings(
        manual=models.ManualScaleSettings(target_node_count=nodes_count)
    ),
    node_setup=models.NodeSetup(
        mount_volumes=volumes
    ),
    user_account_settings=models.UserAccountSettings(
        admin_user_name=cfg.admin,
        admin_user_password=cfg.admin_password,
        admin_user_ssh_public_key=cfg.admin_ssh_key
    )
)

### Create Compute Cluster

In [None]:
cluster = client.clusters.create(cfg.resource_group, cluster_name, parameters).result()

### Monitor Cluster Creation

`utilities.py` contains a helper function that prints the cluster status, so you can check when the cluster becomes available, that is, when all nodes are allocated and have finished preparation.

In [None]:
cluster = client.clusters.get(cfg.resource_group, cluster_name)
utilities.print_cluster_status(cluster)

### Deploy Sample Script and Configure the Input Directories


In [None]:
keras_sample_dir = "KerasSamples"
file_name = 'keras-lstm-horovod.py'
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
# Create the target directory on the file share (no error if it already exists)
service.create_directory(
    azure_file_share_name, keras_sample_dir, fail_on_exist=False)

# Upload the Keras training script from the local working directory
service.create_file_from_path(
    azure_file_share_name, keras_sample_dir, file_name, file_name)

print('Done')

In [None]:
input_directories = [
    models.InputDirectory(
        id='SCRIPT',
        path='$AZ_BATCHAI_MOUNT_ROOT/{0}/{1}'.format(azure_file_share_mount_path, keras_sample_dir))
]
print(input_directories[0])

The job will be able to reference this directory using the `$AZ_BATCHAI_INPUT_SCRIPT` environment variable.
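
To make the path mapping concrete, here is a small sketch of how the input directory template resolves on a compute node. It assumes the mount root is `/mnt/batch/tasks/shared/LS_root/mounts`, which is inferred from the absolute paths used in this recipe's job command lines; the service sets the real value. `resolve_on_node` is a hypothetical helper used only for illustration.

```python
import posixpath
from string import Template


def resolve_on_node(path_template, environment):
    """Expand $VARIABLES in path_template using the given environment mapping."""
    return Template(path_template).substitute(environment)


# Assumed mount root, inferred from the absolute paths in the job command lines.
mount_root = '/mnt/batch/tasks/shared/LS_root/mounts'

# Same template as the InputDirectory path: mount root / share mount path / sample dir.
script_dir_template = '$AZ_BATCHAI_MOUNT_ROOT/{0}/{1}'.format('external', 'KerasSamples')
script_dir = resolve_on_node(script_dir_template, {'AZ_BATCHAI_MOUNT_ROOT': mount_root})

# An input directory with id SCRIPT is exposed to the job as $AZ_BATCHAI_INPUT_SCRIPT.
node_env = {'AZ_BATCHAI_INPUT_SCRIPT': script_dir}
script_path = resolve_on_node(
    posixpath.join('$AZ_BATCHAI_INPUT_SCRIPT', 'keras-lstm-horovod.py'), node_env)
print(script_path)
```

Under that assumption, `$AZ_BATCHAI_INPUT_SCRIPT/keras-lstm-horovod.py` and the absolute path used in the job command line point to the same file.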

### Configure Output Directories
We will store the standard output and error streams of the job on the file share:

In [None]:
std_output_path_prefix = "$AZ_BATCHAI_MOUNT_ROOT/{0}".format(azure_file_share_mount_path)

### Configure Job

- The job will use the previously configured input and output directories.
- It will run `keras-lstm-horovod.py` from the SCRIPT input directory using the custom toolkit, which makes it easy to try different scripts.
- Keras needs to use the TensorFlow backend.
- Standard output and error streams will be written to the file share.
- We use MPI over TCP here because the NC12 nodes do not have RDMA available. See this [recipe](https://github.com/Azure/BatchAI/tree/master/recipes/Horovod/Horovod-Infiniband-Benchmark) for an RDMA configuration.
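
The `mpirun` command line used in the job below is long, so the following sketch assembles it from annotated parts. `build_mpirun_command` is a hypothetical helper, not part of the Batch AI SDK or `utilities.py`. With default arguments it reproduces the command used in this recipe; the optional `process_count` argument (mapped to `-np`) is an addition for illustration, since Horovod is normally launched with one process per GPU.

```python
def build_mpirun_command(script_path, excluded_interfaces=('docker0', 'lo'),
                         process_count=None):
    """Assemble an mpirun command line for a Horovod job using MPI over TCP."""
    parts = [
        'mpirun',
        # Show every help/error message instead of aggregating duplicates.
        '-mca', 'orte_base_help_aggregate', '0',
        # Keep the TCP transport off the Docker bridge and loopback interfaces.
        '-mca', 'btl_tcp_if_exclude', ','.join(excluded_interfaces),
        # Permit mpirun to be started by root (assumed to be the container user).
        '--allow-run-as-root',
        # Batch AI provides the list of job hosts through this variable.
        '--hostfile', '$AZ_BATCHAI_MPI_HOST_FILE',
    ]
    if process_count is not None:
        # Optional: request an explicit number of processes.
        parts[1:1] = ['-np', str(process_count)]
    parts += ['python', script_path]
    return ' '.join(parts)


command = build_mpirun_command(
    '/mnt/batch/tasks/shared/LS_root/mounts/external/KerasSamples/keras-lstm-horovod.py')
print(command)
```

For example, `build_mpirun_command(path, process_count=2)` would request one process for each of the two GPUs on a single NC12 node; whether that is needed depends on how the host file assigns slots, which this sketch does not assume.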


In [None]:
job_name = datetime.utcnow().strftime("keras_%H%M%S")
parameters = models.job_create_parameters.JobCreateParameters(
     location=cfg.location,
     cluster=models.ResourceId(cluster.id),
     node_count=nodes_count,
     input_directories=input_directories,
     std_out_err_path_prefix=std_output_path_prefix,
     container_settings=models.ContainerSettings(
         models.ImageSourceRegistry(image='tensorflow/tensorflow:1.1.0-gpu')),
     job_preparation=models.JobPreparation(
         command_line="conda update --all; apt update; apt install mpi-default-dev mpi-default-bin -y; pip install horovod; pip install keras;KERAS_BACKEND=tensorflow; pip install h5py;ls /mnt/batch/tasks/shared/LS_root/mounts/external/KerasSamples/"),
     custom_toolkit_settings = models.CustomToolkitSettings(
         command_line='mpirun -mca orte_base_help_aggregate 0 -mca btl_tcp_if_exclude docker0,lo --allow-run-as-root --hostfile $AZ_BATCHAI_MPI_HOST_FILE python /mnt/batch/tasks/shared/LS_root/mounts/external/KerasSamples/keras-lstm-horovod.py'))
         #command_line='KERAS_BACKEND=tensorflow python $AZ_BATCHAI_INPUT_SCRIPT/keras-lstm-horovod.py'))

### Create a Training Job


In [None]:
job = client.jobs.create(cfg.resource_group, job_name, parameters).result()
print('Created Job: {}'.format(job_name))

### Wait for Job to Finish
The job will start running when the cluster has enough idle nodes. The following code waits for the job to start running, printing the cluster state while it waits. While the job runs, the code prints the current content of stdout.txt.

**Note** Execution may take several minutes to complete.

In [None]:
utilities.wait_for_job_completion(client, cfg.resource_group, job_name, cluster_name, 'stdouterr', 'stdout.txt')
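
For readers who want to see the general shape of such a wait loop, here is a simplified polling sketch. It is not the implementation of `utilities.wait_for_job_completion`; `wait_for_terminal_state` is a hypothetical function, and the state names `queued`, `running`, `succeeded` and `failed` are assumptions about the job's execution states.

```python
import time


def wait_for_terminal_state(get_state, terminal_states=('succeeded', 'failed'),
                            poll_seconds=15, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state or the timeout expires."""
    waited = 0
    while True:
        state = get_state()
        print('Job state: {0}'.format(state))
        if state in terminal_states:
            return state
        if waited >= timeout_seconds:
            raise RuntimeError('Timed out waiting for the job to finish')
        sleep(poll_seconds)
        waited += poll_seconds


# Simulated sequence of states so the sketch runs without a cluster. Against the
# service, get_state could wrap a call that fetches the job and reads its
# execution state (assumption).
states = iter(['queued', 'queued', 'running', 'succeeded'])
final_state = wait_for_terminal_state(lambda: next(states), sleep=lambda seconds: None)
```

Passing `sleep` as a parameter keeps the loop easy to test; in real use the default `time.sleep` applies.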

### Download stdout.txt and stderr.txt files for the Job

In [None]:
files = client.jobs.list_output_files(cfg.resource_group, job_name, models.JobsListOutputFilesOptions("stdouterr"))
for f in list(files):
    utilities.download_file(f.download_url, f.name)
print("All files downloaded")

In [None]:
print('stdout.txt content:')
with open('stdout.txt') as f:
    print(f.read())

### Delete the Job

In [None]:
_ = client.jobs.delete(cfg.resource_group, job_name)

### Delete the Cluster
When you are finished with the sample and don't want to submit any more jobs, you can delete the cluster using the following code.

In [None]:
_ = client.clusters.delete(cfg.resource_group, cluster_name)

### Delete File Share
When you are finished with the sample and don't want to submit any more jobs, you can delete the file share, together with all of its files, using the following code.

In [None]:
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.delete_share(azure_file_share_name)