# TensorFlow + Horovod + Keras + Azure Batch AI


## Introduction

This recipe shows how to run the [Horovod](https://github.com/uber/horovod) distributed training framework on Batch AI.

Batch AI currently has no native support for Horovod, but you can easily run it using a custom toolkit job and a job preparation command line.


## Details

- The job runs the Keras script `BidirectionalLSTM_v2_1_2host.py` with Horovod; you can substitute the standard Horovod [tensorflow_mnist.py](https://github.com/uber/horovod/blob/v0.9.10/examples/tensorflow_mnist.py) example;
- The script downloads its training data on its own during execution;
- The job will be run in the standard TensorFlow container tensorflow/tensorflow:1.4.0-gpu;
- Horovod will be installed in the container by the job preparation command line. Alternatively, you can build your own Docker image containing TensorFlow and Horovod;
- Standard output of the job will be stored on an Azure File Share.
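The job preparation step is a single shell string that installs Open MPI and Horovod inside the container. As a minimal sketch, assuming you prefer to compose that string from lists of packages, the hypothetical helper `build_prep_command` (not part of Batch AI or `utilities.py`) joins the steps with `&&` by default, so preparation stops at the first failing step instead of continuing as it does with the `;` separators in the job definition below.

```python
def build_prep_command(apt_packages, pip_packages, fail_fast=True):
    """Compose a job preparation command line (hypothetical helper).

    apt_packages / pip_packages: lists of package names to install.
    fail_fast: join steps with '&&' so a failing step aborts the rest;
    otherwise join with ';' like the original recipe.
    """
    steps = ["apt update"]
    if apt_packages:
        steps.append("apt install -y " + " ".join(apt_packages))
    for pkg in pip_packages:
        steps.append("pip install " + pkg)
    separator = " && " if fail_fast else "; "
    return separator.join(steps)


# Same packages as the recipe's job preparation command line.
print(build_prep_command(["mpi-default-dev", "mpi-default-bin"],
                         ["horovod", "keras", "bs4"]))
```

The resulting string can be passed as `command_line` to `models.JobPreparation`.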

## Instructions

### Install Dependencies and Create a Configuration File
Follow [instructions](/recipes) to install all dependencies and create the configuration file.

### Read Configuration and Create Batch AI client

In [2]:
from __future__ import print_function

import time
from datetime import datetime
import os
import sys
import zipfile

from azure.storage.file import FileService, FilePermissions
import azure.mgmt.batchai.models as models

# utilities.py contains helper functions used by different notebooks
sys.path.append('./')
import utilities

cfg = utilities.Configuration('configuration.json')
client = utilities.create_batchai_client(cfg)
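`utilities.Configuration` reads `configuration.json`, but its implementation is not shown here. The following is a minimal sketch, assuming a flat JSON object whose keys match the attribute names this notebook uses (`resource_group`, `location`, `storage_account_name`, and so on); the class `SketchConfiguration` is hypothetical, and the real file format is defined by the recipes' setup instructions and may differ.

```python
import json

# Attribute names used by this notebook; assumed to be top-level JSON keys.
REQUIRED_KEYS = (
    "resource_group", "location",
    "storage_account_name", "storage_account_key",
    "admin", "admin_password", "admin_ssh_key",
)


class SketchConfiguration(object):
    """Hypothetical stand-in for utilities.Configuration."""

    def __init__(self, path):
        with open(path) as f:
            data = json.load(f)
        # Fail early with a readable message instead of an AttributeError later.
        missing = [k for k in REQUIRED_KEYS if k not in data]
        if missing:
            raise KeyError("configuration is missing: " + ", ".join(missing))
        for key, value in data.items():
            setattr(self, key, value)
```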

### Create File Share

For this example we will create a new File Share named `batchai35` under your storage account. This share will be populated with sample scripts and will contain the job's output.

**Note** You don't need to create a new file share for every cluster. We do it in this sample to simplify resource management for you.

In [3]:
azure_file_share_name = 'batchai35'
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.create_share(azure_file_share_name, fail_on_exist=False)

True
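Creating a share fails if its name breaks the Azure Files naming rules. A minimal client-side check, assuming the rules as published for Azure Files shares (3 to 63 characters; lowercase letters, digits and single hyphens; starting and ending with a letter or digit); confirm against the current Azure documentation, since this helper (`is_valid_share_name`) is illustrative only.

```python
import re

# One or more lowercase alphanumerics, optionally separated by single hyphens.
# \Z (not $) so a trailing newline is not accepted.
_SHARE_NAME = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*\Z")


def is_valid_share_name(name):
    """Return True if `name` satisfies the assumed Azure Files share naming rules."""
    return 3 <= len(name) <= 63 and bool(_SHARE_NAME.match(name))


for candidate in ["batchai35", "BatchAI", "ai--share", "-share", "ab"]:
    print(candidate, is_valid_share_name(candidate))
```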

### Configure Compute Cluster

- For this example we will use a GPU cluster of `STANDARD_NC6` nodes. The number of nodes in the cluster is configured with the `nodes_count` variable;
- We will mount the file share at a folder named `external`. The full path of this folder on a compute node will be `$AZ_BATCHAI_MOUNT_ROOT/external`;
- We will call the cluster `nc6`.


So, the cluster will have the following parameters:

In [4]:
azure_file_share = 'external'
nodes_count = 2
cluster_name = 'nc6'

volumes = models.MountVolumes(
    azure_file_shares=[
        models.AzureFileShareReference(
            account_name=cfg.storage_account_name,
            credentials=models.AzureStorageCredentialsInfo(
                account_key=cfg.storage_account_key),
            azure_file_url = 'https://{0}.file.core.windows.net/{1}'.format(
                cfg.storage_account_name, azure_file_share_name),
            relative_mount_path=azure_file_share)
    ]
)

parameters = models.ClusterCreateParameters(
    location=cfg.location,
    vm_size="STANDARD_NC6",
    virtual_machine_configuration=models.VirtualMachineConfiguration(
        image_reference=models.ImageReference(
            publisher="microsoft-ads",
            offer="linux-data-science-vm-ubuntu",
            sku="linuxdsvmubuntu",
            version="latest")),
    scale_settings=models.ScaleSettings(
        manual=models.ManualScaleSettings(target_node_count=nodes_count)
    ),
    node_setup=models.NodeSetup(
        mount_volumes=volumes
    ),
    user_account_settings=models.UserAccountSettings(
        admin_user_name=cfg.admin,
        admin_user_password=cfg.admin_password,
        admin_user_ssh_public_key=cfg.admin_ssh_key
    )
)
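The cluster definition above combines two strings: the share URL in the form `https://<account>.file.core.windows.net/<share>` and a relative mount path that ends up under `$AZ_BATCHAI_MOUNT_ROOT` on each node. The sketch below builds both with hypothetical helpers (`share_url`, `node_path`) and expands the node path the way a shell on the node would; the mount root `/example/mount/root` is a made-up value for illustration, not the location Batch AI actually uses.

```python
import os


def share_url(account_name, share_name):
    """Build the Azure Files URL passed to AzureFileShareReference."""
    return "https://{0}.file.core.windows.net/{1}".format(account_name, share_name)


def node_path(relative_mount_path, *parts):
    """Path as a job references it, e.g. $AZ_BATCHAI_MOUNT_ROOT/external/..."""
    return "/".join(["$AZ_BATCHAI_MOUNT_ROOT", relative_mount_path] + list(parts))


# Simulate the node environment with a hypothetical mount root.
os.environ["AZ_BATCHAI_MOUNT_ROOT"] = "/example/mount/root"
print(share_url("mystorageaccount", "batchai35"))
print(os.path.expandvars(node_path("external", "trainall_script")))
```

Leaving the variable unexpanded in job parameters, as the recipe does, lets Batch AI resolve the real mount root on each node.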

### Create Compute Cluster

In [5]:
_ = client.clusters.create(cfg.resource_group, cluster_name, parameters)

### Monitor Cluster Creation

`utilities.py` contains helper functions for checking the cluster status. The cluster is available once all nodes are allocated and have finished preparation; rerun the cell below until all nodes are reported as idle.

In [7]:
cluster = client.clusters.get(cfg.resource_group, cluster_name)
utilities.print_cluster_status(cluster)

Cluster state: AllocationState.steady Target: 2; Allocated: 2; Idle: 2; Unusable: 0; Running: 0; Preparing: 0; Leaving: 0
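Instead of rerunning the status cell by hand, you can poll until the cluster is ready. This is a generic sketch with hypothetical helpers (`wait_until`, `cluster_ready`), assuming you supply the target and idle node counts yourself, for example from the object returned by `client.clusters.get(...)`; the exact attribute names on that object are not shown in this recipe and are left out of the sketch.

```python
import time


def wait_until(predicate, timeout=1800, interval=15, sleep=time.sleep, clock=time.time):
    """Call predicate() every `interval` seconds until it returns True.

    Returns True on success, False if `timeout` seconds elapse first.
    `sleep` and `clock` are injectable so the loop can be tested quickly.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


def cluster_ready(target, idle):
    """Treat the cluster as ready once every target node is allocated and idle."""
    return target > 0 and idle == target
```

Usage, with a hypothetical `get_counts()` that returns `(target, idle)`: `wait_until(lambda: cluster_ready(*get_counts()))`.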


### Deploy Sample Script and Configure the Input Directories


- Create a folder in the file share and upload the sample script to it.

In [8]:
script_directory = 'trainall_script'
#script_file = 'GloveBidirectionalLSTM_v3_1_2host.py'
script_file = 'BidirectionalLSTM_v2_1_2host.py'

#dataset_file1 = 'glove.6B.100d.txt'
#dataset_file2 = 'labeledTrainData.tsv'

service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.create_directory(
    azure_file_share_name, script_directory, fail_on_exist=False)

#Script file
service.create_file_from_path(
    azure_file_share_name, script_directory, script_file, script_file)

#Dataset file
#service.create_file_from_path(
#    azure_file_share_name, script_directory, dataset_file1, dataset_file1)
#service.create_file_from_path(
#    azure_file_share_name, script_directory, dataset_file2, dataset_file2)

Client-Request-ID=3aa14334-02b8-11e8-b1eb-5c514f38ccef Retry policy did not allow for a retry: Server-Timestamp=Fri, 26 Jan 2018 16:44:53 GMT, Server-Request-ID=eab51833-001a-012e-0fc4-962021000000, HTTP status code=409, Exception=The specified resource already exists.<?xml version="1.0" encoding="utf-8"?><Error><Code>ResourceAlreadyExists</Code><Message>The specified resource already exists.RequestId:eab51833-001a-012e-0fc4-962021000000Time:2018-01-26T16:44:54.0732401Z</Message></Error>.


- The job needs to know where to find the training script (the script downloads its dataset on its own). So, we will configure an input directory for the script:

In [9]:
input_directories = [
    models.InputDirectory(
        id='SCRIPTS',
        path='$AZ_BATCHAI_MOUNT_ROOT/{0}/{1}'.format(azure_file_share, script_directory))
]

The job will be able to reference this directory using the ```$AZ_BATCHAI_INPUT_SCRIPTS``` environment variable.
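Batch AI exposes each input directory through an environment variable derived from its `id`, as the `$AZ_BATCHAI_INPUT_SCRIPTS` reference in the job command line shows. A minimal sketch, assuming the pattern is simply `AZ_BATCHAI_INPUT_` followed by the id; the helper `input_dir` is hypothetical and lets a training script resolve the path or fail with a clear message when run outside a Batch AI job.

```python
import os


def input_dir(directory_id, environ=None):
    """Resolve a Batch AI input directory path from its id (assumed naming pattern)."""
    environ = os.environ if environ is None else environ
    name = "AZ_BATCHAI_INPUT_" + directory_id
    try:
        return environ[name]
    except KeyError:
        raise RuntimeError(
            "{0} is not set; is this running inside a Batch AI job?".format(name))
```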

### Configure Output Directories
We will store the standard output and error streams of the job in the file share:

In [10]:
std_output_path_prefix = "$AZ_BATCHAI_MOUNT_ROOT/{0}".format(azure_file_share)

### Configure Job

- We will use the input and output directories configured previously;
- We will use a custom toolkit job to run the training script on multiple nodes (use the node_count parameter to specify the number of nodes). Batch AI creates a hostfile for the job; its path is available in the ```$AZ_BATCHAI_MPI_HOST_FILE``` environment variable;
- Horovod will be installed by the job preparation command line;
- Standard output and error streams will be written to the file share.

You can delete ```container_settings``` from the job definition to run the job directly on the host DSVM.
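`mpirun` reads the node list from the hostfile whose path is in `$AZ_BATCHAI_MPI_HOST_FILE`. Open MPI hostfiles list one host per line, optionally followed by `slots=N`, with `#` starting a comment; when `-np` is not given, Open MPI starts one process per slot. A minimal parser sketch, assuming Batch AI writes a standard Open MPI hostfile (its exact contents are not shown in this recipe) and assuming one slot when `slots=` is absent; check your Open MPI version's documentation for its actual default. `parse_hostfile` and `total_slots` are hypothetical helpers.

```python
def parse_hostfile(text, default_slots=1):
    """Parse Open MPI hostfile text into a list of (host, slots) tuples."""
    hosts = []
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and surrounding whitespace
        if not line:
            continue
        fields = line.split()
        slots = default_slots
        for field in fields[1:]:
            key, _, value = field.partition("=")
            if key == "slots":  # other keys such as max_slots are ignored
                slots = int(value)
        hosts.append((fields[0], slots))
    return hosts


def total_slots(hosts):
    """Number of processes mpirun would start without -np."""
    return sum(slots for _, slots in hosts)
```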

In [11]:
job_name = datetime.utcnow().strftime("horovod_keras_%m_%d_%Y_%H%M%S")
parameters = models.job_create_parameters.JobCreateParameters(
     location=cfg.location,
     cluster=models.ResourceId(cluster.id),
     node_count=nodes_count,
     input_directories=input_directories,
     std_out_err_path_prefix=std_output_path_prefix,
     container_settings=models.ContainerSettings(
         models.ImageSourceRegistry(image='tensorflow/tensorflow:1.4.0-gpu')),
     job_preparation=models.JobPreparation(
         command_line="apt update; apt install mpi-default-dev mpi-default-bin -y; pip install horovod; pip install keras; pip install bs4"),
     custom_toolkit_settings = models.CustomToolkitSettings(
         command_line='mpirun -mca btl_tcp_if_exclude docker0,lo --allow-run-as-root --hostfile $AZ_BATCHAI_MPI_HOST_FILE python $AZ_BATCHAI_INPUT_SCRIPTS/'+script_file))
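The custom toolkit command line is a single `mpirun` invocation. Building it from its parts makes the options easier to review: `-mca btl_tcp_if_exclude docker0,lo` keeps Open MPI's TCP transport off the Docker bridge and loopback interfaces, and `--allow-run-as-root` is required because Open MPI otherwise refuses to start as root, which is how the job runs inside the container. A sketch with a hypothetical helper, `build_mpirun_command`, that reproduces the recipe's command and optionally adds `-np`; environment variables are left unexpanded for the shell on the node.

```python
def build_mpirun_command(script_file, excluded_ifaces=("docker0", "lo"),
                         hostfile_var="$AZ_BATCHAI_MPI_HOST_FILE",
                         script_dir_var="$AZ_BATCHAI_INPUT_SCRIPTS",
                         num_processes=None):
    """Compose the mpirun command line used by the custom toolkit job."""
    parts = ["mpirun",
             "-mca", "btl_tcp_if_exclude", ",".join(excluded_ifaces),
             "--allow-run-as-root",
             "--hostfile", hostfile_var]
    if num_processes is not None:
        # Without -np, Open MPI starts one process per hostfile slot.
        parts += ["-np", str(num_processes)]
    parts += ["python", "{0}/{1}".format(script_dir_var, script_file)]
    return " ".join(parts)


print(build_mpirun_command("BidirectionalLSTM_v2_1_2host.py"))
```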

### Create a Training Job


In [12]:
_ = client.jobs.create(cfg.resource_group, job_name, parameters)
print('Created Job: {}'.format(job_name))

Created Job: horovod_keras_01_26_2018_164508


### Wait for Job to Finish
The job will start running when the cluster has enough idle nodes. The following code waits for the job to start, printing the cluster state, and then prints the current content of stderr.txt while the job runs.

**Note** Execution may take several minutes to complete.

In [13]:
utilities.wait_for_job_completion(client, cfg.resource_group, job_name, cluster_name, 'stdouterr', 'stderr.txt')

Cluster state: AllocationState.steady Target: 2; Allocated: 2; Idle: 2; Unusable: 0; Running: 0; Preparing: 0; Leaving: 0
Job state: running ExitCode: None
Waiting for job output to become available...
Unexpected end of /proc/mounts line `overlay / overlay rw,relatime,lowerdir=/data/docker/overlay2/l/JSVFG6PRVBIRXJZ7ER5G4ZRWCM:/data/docker/overlay2/l/MTHYOXWRTP6JWSQLKBJDX6ZCG4:/data/docker/overlay2/l/BYWBM3MZXE3PEGJH2ROOOIEIAO:/data/docker/overlay2/l/EZVHUJZRPK46CSLFTFCFHU4K4D:/data/docker/overlay2/l/PMVJST7AYQ7TYB3RC7S7ECQB2E:/data/docker/overlay2/l/N6AEVBXOIEFOFGRJTKGZWJSXSL:/data/docker/overlay2/l/CPZIUJO6VVPA7W2DJM3TT6FSEY:/data/docker/overlay2/l/WUEAYMSG4V5QBLYLLDS75TD2QV:/data/docker/overlay2/l/DT33OWB23UIVK7C64UFZETWM3L:/data/docker/'
Unexpected end of /proc/mounts line `overlay2/l/GGUVLW3G546SWQONNP5UPQGRFG:/data/docker/overlay2/l/FLLABMSE4XTXRZA65536CHOYRV:/data/docker/overlay2/l/7RXOBFEMW5Q55W5NT26DISZADX:/data/docker/overlay2/l/UVZL675ITV42SVPFFYOEIBVWIN:/data/docker/overlay
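`wait_for_job_completion` prints the current content of stderr.txt while the job runs; its implementation is not shown. A common way to print only what is new on each poll is to remember the byte offset already read. A minimal sketch on a local file, assuming you have the file available locally (for example after downloading it); `read_new` is a hypothetical helper.

```python
def read_new(path, offset):
    """Return (new_text, new_offset): the text appended since `offset` bytes."""
    with open(path, "rb") as f:
        f.seek(offset)
        data = f.read()
    # 'replace' avoids crashing if a poll lands in the middle of a multi-byte character.
    return data.decode("utf-8", "replace"), offset + len(data)
```

Call it repeatedly, feeding back the returned offset, and print only the new text.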

### Download the stdout.txt and stderr.txt Files for the Job and the Job Preparation Command

In [14]:
# Use the same output directory id as in wait_for_job_completion above
files = client.jobs.list_output_files(cfg.resource_group, job_name, models.JobsListOutputFilesOptions("stdouterr"))
for output_file in files:
    utilities.download_file(output_file.download_url, output_file.name)
print("All files Downloaded")

Downloading https://aitrainigstorage.file.core.windows.net/batchai35/1aa15964-43e9-4fab-9be5-81abdcb9c8d1/holkrazure/jobs/horovod_keras_01_26_2018_164508/1c0b7739-f8de-4f22-9880-a6e0219adaaf/stderr-job_prep-tvm-1392786932_1-20180126t163607z.txt?sv=2016-05-31&sr=f&sig=XmDM0gQ0zwnwbjwOnjf86sOSIpO5uxH5dWKg7tFByu4%3D&se=2018-01-26T18%3A15%3A33Z&sp=rl ...Done
Downloading https://aitrainigstorage.file.core.windows.net/batchai35/1aa15964-43e9-4fab-9be5-81abdcb9c8d1/holkrazure/jobs/horovod_keras_01_26_2018_164508/1c0b7739-f8de-4f22-9880-a6e0219adaaf/stderr-job_prep-tvm-1392786932_2-20180126t163607z.txt?sv=2016-05-31&sr=f&sig=Z4WB7%2FBgiMH%2BDG5z85fBeZUzxKFN3GUrzBpjP6K0ko8%3D&se=2018-01-26T18%3A15%3A33Z&sp=rl ...Done
Downloading https://aitrainigstorage.file.core.windows.net/batchai35/1aa15964-43e9-4fab-9be5-81abdcb9c8d1/holkrazure/jobs/horovod_keras_01_26_2018_164508/1c0b7739-f8de-4f22-9880-a6e0219adaaf/stderr.txt?sv=2016-05-31&sr=f&sig=Uv%2FSXuJOHpbrq3SM8%2By4XRESng7JyYiWMOkVTciMfO4%3D&se=201
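The download URLs above carry a SAS token (`sv=`, `sig=`, `se=` and so on) in the query string. If you only have such a URL, the local file name is its last path segment, and dropping the query string keeps the signature out of logs. A sketch using the Python 3 standard library; `filename_from_url` and `strip_query` are hypothetical helpers, and the recipe itself passes `file.name` to `utilities.download_file` directly.

```python
import posixpath
from urllib.parse import urlsplit, unquote


def filename_from_url(url):
    """Last path segment of a URL, ignoring the query string (e.g. a SAS token)."""
    return unquote(posixpath.basename(urlsplit(url).path))


def strip_query(url):
    """Drop the query string and fragment so SAS signatures are not logged."""
    return urlsplit(url)._replace(query="", fragment="").geturl()
```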

In [16]:
print('stdout.txt content:')
with open('stdout.txt') as f:
    print(f.read())

print('stderr.txt content:')
with open('stderr.txt') as f:
    print(f.read())

stdout.txt content:
--------------------------------------------------------------------------
[[13183,1],1]: A high-performance Open MPI point-to-point messaging module
was unable to find any relevant network interfaces:

Module: OpenFabrics (openib)
  Host: d7a020c29eb74b5aa6c9b6c5b2279fc6000001

Another transport will be used instead, although this may result in
lower performance.
--------------------------------------------------------------------------
Loading data...
Downloading data from https://s3.amazonaws.com/text-datasets/imdb.npz
Loading data...
Downloading data from https://s3.amazonaws.com/text-datasets/imdb.npz
Loading data...
Downloading data from https://s3.amazonaws.com/text-datasets/imdb.npz
Loading data...
Downloading data from https://s3.amazonaws.com/text-datasets/imdb.npz

   16384/17464789 [..............................] - ETA: 3s
   16384/17464789 [..............................] - ETA: 0s
   16384/17464789 [..............................] - ETA: 0s

### Delete the Job

In [30]:
client.jobs.delete(cfg.resource_group, job_name)

<msrestazure.azure_operation.AzureOperationPoller at 0x7fad4f2547f0>

### Delete the Cluster
When you are finished with the sample and don't want to submit any more jobs, you can delete the cluster using the following code.

In [31]:
client.clusters.delete(cfg.resource_group, cluster_name)

<msrestazure.azure_operation.AzureOperationPoller at 0x7fad33a4d7b8>

### Delete File Share
When you are finished with the sample and don't want to submit any more jobs, you can delete the file share, along with all of its files, using the following code.

In [32]:
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.delete_share(azure_file_share_name)

True