# Image Classification Labs

## Lab 3-3 Distributed Training (MXNet)

### Parallelism

**data parallelism**
Data parallelism partitions the workload over multiple devices. Assume there are n devices: each one receives a copy of the complete model and trains it on 1/n of the data. Results such as gradients and the updated model are communicated across these devices.
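
To make the idea concrete, here is a minimal pure-Python sketch of one data-parallel training step. It is not MXNet code: the "devices" are simulated with ordinary loops, the model is a single weight `w`, and the helper names (`gradient`, `split`, `data_parallel_step`) are hypothetical. It assumes the data divides evenly across devices, in which case averaging the per-device gradients gives the same update as a single device that sees all the data.

```python
# Toy data parallelism for a 1-D linear model y = w * x with squared error.
# Devices are simulated; none of these helpers are part of MXNet.

def gradient(w, shard):
    """Mean gradient of 0.5 * (w*x - y)^2 over one shard of (x, y) pairs."""
    return sum((w * x - y) * x for x, y in shard) / len(shard)

def split(data, n_devices):
    """Partition data into n_devices equal shards (length must divide evenly)."""
    size = len(data) // n_devices
    return [data[i * size:(i + 1) * size] for i in range(n_devices)]

def data_parallel_step(w, data, n_devices, lr):
    shards = split(data, n_devices)
    # Each device holds the full model (here just w) and sees 1/n of the data.
    local_grads = [gradient(w, shard) for shard in shards]
    # Communication step: average the per-device gradients.
    avg_grad = sum(local_grads) / n_devices
    return w - lr * avg_grad

data = [(float(x), 3.0 * x) for x in range(1, 9)]  # true weight is 3.0
w_parallel = data_parallel_step(0.0, data, n_devices=4, lr=0.01)
w_single = 0.0 - 0.01 * gradient(0.0, data)
print(w_parallel, w_single)
```

The two printed values agree up to floating-point rounding, which is the property that lets data parallelism scale a single-device training loop.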

**model parallelism**
In this approach, each device holds only part of the model.

> This lab covers data parallelism. See https://github.com/dmlc/mxnet/tree/master/example/model-parallel-lstm for an example of how model parallelism can be implemented.

<img src='./parallelism-google.png' width='800'>

### MXNet Distributed Training Architecture

* Worker node: 
* Server node: 
* Shared storage:

> In case of no shared storage available :

![](https://raw.githubusercontent.com/dmlc/dmlc.github.io/master/img/ps-arch.png "ps")

### MXNet Distributed Training Cluster Types

* ssh
* mpi
* local
* sge
* yarn

## Key-Value Store in MXNet

### What is KV Store

### How to set KV Store (http://mxnet.io/api/python/kvstore.html)

mxnet.kvstore.create() creates a new KVStore.
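
As a rough mental model of what a key-value store does during training, the following sketch implements a tiny in-memory store with an init/push/pull pattern. The class `ToyKVStore` is hypothetical and is not the `mxnet.kvstore` API; it assumes that values pushed by several devices for the same key are summed and that the sum replaces the stored value.

```python
class ToyKVStore:
    """In-memory stand-in for a key-value store; NOT the mxnet.kvstore API."""

    def __init__(self):
        self._store = {}

    def init(self, key, value):
        # Register a key with its initial value (e.g. a weight vector).
        self._store[key] = list(value)

    def push(self, key, values):
        # `values` holds one list per device; they are summed element-wise
        # and the sum replaces the stored value (an assumption of this sketch).
        self._store[key] = [sum(col) for col in zip(*values)]

    def pull(self, key):
        # Return a copy so callers cannot mutate the stored value.
        return list(self._store[key])

kv_toy = ToyKVStore()
kv_toy.init("weight", [0.0, 0.0])
kv_toy.push("weight", [[1.0, 2.0], [3.0, 4.0]])  # two devices push values
print(kv_toy.pull("weight"))  # [4.0, 6.0]
```

In training, the keys would typically identify parameters, the pushed values would be gradients, and the pulled values would be the weights each device uses for its next step.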

For **single** machine training, there are two commonly used types:

* local: Copies all gradients to CPU memory and updates weights there.

* device: Aggregates gradients and updates weights on GPUs. With this setting, the KVStore also attempts to use GPU peer-to-peer communication, potentially accelerating the communication.

>Note: <font color='red'>Use "device" if GPUs >= 4</font>
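
The rule of thumb in the note can be written as a small helper. This is only a sketch: `single_machine_kvstore` is a hypothetical function, and the threshold of 4 GPUs is taken from the note, not from any MXNet API.

```python
def single_machine_kvstore(num_gpus):
    """Pick a KVStore type name for single-machine training.

    Hypothetical helper: the threshold of 4 GPUs comes from the note in
    this document, not from MXNet itself.
    """
    if num_gpus < 0:
        raise ValueError("num_gpus must be non-negative")
    return "device" if num_gpus >= 4 else "local"

for n in (0, 1, 4, 8):
    print(n, single_machine_kvstore(n))
```

The returned string is the kind of value that would be passed as the KVStore type when creating the store.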

For **distributed** training, KVStore also supports a number of types:

* **dist_sync**: Behaves similarly to local but with one major difference. With dist_sync, batch-size now means the batch size used on each machine. So if there are n machines and we use batch size b, then dist_sync behaves like local with batch size n * b.

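* **dist_device_sync**: Identical to dist_sync, except that it differs from dist_sync in the same way device differs from local: gradient aggregation within a machine uses GPUs rather than CPU memory.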

* **dist_async**: Performs asynchronous updates. The weights are updated whenever gradients are received from any machine. No two updates happen on the same weight at the same time. However, the order is not guaranteed.
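
The difference between synchronous and asynchronous updates can be illustrated with a toy simulation. This is a simplified sketch, not MXNet's implementation: each worker `i` has a hypothetical loss `0.5 * (w - targets[i])^2`, gradients are summed without rescaling, and the function names are made up for illustration.

```python
def effective_batch_size(num_machines, per_machine_batch):
    """With dist_sync, batch size b on each of n machines acts like n * b."""
    return num_machines * per_machine_batch

def sync_step(w, targets, lr):
    """Synchronous: every worker computes its gradient from the same w,
    then the aggregated gradient is applied once."""
    grads = [w - t for t in targets]
    return w - lr * sum(grads)

def async_steps(w, targets, lr, order):
    """Asynchronous: each gradient is applied as soon as it arrives, so a
    later worker sees weights already changed by earlier workers."""
    for i in order:
        w = w - lr * (w - targets[i])
    return w

targets = [1.0, 3.0]
print(effective_batch_size(2, 128))                  # 256
print(sync_step(0.0, targets, lr=0.5))               # 2.0
print(async_steps(0.0, targets, 0.5, order=(0, 1)))  # 1.75
print(async_steps(0.0, targets, 0.5, order=(1, 0)))  # 1.25
```

The two asynchronous runs differ only in arrival order yet end at different weights, which is why the lack of ordering guarantees in dist_async matters.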


**Source code: ./common/fit.py**
```
    # kvstore
    kv = mx.kvstore.create(args.kv_store)
```

```
    # create model
    model = mx.mod.Module(
        context       = devs,
        symbol        = network
    )
    
    # run
    model.fit(train,
        begin_epoch        = args.load_epoch if args.load_epoch else 0,
        num_epoch          = args.num_epochs,
        eval_data          = val,
        eval_metric        = eval_metrics,
        kvstore            = kv,
        optimizer          = args.optimizer,
        optimizer_params   = optimizer_params,
        initializer        = initializer,
        arg_params         = arg_params,
        aux_params         = aux_params,
        batch_end_callback = batch_end_callbacks,
        epoch_end_callback = checkpoint,
        allow_missing      = True,
        monitor            = monitor)
```

### MXNet Distributed Training Launcher Tool

MXNet provides a launcher tool for distributed training: launch.py, located in the ./tools directory.


```
$ ./launch.py -h
usage: launch.py [-h] -n NUM_WORKERS [-s NUM_SERVERS] [-H HOSTFILE]
                 [--sync-dst-dir SYNC_DST_DIR]
                 [--launcher {local,ssh,mpi,sge,yarn}]
                 command [command ...]

Launch a distributed job

positional arguments:
  command               command for launching the program

optional arguments:
  -h, --help            show this help message and exit
  -n NUM_WORKERS, --num-workers NUM_WORKERS
                        number of worker nodes to be launched
  -s NUM_SERVERS, --num-servers NUM_SERVERS
                        number of server nodes to be launched, in default it
                        is equal to NUM_WORKERS
  -H HOSTFILE, --hostfile HOSTFILE
                        the hostfile of slave machines which will run the job.
                        Required for ssh and mpi launcher
  --sync-dst-dir SYNC_DST_DIR
                        if specificed, it will sync the current directory into
                        slave machines's SYNC_DST_DIR if ssh launcher is used
  --launcher {local,ssh,mpi,sge,yarn}
                        the launcher to use
```

The tracker for each launcher is located in the examples/mxnet/dmlc-core/tracker/dmlc_tracker directory.
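
When the launcher is driven from a script, it can help to assemble the command as an argument list. The sketch below only builds and prints the command; it does not run anything. `build_launch_command` and the hostfile name `hosts.txt` are hypothetical, and only flags that appear in the help output above (`-n`, `-H`, `--launcher`) are used.

```python
import shlex

def build_launch_command(num_workers, hostfile, command, launcher=None,
                         launch_script="./launch.py"):
    """Return the launch.py invocation as a list of arguments.

    `command` is the training command to run on each worker, given as a
    list. Hypothetical helper; nothing is executed here.
    """
    cmd = [launch_script, "-n", str(num_workers), "-H", hostfile]
    if launcher is not None:
        cmd += ["--launcher", launcher]
    return cmd + list(command)

cmd = build_launch_command(
    num_workers=2,
    hostfile="hosts.txt",  # placeholder path, one worker host per line
    command=["python", "train_cifar10.py", "--kv-store", "dist_sync"],
    launcher="ssh",
)
print(" ".join(shlex.quote(part) for part in cmd))
```

Keeping the command as a list avoids quoting problems if it is later passed to `subprocess.run`.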


## Distributed Deep Learning on AWS

https://github.com/awslabs/deeplearning-cfn

* Master
* Worker (/opt/deeplearning/workers)

<img src="https://github.com/awslabs/deeplearning-cfn/raw/master/images/Slide0.png" width="600">

## Lab Steps - MXNet

Step 1. Add SSH private key into the authentication agent

```
$ ssh-add -K <private key file name>
```

Step 2. SSH to the master node

Step 3. Clone git repository

Step 4. Execute distributed training



### Clone the awslabs/deeplearning-cfn repo that contains the examples onto the EFS mount

```
git clone https://github.com/awslabs/deeplearning-cfn $EFS_MOUNT/deeplearning-cfn && \
cd $EFS_MOUNT/deeplearning-cfn && \
#
#fetches dmlc/mxnet and tensorflow/models repos as submodules
git submodule update --init $EFS_MOUNT/deeplearning-cfn/examples/tensorflow/models && \
git submodule update --init $EFS_MOUNT/deeplearning-cfn/examples/mxnet && \
cd $EFS_MOUNT/deeplearning-cfn/examples/mxnet/ && \
git submodule update --init $EFS_MOUNT/deeplearning-cfn/examples/mxnet/dmlc-core
```

### Running Distributed Training on MXNet


1. terminate all running Python processes across workers 
```
while read -u 10 host; do ssh -o "StrictHostKeyChecking no" $host "pkill -f python" ; \
done 10<$DEEPLEARNING_WORKERS_PATH
```

2. navigate to the MXNet image-classification example directory
```
cd $EFS_MOUNT/deeplearning-cfn/examples/mxnet/example/image-classification/
```

3. [GPU] run the CIFAR10 distributed training example
```
../../tools/launch.py -n $DEEPLEARNING_WORKERS_COUNT -H $DEEPLEARNING_WORKERS_PATH \
python train_cifar10.py --gpus $(seq -s , 0 1 $(($DEEPLEARNING_WORKER_GPU_COUNT - 1))) \
--network resnet --num-layers 50 --kv-store dist_device_sync --num-epoch 2
```

4. [CPU] run the CIFAR10 distributed training example
```
../../tools/launch.py -n $DEEPLEARNING_WORKERS_COUNT -H $DEEPLEARNING_WORKERS_PATH \
python train_cifar10.py  \
--network resnet --num-layers 8 --kv-store dist_device_sync --num-epoch 2
```
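
The `--gpus` value in the GPU command above is produced by `seq -s , 0 1 N-1`, which expands to the comma-separated device ids `0` through `N-1`. A Python equivalent, shown here as a small sketch with a hypothetical helper name, makes the expansion easier to see:

```python
def gpu_list(gpu_count):
    """Comma-separated GPU ids 0..gpu_count-1, like `seq -s , 0 1 N-1`."""
    return ",".join(str(i) for i in range(gpu_count))

print(gpu_list(4))        # 0,1,2,3
print(repr(gpu_list(0)))  # '' (empty when the worker has no GPUs)
```

If the GPU count is 0, the expansion is empty, so the CPU variant of the command omits `--gpus` entirely.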

### Process started on worker node

```
ubuntu    1768  1744  1 08:49 ?        00:00:01 python train_cifar10.py --gpus --network resnet --num-layers 50 --kv-store dist_device_sync
```

### Output of Distributed MXNet execution

```
ubuntu@ip-10-0-0-219:/myEFSvolume/deeplearning-cfn/examples/mxnet/example/image-classification$ ../../tools/launch.py -n $DEEPLEARNING_WORKERS_COUNT -H $DEEPLEARNING_WORKERS_PATH python train_cifar10.py  --network resnet --num-layers 8 --kv-store dist_device_sync --num-epoch 1 --model-prefix /myEFSvolume/deeplearning-cfn/examples/mxnet/example/image-classification/model-output
NVIDIA: no NVIDIA devices found
NVIDIA: no NVIDIA devices found
NVIDIA: no NVIDIA devices found
NVIDIA: no NVIDIA devices found
NVIDIA: no NVIDIA devices found
INFO:root:start with arguments Namespace(batch_size=128, benchmark=0, data_nthreads=4, data_train='data/cifar10_train.rec', data_val='data/cifar10_val.rec', disp_batches=20, dtype='float32', gpus=None, image_shape='3,28,28', kv_store='dist_device_sync', load_epoch=None, lr=0.05, lr_factor=0.1, lr_step_epochs='200,250', max_random_aspect_ratio=0, max_random_h=36, max_random_l=50, max_random_rotate_angle=0, max_random_s=50, max_random_scale=1, max_random_shear_ratio=0, min_random_scale=1, model_prefix='/myEFSvolume/deeplearning-cfn/examples/mxnet/example/image-classification/model-output', mom=0.9, monitor=0, network='resnet', num_classes=10, num_epochs=1, num_examples=50000, num_layers=8, optimizer='sgd', pad_size=4, random_crop=1, random_mirror=1, rgb_mean='123.68,116.779,103.939', test_io=0, top_k=0, wd=0.0001)
INFO:root:start with arguments Namespace(batch_size=128, benchmark=0, data_nthreads=4, data_train='data/cifar10_train.rec', data_val='data/cifar10_val.rec', disp_batches=20, dtype='float32', gpus=None, image_shape='3,28,28', kv_store='dist_device_sync', load_epoch=None, lr=0.05, lr_factor=0.1, lr_step_epochs='200,250', max_random_aspect_ratio=0, max_random_h=36, max_random_l=50, max_random_rotate_angle=0, max_random_s=50, max_random_scale=1, max_random_shear_ratio=0, min_random_scale=1, model_prefix='/myEFSvolume/deeplearning-cfn/examples/mxnet/example/image-classification/model-output', mom=0.9, monitor=0, network='resnet', num_classes=10, num_epochs=1, num_examples=50000, num_layers=8, optimizer='sgd', pad_size=4, random_crop=1, random_mirror=1, rgb_mean='123.68,116.779,103.939', test_io=0, top_k=0, wd=0.0001)
[16:27:44] src/io/iter_image_recordio_2.cc:135: ImageRecordIOParser2: data/cifar10_train.rec, use 1 threads for decoding..
[16:27:44] src/io/iter_image_recordio_2.cc:135: ImageRecordIOParser2: data/cifar10_train.rec, use 1 threads for decoding..
[16:27:44] src/io/iter_image_recordio_2.cc:135: ImageRecordIOParser2: data/cifar10_val.rec, use 1 threads for decoding..
[16:27:44] src/io/iter_image_recordio_2.cc:135: ImageRecordIOParser2: data/cifar10_val.rec, use 1 threads for decoding..
INFO:root:Epoch[0] Batch [20]	Speed: 223.74 samples/sec	accuracy=0.168155
INFO:root:Epoch[0] Batch [20]	Speed: 224.03 samples/sec	accuracy=0.165551
INFO:root:Epoch[0] Batch [40]	Speed: 219.29 samples/sec	accuracy=0.258984
INFO:root:Epoch[0] Batch [40]	Speed: 218.81 samples/sec	accuracy=0.233203
INFO:root:Epoch[0] Batch [60]	Speed: 219.94 samples/sec	accuracy=0.276953
INFO:root:Epoch[0] Batch [60]	Speed: 220.39 samples/sec	accuracy=0.272656
INFO:root:Epoch[0] Batch [80]	Speed: 214.74 samples/sec	accuracy=0.298047
INFO:root:Epoch[0] Batch [80]	Speed: 214.70 samples/sec	accuracy=0.300391
INFO:root:Epoch[0] Batch [100]	Speed: 223.44 samples/sec	accuracy=0.319141
INFO:root:Epoch[0] Batch [100]	Speed: 223.35 samples/sec	accuracy=0.319531
INFO:root:Epoch[0] Batch [120]	Speed: 213.90 samples/sec	accuracy=0.331641
INFO:root:Epoch[0] Batch [120]	Speed: 213.67 samples/sec	accuracy=0.320703
INFO:root:Epoch[0] Batch [140]	Speed: 228.06 samples/sec	accuracy=0.348438
INFO:root:Epoch[0] Batch [140]	Speed: 225.88 samples/sec	accuracy=0.338281
INFO:root:Epoch[0] Batch [160]	Speed: 217.08 samples/sec	accuracy=0.368359
INFO:root:Epoch[0] Batch [160]	Speed: 215.39 samples/sec	accuracy=0.361719
INFO:root:Epoch[0] Batch [180]	Speed: 204.91 samples/sec	accuracy=0.364844
INFO:root:Epoch[0] Batch [180]	Speed: 203.97 samples/sec	accuracy=0.387500
INFO:root:Epoch[0] Train-accuracy=0.382812
INFO:root:Epoch[0] Time cost=114.941
INFO:root:Epoch[0] Train-accuracy=0.380208
INFO:root:Epoch[0] Time cost=114.961
INFO:root:Saved checkpoint to "/myEFSvolume/deeplearning-cfn/examples/mxnet/example/image-classification/model-output-0001.params"
INFO:root:Saved checkpoint to "/myEFSvolume/deeplearning-cfn/examples/mxnet/example/image-classification/model-output-1-0001.params"
INFO:root:Epoch[0] Validation-accuracy=0.425977
INFO:root:Epoch[0] Validation-accuracy=0.427734
MKL Build:20170209
MKL Build:20170209
```

> ### Training output using a single machine
>
With the same parameters (num-layers, num-epoch), training on a single machine produces the following output:
```
INFO:root:Epoch[0] Batch [20]	Speed: 363.66 samples/sec	accuracy=0.164807
INFO:root:Epoch[0] Batch [40]	Speed: 353.78 samples/sec	accuracy=0.228516
INFO:root:Epoch[0] Batch [60]	Speed: 363.93 samples/sec	accuracy=0.264453
INFO:root:Epoch[0] Batch [80]	Speed: 354.36 samples/sec	accuracy=0.278906
INFO:root:Epoch[0] Batch [100]	Speed: 364.05 samples/sec	accuracy=0.299219
INFO:root:Epoch[0] Batch [120]	Speed: 352.75 samples/sec	accuracy=0.308594
INFO:root:Epoch[0] Batch [140]	Speed: 364.74 samples/sec	accuracy=0.343750
INFO:root:Epoch[0] Batch [160]	Speed: 356.02 samples/sec	accuracy=0.339844
INFO:root:Epoch[0] Batch [180]	Speed: 365.27 samples/sec	accuracy=0.349219
INFO:root:Epoch[0] Batch [200]	Speed: 356.39 samples/sec	accuracy=0.348047
INFO:root:Epoch[0] Batch [220]	Speed: 364.78 samples/sec	accuracy=0.364844
INFO:root:Epoch[0] Batch [240]	Speed: 354.15 samples/sec	accuracy=0.371094
INFO:root:Epoch[0] Batch [260]	Speed: 364.89 samples/sec	accuracy=0.364063
INFO:root:Epoch[0] Batch [280]	Speed: 356.20 samples/sec	accuracy=0.388672
INFO:root:Epoch[0] Batch [300]	Speed: 364.59 samples/sec	accuracy=0.404687
INFO:root:Epoch[0] Batch [320]	Speed: 353.17 samples/sec	accuracy=0.414453
INFO:root:Epoch[0] Batch [340]	Speed: 362.48 samples/sec	accuracy=0.423828
INFO:root:Epoch[0] Batch [360]	Speed: 360.31 samples/sec	accuracy=0.415234
INFO:root:Epoch[0] Batch [380]	Speed: 353.27 samples/sec	accuracy=0.435937
INFO:root:Epoch[0] Train-accuracy=0.407813
INFO:root:Epoch[0] Time cost=138.994
INFO:root:Epoch[0] Validation-accuracy=0.471519
```
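
To compare runs without reading the logs by eye, the per-batch lines can be parsed with a regular expression. The sketch below assumes the log format shown above (tab-separated `Speed:` and `accuracy=` fields); the helper names are hypothetical, and the sample text is copied from the first two batch lines of the single-machine output.

```python
import re

# Matches lines such as:
# INFO:root:Epoch[0] Batch [20]<TAB>Speed: 363.66 samples/sec<TAB>accuracy=0.164807
LINE_RE = re.compile(
    r"Epoch\[(\d+)\] Batch \[(\d+)\]\s+Speed: ([\d.]+) samples/sec\s+accuracy=([\d.]+)"
)

def parse_batch_lines(log_text):
    """Return (epoch, batch, speed, accuracy) tuples for each batch line."""
    rows = []
    for line in log_text.splitlines():
        m = LINE_RE.search(line)
        if m:
            rows.append((int(m.group(1)), int(m.group(2)),
                         float(m.group(3)), float(m.group(4))))
    return rows

def mean_speed(rows):
    """Average samples/sec over the parsed batch lines."""
    return sum(r[2] for r in rows) / len(rows)

sample = """INFO:root:Epoch[0] Batch [20]\tSpeed: 363.66 samples/sec\taccuracy=0.164807
INFO:root:Epoch[0] Batch [40]\tSpeed: 353.78 samples/sec\taccuracy=0.228516
INFO:root:Epoch[0] Train-accuracy=0.407813"""
rows = parse_batch_lines(sample)
print(len(rows), round(mean_speed(rows), 2))
```

In a distributed run the log interleaves lines from all workers, so the mean computed this way is a per-worker speed, not the aggregate throughput of the cluster.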

### Training model output

* model-output-symbol.json
* model-output-1-symbol.json
* model-output-0001.params
* model-output-1-0001.params

> <font color='red'>Why do the models saved by each node for the same epoch give different validation accuracy?</font>

### Compare the training speed and accuracy of distributed training vs. single node training

Number of workers | Time (sec) | Approx. per-batch speed (samples/sec) | Validation accuracy
:---:| :---: | :---: | :--:
1 (Single CPU machine, c4.large) | 138 | 350 | 0.47
2 (2 node CPU cluster, c4.large)| 114 | 210 | 0.42
2 (4 node CPU cluster, c4.large) | 73 | 160 | 0.52
2 (2 node GPU cluster, p2.xlarge) | 29 | 880 | 0.53
4 (4 node CPU cluster, c4.large) | 36 | 340 | 0.44
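
The table can be summarized with a few derived numbers. The sketch below uses only the first two rows (single c4.large vs. 2-node c4.large cluster) and assumes the speed column is per worker, as the interleaved logs suggest; the function names are hypothetical.

```python
def speedup(single_time, distributed_time):
    """How many times faster the distributed run finished an epoch."""
    return single_time / distributed_time

def scaling_efficiency(single_time, distributed_time, num_workers):
    """Speedup divided by worker count; 1.0 would be perfect scaling."""
    return speedup(single_time, distributed_time) / num_workers

def aggregate_throughput(per_worker_speed, num_workers):
    """Total samples/sec across the cluster, assuming equal workers."""
    return per_worker_speed * num_workers

# Values taken from the first two rows of the table.
s = speedup(138, 114)
e = scaling_efficiency(138, 114, 2)
t = aggregate_throughput(210, 2)
print(round(s, 2), round(e, 2), t)  # 1.21 0.61 420
```

For these two rows, adding a second CPU worker shortens the epoch by roughly 20% while each worker runs slower than the single machine, which points to communication overhead between workers and servers.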

### References

Run MXNet on Multiple CPU/GPUs with Data Parallelism
http://mxnet.io/how_to/multi_devices.html

Distributed Training
http://newdocs.readthedocs.io/en/latest/distributed_training.html

Distributed TensorFlow 
https://www.tensorflow.org/deploy/distributed

### Using multiple GPUs in a Single Machine

##### Key-Value Store Types
* **local** : all gradients are copied to CPU memory and weights are updated there
* **device** : both gradient aggregation and weight updates are run on GPUs. With this setting, the KVStore also attempts to use GPU peer-to-peer communication, potentially accelerating the communication. Note that this option may result in higher GPU memory usage. 

>Note: <font color='red'>Use "device" if GPUs >= 4</font>