# VIII - Parallel and Distributed Execution
In this notebook, we will train an image classification model (ResNet-20) on the CIFAR-10 data set with training distributed across multiple nodes. The same approach also applies to parallel execution across multiple GPUs within a single node.

Azure Batch and Batch Shipyard have the ability to perform "gang scheduling" or scheduling multiple nodes for a single task. This is most commonly used for Message Passing Interface (MPI) jobs.

* [Setup](#section1)
* [Configure MPI Job and Submit](#section2)
* [Delete Multi-Instance Job](#section3)

<a id='section1'></a>

## Setup

Create a simple alias for Batch Shipyard

In [20]:
%alias shipyard SHIPYARD_CONFIGDIR=config python $HOME/batch-shipyard/shipyard.py %l

Check that everything is working

In [21]:
shipyard

Usage: shipyard.py [OPTIONS] COMMAND [ARGS]...

  Batch Shipyard: Provision and Execute Docker Workloads on Azure Batch

Options:
  --version   Show the version and exit.
  -h, --help  Show this message and exit.

Commands:
  cert      Certificate actions
  data      Data actions
  fs        Filesystem in Azure actions
  jobs      Jobs actions
  keyvault  KeyVault actions
  misc      Miscellaneous actions
  pool      Pool actions
  storage   Storage actions


Let's first delete the pool used in the non-advanced notebooks and wait for it to be removed so we can free up our core quota. We need to create a new pool with different settings and a different Docker image.

In [22]:
shipyard pool del -y --wait

2017-06-21 15:30:39,661 INFO - Deleting pool: gpupool
2017-06-21 15:30:39,902 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool): shipyardregistry
2017-06-21 15:30:40,249 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool): shipyardgr
2017-06-21 15:30:40,347 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool): shipyardperf
2017-06-21 15:30:40,398 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool): shipyarddht
2017-06-21 15:30:40,451 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool): shipyardimages
2017-06-21 15:30:40,552 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool): shipyardtorrentinfo
2017-06-21 15:30:40,604 DEBUG - deleting queue: shipyardgr-batcha3dc41fdba-gpupool
2017-06-21 15:30:40,825 DEBUG - deleting container: shipyardtor-batcha3dc41fdba-gpupool
2017-06-21 15:30:40,995 DEBUG - deleting container: shipyardrf-batcha3dc41fdba-gpupool
2017-06-21 15:30:41,036 DEBUG - waiting for pool gpupool to delete


Read in the account information we saved earlier

In [23]:
import json
import os

def read_json(filename):
    with open(filename, 'r') as infile:
        return json.load(infile)
    
def write_json_to_file(json_dict, filename):
    """ Simple function to write JSON dictionaries to files
    """
    with open(filename, 'w') as outfile:
        json.dump(json_dict, outfile)

account_info = read_json('account_information.json')

storage_account_key = account_info['storage_account_key']
storage_account_name = account_info['storage_account_name']
STORAGE_ALIAS = account_info['STORAGE_ALIAS']

Specify the CNTK Docker image that will be used for both the pool and the job

In [24]:
import random

IMAGE_NAME = 'alfpark/cntk:2.0.rc2-gpu-1bit-sgd-python3.5-cuda8.0-cudnn5.1'

The data set conversion scripts are downloaded to each node as `resource_files` when the pool is created. In real production runs, we would already have this data pre-converted instead of converting at the time of node startup.

Additionally, we'll create an MPI helper script for executing the MPI job. This helper script does the following:
1. Ensures that there are GPUs available to execute the task.
2. Parses the `$AZ_BATCH_HOST_LIST` for all of the hosts participating in the MPI job and creates a `hostfile` from it
3. Computes the total number of slots (processors)
4. Sets the proper CNTK training directory, script and options
5. Executes the MPI job via `mpirun`
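
The host-list handling in steps 2 and 3 can also be sketched in Python, which may make the logic easier to follow than the shell version. This is only an illustration: `build_hostfile` is a hypothetical helper, and the sample host list and GPU count are made-up inputs standing in for `$AZ_BATCH_HOST_LIST` and the `nvidia-smi` count.

```python
def build_hostfile(host_list, gpus_per_node):
    """Return (hostfile_text, total_mpi_processes) for a comma-separated host list."""
    # Split the comma-separated list and drop empty entries
    hosts = [h.strip() for h in host_list.split(',') if h.strip()]
    # One line per host, with one MPI slot per GPU on that host
    lines = ['{0} slots={1} max-slots={1}'.format(h, gpus_per_node) for h in hosts]
    # Total number of MPI processes is nodes * GPUs per node
    return '\n'.join(lines) + '\n', len(hosts) * gpus_per_node


# Sample values for illustration only
text, np_total = build_hostfile('10.0.0.4,10.0.0.5,10.0.0.6', 1)
print(text.strip())
print('num mpi processes: {}'.format(np_total))
```

The helper script in the next cell does the same thing in bash so that it can run inside the container without any extra dependencies.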

In [25]:
%%writefile run_cifar10.sh
#!/usr/bin/env bash

set -e
set -o pipefail

# get number of GPUs on machine
ngpus=$(nvidia-smi -L | wc -l)
echo "num gpus: $ngpus"

if [ $ngpus -eq 0 ]; then
    echo "No GPUs detected."
    exit 1
fi

# get number of nodes
IFS=',' read -ra HOSTS <<< "$AZ_BATCH_HOST_LIST"
nodes=${#HOSTS[@]}

# create hostfile
touch hostfile
>| hostfile
for node in "${HOSTS[@]}"
do
    echo $node slots=$ngpus max-slots=$ngpus >> hostfile
done

# compute number of processors
np=$(($nodes * $ngpus))

# print configuration
echo "num nodes: $nodes"
echo "hosts: ${HOSTS[@]}"
echo "num mpi processes: $np"

# set cntk related vars
trainscript=$AZ_BATCH_TASK_SHARED_DIR/TrainResNet_CIFAR10.py

# set training options
trainopts="--datadir $AZ_BATCH_NODE_SHARED_DIR/data --modeldir $AZ_BATCH_TASK_WORKING_DIR/output --network resnet20 --distributed True -q 1 -a 0"

# execute mpi job
/root/openmpi/bin/mpirun --allow-run-as-root --mca btl_tcp_if_exclude docker0 \
    -np $np --hostfile hostfile -x LD_LIBRARY_PATH\
    /bin/bash -c "source /cntk/activate-cntk; python -u $trainscript $trainopts $*"

Writing run_cifar10.sh


Move the files into a directory to be uploaded.

In [26]:
INPUT_CONTAINER = 'input-dist'
UPLOAD_DIR = 'dist_upload'

!rm -rf $UPLOAD_DIR
!mkdir -p $UPLOAD_DIR
!mv run_cifar10.sh $UPLOAD_DIR
!ls -alF $UPLOAD_DIR

total 12
drwxr-xr-x  2 nbuser nbuser 4096 Jun 21 15:35 ./
drwx------ 19 nbuser nbuser 4096 Jun 21 15:35 ../
-rw-r--r--  1 nbuser nbuser 1080 Jun 21 15:35 run_cifar10.sh


We will create the config structure so that it directly references these files for ingress into Azure Storage. This obviates the need to call `blobxfer` ourselves, as the transfer will be done for us during pool creation.

In [27]:
config = {
    "batch_shipyard": {
        "storage_account_settings": STORAGE_ALIAS
    },
    "global_resources": {
        "docker_images": [
            IMAGE_NAME
        ],
        "files": [
            {
                "source": {
                    "path": UPLOAD_DIR
                },
                "destination": {
                    "storage_account_settings": STORAGE_ALIAS,
                    "data_transfer": {
                        "container": INPUT_CONTAINER
                    }
                }
            }
        ]
    }
}

Now we'll create the pool specification with a few modifications for this particular execution:
- `inter_node_communication_enabled` will ensure nodes are allocated such that they can communicate with each other (e.g., send and receive network packets)
- `input_data` specifies that the script we uploaded above is downloaded into `$AZ_BATCH_NODE_SHARED_DIR` on each node
- `transfer_files_on_pool_creation` will transfer the `files` specified in `global_resources` during pool creation (i.e., `pool add`)
- `resource_files` are the scripts that download and convert the CNTK train and test data
- `additional_node_prep_commands` are commands to execute for node preparation of all compute nodes. Here we execute the conversion script and make `run_cifar10.sh` executable

**Note:** It is usually better to scale up the execution first, prior to scaling out. Due to our default core quota of just 20 cores, we are using 3 `STANDARD_NC6` nodes. In real production runs, we'd most likely scale up to multiple GPUs within a single node (parallel execution) such as `STANDARD_NC12` or `STANDARD_NC24` prior to scaling out to multiple NC nodes (parallel and distributed execution).
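
The quota arithmetic behind this choice can be sketched as follows. The per-size core and GPU counts in the lookup table are assumptions filled in for illustration and are not taken from this notebook, so check them against the current Azure VM size documentation before relying on them. `max_nodes` is a hypothetical helper.

```python
# Assumed cores and GPUs per VM size; hypothetical lookup table for illustration.
NC_SIZES = {
    'STANDARD_NC6': {'cores': 6, 'gpus': 1},
    'STANDARD_NC12': {'cores': 12, 'gpus': 2},
    'STANDARD_NC24': {'cores': 24, 'gpus': 4},
}


def max_nodes(vm_size, core_quota):
    """Largest number of nodes of vm_size that fits within core_quota."""
    return core_quota // NC_SIZES[vm_size]['cores']


CORE_QUOTA = 20  # default core quota mentioned in the note
for size in ('STANDARD_NC6', 'STANDARD_NC12', 'STANDARD_NC24'):
    nodes = max_nodes(size, CORE_QUOTA)
    gpus = nodes * NC_SIZES[size]['gpus']
    print('{}: up to {} node(s), {} GPU(s) in total'.format(size, nodes, gpus))
```

Under these assumptions, three `STANDARD_NC6` nodes use 18 of the 20 available cores, which gives the largest number of GPUs that fits within the default quota.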

In [29]:
POOL_ID = 'gpupool-multi-instance'

pool = {
    "pool_specification": {
        "id": POOL_ID,
        "vm_size": "STANDARD_NC6",
        "vm_count": {
            "dedicated": 3
        },
        "publisher": "Canonical",
        "offer": "UbuntuServer",
        "sku": "16.04-LTS",
        "ssh": {
            "username": "docker"
        },
        "inter_node_communication_enabled": True,
        "reboot_on_start_task_failed": False,
        "block_until_all_global_resources_loaded": True,
        "transfer_files_on_pool_creation": True,
        "input_data": {
            "azure_storage": [
                {
                    "storage_account_settings": STORAGE_ALIAS,
                    "container": INPUT_CONTAINER,
                    "destination": "$AZ_BATCH_NODE_SHARED_DIR"
                }
            ]
        },
        "resource_files": [
            {
                "file_path": "cifar_data_processing.py",
                "blob_source": "https://batchshipyardexamples.blob.core.windows.net/code/cifar_data_processing.py",
                "file_mode":'0777'
            },
            {
                "file_path": "convert_cifar10.sh",
                "blob_source": "https://batchshipyardexamples.blob.core.windows.net/code/convert_cifar10.sh",
                "file_mode":'0777'
            }
        ],
         "additional_node_prep_commands": [
            "/bin/bash convert_cifar10.sh {} $AZ_BATCH_NODE_SHARED_DIR/data".format(IMAGE_NAME),
             "chmod 777 $AZ_BATCH_NODE_SHARED_DIR/run_cifar10.sh"
        ]
    }
}

In [30]:
!mkdir -p config
write_json_to_file(config, os.path.join('config', 'config.json'))
write_json_to_file(pool, os.path.join('config', 'pool.json'))
print(json.dumps(config, indent=4, sort_keys=True))
print(json.dumps(pool, indent=4, sort_keys=True))

{
    "batch_shipyard": {
        "storage_account_settings": "mystorageaccount"
    }, 
    "global_resources": {
        "docker_images": [
            "alfpark/cntk:2.0.rc2-gpu-1bit-sgd-python3.5-cuda8.0-cudnn5.1"
        ], 
        "files": [
            {
                "destination": {
                    "data_transfer": {
                        "container": "input-dist"
                    }, 
                    "storage_account_settings": "mystorageaccount"
                }, 
                "source": {
                    "path": "dist_upload"
                }
            }
        ]
    }
}
{
    "pool_specification": {
        "additional_node_prep_commands": [
            "/bin/bash convert_cifar10.sh alfpark/cntk:2.0.rc2-gpu-1bit-sgd-python3.5-cuda8.0-cudnn5.1 $AZ_BATCH_NODE_SHARED_DIR/data", 
            "chmod 777 $AZ_BATCH_NODE_SHARED_DIR/run_cifar10.sh"
        ], 
        "block_until_all_global_resources_loa

Create the pool. Please be patient while the compute nodes are allocated.

In [None]:
shipyard pool add -y

2017-06-21 15:37:27,136 INFO - creating table: shipyardregistry
2017-06-21 15:37:27,338 INFO - creating container: shipyardremotefs
2017-06-21 15:37:27,555 INFO - creating table: shipyardgr
2017-06-21 15:37:27,605 INFO - creating table: shipyarddht
2017-06-21 15:37:27,662 INFO - creating table: shipyardimages
2017-06-21 15:37:27,713 INFO - creating queue: shipyardgr-batcha3dc41fdba-gpupool-multi-instance
2017-06-21 15:37:27,965 INFO - creating container: shipyardtor-batcha3dc41fdba-gpupool-multi-instance
2017-06-21 15:37:28,015 INFO - creating table: shipyardtorrentinfo
2017-06-21 15:37:28,062 INFO - creating container: shipyardrf-batcha3dc41fdba-gpupool-multi-instance
2017-06-21 15:37:28,117 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyardregistry
2017-06-21 15:37:28,271 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyardgr
2017-06-21 15:37:28,317 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyardperf
2017-

Ensure that all compute nodes are `idle` and ready to accept tasks:

In [13]:
shipyard pool listnodes

2017-06-21 10:47:05,737 DEBUG - listing nodes for pool gpupool-multi-instance
2017-06-21 10:47:06,061 INFO - node_id=tvm-4283973576_1-20170621t103038z [state=ComputeNodeState.idle start_task_exit_code=0 scheduling_state=SchedulingState.enabled ip_address=10.0.0.5 vm_size=standard_nc6 dedicated=True total_tasks_run=0 running_tasks_count=0 total_tasks_succeeded=0]
2017-06-21 10:47:06,061 INFO - node_id=tvm-4283973576_2-20170621t103038z [state=ComputeNodeState.idle start_task_exit_code=0 scheduling_state=SchedulingState.enabled ip_address=10.0.0.4 vm_size=standard_nc6 dedicated=True total_tasks_run=0 running_tasks_count=0 total_tasks_succeeded=0]
2017-06-21 10:47:06,061 INFO - node_id=tvm-4283973576_3-20170621t103038z [state=ComputeNodeState.idle start_task_exit_code=0 scheduling_state=SchedulingState.enabled ip_address=10.0.0.6 vm_size=standard_nc6 dedicated=True total_tasks_run=0 running_tasks_count=0 total_tasks_succeeded=0]
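
If you would rather check this programmatically than by eye, the log lines can be parsed with a regular expression. This is a minimal sketch that assumes the line format shown in the output above. `node_states` and `all_idle` are hypothetical helper names, and the sample text is an abbreviated copy of that output.

```python
import re

# Matches the node id and state name in a `pool listnodes` log line
NODE_RE = re.compile(r'node_id=(\S+) \[state=ComputeNodeState\.(\w+)')


def node_states(log_text):
    """Map node_id -> state name for every node line found in log_text."""
    return dict(NODE_RE.findall(log_text))


def all_idle(log_text, expected_nodes):
    """True if exactly expected_nodes nodes are listed and all of them are idle."""
    states = node_states(log_text)
    return len(states) == expected_nodes and all(s == 'idle' for s in states.values())


# Abbreviated sample of the output above (most fields omitted)
sample = '\n'.join([
    'INFO - node_id=tvm-4283973576_1-20170621t103038z [state=ComputeNodeState.idle start_task_exit_code=0]',
    'INFO - node_id=tvm-4283973576_2-20170621t103038z [state=ComputeNodeState.idle start_task_exit_code=0]',
    'INFO - node_id=tvm-4283973576_3-20170621t103038z [state=ComputeNodeState.idle start_task_exit_code=0]',
])
print(all_idle(sample, expected_nodes=3))
```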


<a id='section2'></a>

## Configure MPI Job and Submit

MPI jobs in Batch require execution as a multi-instance task. Essentially this allows multiple compute nodes to be used for a single task.

A few things to note in the `jobs` configuration:
- The `COMMAND` executes the `run_cifar10.sh` script that was uploaded earlier as part of the node preparation task.
- `auto_complete` is set to `True`, which forces the job to move from `active` to `completed` state once all tasks complete. Note that once a job has moved to `completed` state, no new tasks can be added to it.
- The `multi_instance` property is populated, which enables multiple nodes (the number given by `num_instances`) to participate in the execution of this task. The `coordination_command` is the command that is run on all nodes prior to the `command`. Here, we simply run the SSH server from the Docker image so that the MPI daemon (e.g., orted, hydra, etc.) can initialize all of the nodes prior to running the application command.

In [14]:
JOB_ID = 'cntk-mpi-job'

# reduce the number of epochs to 20 for purposes of this notebook
COMMAND = '$AZ_BATCH_NODE_SHARED_DIR/run_cifar10.sh -e 20'
jobs = {
    "job_specifications": [
        {
            "id": JOB_ID,
            "auto_complete": True,
            "tasks": [
                {
                    "image": IMAGE_NAME,
                    "remove_container_after_exit": True,
                    "command": COMMAND,
                    "gpu": True,
                    "multi_instance": {
                        "num_instances": "pool_current_dedicated",
                        "coordination_command": "/usr/sbin/sshd -D -p 23",
                        "resource_files": [
                            {
                                "file_path": "resnet_models.py",
                                "blob_source": "https://batchshipyardexamples.blob.core.windows.net/code/resnet_models.py",
                                "file_mode":'0777'
                            },
                            {
                                "file_path": "TrainResNet_CIFAR10.py",
                                "blob_source": "https://batchshipyardexamples.blob.core.windows.net/code/TrainResNet_CIFAR10.py",
                                "file_mode":'0777'
                            }
                        ],
                    },
                }
            ],
        }
    ]
}

In [15]:
write_json_to_file(jobs, os.path.join('config', 'jobs.json'))
print(json.dumps(jobs, indent=4, sort_keys=True))

{
    "job_specifications": [
        {
            "auto_complete": true, 
            "id": "cntk-mpi-job", 
            "tasks": [
                {
                    "command": "$AZ_BATCH_NODE_SHARED_DIR/run_cifar10.sh -e 20", 
                    "gpu": true, 
                    "image": "alfpark/cntk:2.0.rc2-gpu-1bit-sgd-python3.5-cuda8.0-cudnn5.1", 
                    "multi_instance": {
                        "coordination_command": "/usr/sbin/sshd -D -p 23", 
                        "num_instances": "pool_current_dedicated", 
                        "resource_files": [
                            {
                                "blob_source": "https://batchshipyardexamples.blob.core.windows.net/code/resnet_models.py", 
                                "file_mode": "0777", 
                                "file_path": "resnet_models.py"
                            }, 
                            {
                                "blob_source": "https://batchshipyardexampl

Submit the job and tail `stdout.txt`:

In [16]:
shipyard jobs add --tail stdout.txt

2017-06-21 10:47:16,151 INFO - Adding job cntk-mpi-job to pool gpupool-multi-instance
2017-06-21 10:47:16,743 INFO - uploading file /tmp/tmpsJeQ7e as u'shipyardtaskrf-cntk-mpi-job/dockertask-00000.shipyard.envlist'
2017-06-21 10:47:16,796 DEBUG - submitting 1 tasks (0 -> 0) to job cntk-mpi-job
2017-06-21 10:47:17,058 INFO - submitted all 1 tasks to job cntk-mpi-job
2017-06-21 10:47:17,298 DEBUG - attempting to stream file stdout.txt from job=cntk-mpi-job task=dockertask-00000
d8deefa2b42de464e3cf5857358f334ef51c97ffc26304487ffd9c1872f62d67
num gpus: 1
num nodes: 3
hosts: 10.0.0.4 10.0.0.5 10.0.0.6
num mpi processes: 3

************************************************************
CNTK is activated.

Please checkout tutorials and examples here:
  /cntk/Tutorials
  /cntk/Examples

To deactivate the environment run

  source /root/anaconda3/bin/deactivate

************************************************************

************************************************************
CNTK is acti

Using the `jobs listtasks` command below, we can check the status of our tasks. Once all tasks have an exit code, we can continue. You can also view the **heatmap** of this pool in the [Azure Portal](https://portal.azure.com), under your Batch account, to monitor the progress of this job on the compute nodes.

<a id='section3'></a>

## Delete Multi-Instance Job

Deleting multi-instance jobs running as Docker containers requires a little more care. We will need to first ensure that the job has entered `completed` state. In the above `jobs` configuration, we set `auto_complete` to `True`, enabling the Batch service to automatically complete the job when all tasks finish. This also allows automatic cleanup of the running Docker containers used for executing the MPI job.

Special logic is required to clean up MPI jobs since the `coordination_command` that runs actually detaches an SSH server. The job auto-completion logic that Batch Shipyard injects ensures that these containers are killed.

In [17]:
shipyard jobs listtasks

2017-06-21 10:56:31,116 INFO - job_id=cntk-mpi-job task_id=dockertask-00000 [state=TaskState.completed max_retries=0 retention_time=10675199 days, 2:48:05.477581 pool_id=gpupool-multi-instance node_id=tvm-4283973576_2-20170621t103038z start_time=2017-06-21 10:47:18.399507+00:00 end_time=2017-06-21 10:54:33.882116+00:00 duration=0:07:15.482609 exit_code=0]
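
The state and exit code can also be extracted from this output with a small parser. This is a rough sketch that assumes the line format shown above. `task_results` and `all_succeeded` are hypothetical helpers, and the sample line is an abbreviated copy of the output.

```python
import re

# Captures task id, state name and exit code from a `jobs listtasks` log line
TASK_RE = re.compile(r'task_id=(\S+) \[state=TaskState\.(\w+).*?exit_code=(\S+?)\]')


def task_results(log_text):
    """Map task_id -> (state, exit_code); exit_code is None if it is not an integer."""
    results = {}
    for task_id, state, exit_code in TASK_RE.findall(log_text):
        is_int = exit_code.lstrip('-').isdigit()
        results[task_id] = (state, int(exit_code) if is_int else None)
    return results


def all_succeeded(results):
    """True if at least one task was found and every task completed with exit code 0."""
    return bool(results) and all(
        state == 'completed' and code == 0 for state, code in results.values())


# Abbreviated sample of the output above (most fields omitted)
sample = ('INFO - job_id=cntk-mpi-job task_id=dockertask-00000 '
          '[state=TaskState.completed max_retries=0 pool_id=gpupool-multi-instance exit_code=0]')
results = task_results(sample)
print(results)
print(all_succeeded(results))
```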


Once we are sure that the job is completed, we issue the standard delete command:

In [None]:
shipyard jobs del -y --termtasks --wait

2017-06-21 15:49:24,216 INFO - Deleting job: cntk-ps-as-job
2017-06-21 15:49:24,216 DEBUG - disabling job cntk-ps-as-job first due to task termination
2017-06-21 15:49:24,645 ERROR - cntk-ps-as-job job does not exist


In [None]:
shipyard pool del -y --wait

2017-06-21 15:49:26,330 INFO - Deleting pool: gpupool-multi-instance
2017-06-21 15:49:26,541 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyardregistry
2017-06-21 15:49:26,886 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyardgr
2017-06-21 15:49:26,990 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyardperf
2017-06-21 15:49:27,046 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyarddht
2017-06-21 15:49:27,098 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyardimages
2017-06-21 15:49:27,204 DEBUG - clearing table (pk=batcha3dc41fdba$gpupool-multi-instance): shipyardtorrentinfo
2017-06-21 15:49:27,256 DEBUG - deleting queue: shipyardgr-batcha3dc41fdba-gpupool-multi-instance
2017-06-21 15:49:27,483 DEBUG - deleting container: shipyardtor-batcha3dc41fdba-gpupool-multi-instance
2017-06-21 15:49:28,064 DEBUG - deleting container: shipyardrf-batcha3dc41fdba-gpupool-mul

## Next Steps
You can proceed to the [Notebook: Clean Up](05_Clean_Up.ipynb) if you are done for now, or proceed to one of the following additional Notebooks:
* [Notebook: Automatic Model Selection](06_Advanced_Auto_Model_Selection.ipynb)
* [Notebook: Tensorboard Visualization](07_Advanced_Tensorboard.ipynb) - note this requires running this notebook on your own machine
* [Notebook: Keras with TensorFlow](09_Keras_Single_GPU_Training_With_Tensorflow.ipynb)