# Overview of MXFusion's Distributed Training

```
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#   Licensed under the Apache License, Version 2.0 (the "License").
#   You may not use this file except in compliance with the License.
#   A copy of the License is located at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   or in the "license" file accompanying this file. This file is distributed
#   on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#   express or implied. See the License for the specific language governing
#   permissions and limitations under the License.
# ==============================================================================
```

MXFusion uses [Horovod](https://github.com/horovod/horovod) as its distributed training framework. Horovod is an open-source distributed training framework hosted by the LF AI Foundation. It supports several deep learning libraries: TensorFlow, Keras, PyTorch and Apache MXNet. Because MXFusion is built on top of MXNet, it benefits from Horovod's high-level API, which makes distributed deep learning easy to implement.

Horovod takes a data-parallel approach to distributed training. The training procedure consists of three stages:

1. Run multiple copies of the training script (the number of copies depends on the number of processors). Each copy takes a chunk of the data and computes the gradient by running it through the model.
2. Average the gradients across the copies.
3. Update the model, then repeat from Step 1 until all iterations are complete.
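The three stages can be simulated in a single process. The sketch below is an illustrative NumPy example, not Horovod code: the linear model, the helper names `allreduce_mean` and `data_parallel_gd`, and the learning rate are assumptions. The averaging step stands in for the Allreduce operation that Horovod uses to combine gradients.

```python
import numpy as np


def allreduce_mean(values):
    """Simulated Allreduce: average one array per worker and hand the same
    result back to every worker."""
    mean = np.mean(values, axis=0)
    return [mean.copy() for _ in values]


def data_parallel_gd(X, y, n_workers, lr=0.1, n_iters=200):
    """Data-parallel gradient descent on a linear model with MSE loss."""
    X_shards = np.array_split(X, n_workers)
    y_shards = np.array_split(y, n_workers)
    # Every worker starts from the same weights.
    weights = [np.zeros(X.shape[1]) for _ in range(n_workers)]
    for _ in range(n_iters):
        # Stage 1: each worker computes the MSE gradient on its own shard.
        grads = [2.0 * Xk.T @ (Xk @ w - yk) / len(yk)
                 for Xk, yk, w in zip(X_shards, y_shards, weights)]
        # Stage 2: average the gradients across workers.
        avg_grads = allreduce_mean(grads)
        # Stage 3: each worker applies the identical averaged update.
        weights = [w - lr * g for w, g in zip(weights, avg_grads)]
    return weights


rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3))
true_w = np.array([1.0, -2.0, 0.5])
y = X @ true_w  # noise-free targets
weights = data_parallel_gd(X, y, n_workers=4)
print(np.round(weights[0], 3))
```

Because every worker starts from the same weights and applies the same averaged gradient, all copies of the model stay identical throughout training.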

Horovod's core principles are based on MPI concepts. One such concept is <tt>Allreduce</tt>, which aggregates data across multiple processes and sends the result back to every process. Horovod's [DistributedTrainer](https://github.com/horovod/horovod/blob/master/horovod/mxnet/__init__.py) API is a subclass of MXNet's <tt>gluon.Trainer</tt> that averages the gradients across processes using <tt>Allreduce</tt>. We can use this high-level API to train probabilistic models in MXFusion as long as the total loss is the sum of the per-processor losses:

\begin{equation*}
L = \left( \sum_{k=1}^n L_k \right)
\end{equation*}

where $L$ is the final loss function, $L_k$ is the loss function of processor $k$, and $n$ is the number of processors.
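For a loss that is a sum over data points, such as the sum of squared errors, this condition holds exactly: splitting the data into $n$ partitions and adding up the partition losses recovers $L$, and by linearity the same is true of the gradients. The NumPy check below is an illustrative sketch; the linear model and the helper `sse_loss_and_grad` are assumptions, not MXFusion code.

```python
import numpy as np


def sse_loss_and_grad(X, y, w):
    """Sum-of-squared-errors loss of a linear model and its gradient w.r.t. w."""
    residual = X @ w - y
    return float(residual @ residual), 2.0 * X.T @ residual


rng = np.random.default_rng(1)
X = rng.normal(size=(10, 2))
y = rng.normal(size=10)
w = rng.normal(size=2)
n = 3  # number of processors

# Loss and gradient on the full data set.
L, grad_L = sse_loss_and_grad(X, y, w)
# Loss and gradient on each processor's partition (sizes 4, 3, 3).
partials = [sse_loss_and_grad(Xk, yk, w)
            for Xk, yk in zip(np.array_split(X, n), np.array_split(y, n))]
L_sum = sum(loss for loss, _ in partials)
grad_sum = sum(grad for _, grad in partials)
print(np.isclose(L, L_sum), np.allclose(grad_L, grad_sum))
```

By contrast, a per-partition *mean* loss with unequal partition sizes would not add up to the full-data loss, so the form of the loss matters.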

## Implementation in MXFusion

We implemented a simple, high-level interface for running MXFusion inference with distributed training. We introduced <tt>DistributedGradBasedInference</tt>, the distributed counterpart of <tt>GradBasedInference</tt>. Its only additional method is <tt>rescale</tt>, which rescales the scaling factors of the latent variables in SVI according to the number of running processors.

In SVI, the loss function includes terms for the prior and the posterior (their means and variances) of each latent variable, and these terms do not depend on how the data is partitioned. Summing the loss functions of all processors would therefore count these terms $n$ times. To recover the correct final loss function, the latent variables are rescaled by $1/n$, where $n$ is the number of processors.
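The effect of this rescaling can be checked numerically. The sketch below assumes a toy model that is not from MXFusion: one latent variable $z \sim \mathcal{N}(0, 1)$, observations $x_i \mid z \sim \mathcal{N}(z, 1)$, a Gaussian variational posterior, and a single fixed sample of $z$ in place of the Monte Carlo expectation. The names `gauss_logpdf` and `neg_elbo` are hypothetical. Summing per-processor losses whose prior and posterior terms are scaled by $1/n$ reproduces the full-data loss, while leaving them unscaled counts those terms $n$ times.

```python
import math

import numpy as np


def gauss_logpdf(x, mean, var):
    """Log density of N(mean, var) at x (elementwise on arrays)."""
    return -0.5 * (math.log(2.0 * math.pi * var) + (x - mean) ** 2 / var)


def neg_elbo(x, z, q_mean, q_var, latent_scale):
    """Single-sample negative ELBO for z ~ N(0, 1), x_i | z ~ N(z, 1) and
    q(z) = N(q_mean, q_var); the prior and posterior terms of z are
    multiplied by latent_scale."""
    log_lik = np.sum(gauss_logpdf(x, z, 1.0))
    latent_terms = gauss_logpdf(z, 0.0, 1.0) - gauss_logpdf(z, q_mean, q_var)
    return -(log_lik + latent_scale * latent_terms)


rng = np.random.default_rng(2)
x = rng.normal(loc=0.7, size=12)  # toy observations
# Fixed sample of z, parameters of q(z), and number of processors.
z, q_mean, q_var, n = 0.5, 0.6, 0.2, 4

full = neg_elbo(x, z, q_mean, q_var, latent_scale=1.0)
shards = np.array_split(x, n)
rescaled = sum(neg_elbo(xk, z, q_mean, q_var, latent_scale=1.0 / n) for xk in shards)
unscaled = sum(neg_elbo(xk, z, q_mean, q_var, latent_scale=1.0) for xk in shards)
print(np.isclose(full, rescaled), np.isclose(full, unscaled))
```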

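```python
def rescale(self, rv_scaling):
    """
    Return the rescaled scaling factors of the random variables for SVI.

    Every latent variable without a scaling factor in ``rv_scaling``
    (keyed by variable uuid) is scaled by 1 / hvd.size().
    """
    import horovod.mxnet as hvd
    latent_variables = self.inference_algorithm.model.get_latent_variables(
        self.inference_algorithm.observed)
    for variable in latent_variables:
        # rv_scaling is keyed by uuid, so check the uuid, not the variable.
        if variable.uuid not in rv_scaling:
            rv_scaling[variable.uuid] = 1 / hvd.size()

    return rv_scaling
```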
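The bookkeeping in <tt>rescale</tt> can be exercised without MXFusion or Horovod. In this sketch, `Variable` is a hypothetical stand-in that only carries a `uuid`, and the number of processors is passed in as `size` instead of calling `hvd.size()`. Assuming, as the assignment in <tt>rescale</tt> indicates, that `rv_scaling` is keyed by variable uuid, factors that are already set are left alone and every other latent variable gets $1/n$.

```python
import uuid


class Variable:
    """Hypothetical stand-in for an MXFusion variable; only `uuid` is used."""

    def __init__(self):
        self.uuid = str(uuid.uuid4())


def rescale(latent_variables, rv_scaling, size):
    """Give every latent variable without a user-supplied factor the scale 1/size."""
    for variable in latent_variables:
        if variable.uuid not in rv_scaling:
            rv_scaling[variable.uuid] = 1 / size
    return rv_scaling


a, b = Variable(), Variable()
# `a` already has a factor of 0.5; `b` has none.
scaling = rescale([a, b], {a.uuid: 0.5}, size=4)
print(scaling[a.uuid], scaling[b.uuid])  # 0.5 0.25
```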

Like <tt>GradBasedInference</tt>, <tt>DistributedGradBasedInference</tt> runs training as iterative batch gradient-based optimization. Where <tt>GradBasedInference</tt> uses <tt>BatchInferenceLoop</tt> and <tt>MinibatchInferenceLoop</tt>, <tt>DistributedGradBasedInference</tt> uses <tt>DistributedBatchInferenceLoop</tt> and <tt>DistributedMinibatchInferenceLoop</tt>. Both of these loops inherit from <tt>DistributedGradLoop</tt>.

Because Horovod runs multiple copies of the training script, each copy must take a different partition of the data. <tt>DistributedGradLoop</tt> provides the method <tt>split_data</tt>, which gives each processor its own partition. Since Horovod is based on MPI concepts, we can use <tt>size</tt> (the total number of processors) and <tt>rank</tt> (the unique ID of each processor) to determine which part of the data each processor takes and how large that part is.

```python
import mxnet as mx
import horovod.mxnet as hvd


def split_data(self, data):
    """
    Return the partition of each array in `data` that belongs to this process.
    """
    size = hvd.size()
    if size > 1:
        rank = hvd.rank()
        partitioned = []
        for subdata in data:
            n_rows = subdata.shape[0]
            base, remainder = divmod(n_rows, size)
            # The first `remainder` ranks each take one extra row, so every
            # row is assigned to exactly one process.
            start = rank * base + min(rank, remainder)
            end = start + base + (1 if rank < remainder else 0)
            partitioned.append(mx.nd.slice_axis(subdata, axis=0, begin=start, end=end))
        data = partitioned

    return data
```
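The start and end indices of each partition can be checked in plain Python, with `size` and `rank` passed as integers instead of coming from Horovod. The sketch below compares two equivalent ways of writing the index arithmetic: the formulation with helper quantities `x`, `y`, `z` and `f`, and a closed form in which the first `n_rows % size` ranks take one extra row. It also checks that the partitions are contiguous and cover every row exactly once. Both function names are hypothetical.

```python
def bounds_as_in_split_data(n_rows, size, rank):
    """Start/end arithmetic using the helper quantities x, y, z and f."""
    x = n_rows // size
    y = n_rows % size
    z = 0 if rank < y else 1
    f = 0 if rank < y + 1 else 1
    start = rank * x + rank - f * (rank - y)
    end = (rank + 1) * x + rank - z * (rank - y + 1) + 1
    return start, end


def bounds_closed_form(n_rows, size, rank):
    """Equivalent form: the first n_rows % size ranks each take one extra row."""
    base, remainder = divmod(n_rows, size)
    start = rank * base + min(rank, remainder)
    return start, start + base + (1 if rank < remainder else 0)


for n_rows in range(50):
    for size in range(1, 9):
        parts = [bounds_as_in_split_data(n_rows, size, r) for r in range(size)]
        assert parts == [bounds_closed_form(n_rows, size, r) for r in range(size)]
        # Consecutive, non-overlapping ranges that cover rows 0..n_rows-1.
        assert parts[0][0] == 0 and parts[-1][1] == n_rows
        assert all(parts[i][1] == parts[i + 1][0] for i in range(size - 1))

print([bounds_closed_form(10, 4, r) for r in range(4)])
# [(0, 3), (3, 6), (6, 8), (8, 10)]
```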

In <tt>DistributedBatchInferenceLoop</tt> and <tt>DistributedMinibatchInferenceLoop</tt>, the trainer is created with Horovod's <tt>DistributedTrainer</tt>, passing in the same parameters, optimizer and optimizer parameters as in the non-distributed loops. The data is also split with the <tt>split_data</tt> method.

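```python
import horovod.mxnet as hvd

# DistributedTrainer takes the parameters, optimizer and optimizer parameters
# like gluon.Trainer, and averages gradients across processes with Allreduce.
trainer = hvd.DistributedTrainer(param_dict, optimizer=optimizer,
                                 optimizer_params={'learning_rate': learning_rate})
# Each process keeps only its own partition of the data.
data = self.split_data(data=data)
```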

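When each of the $n$ processors has executed one iteration, $n$ iterations have been executed in total. Because <tt>DistributedTrainer</tt> averages the gradients, the loss on each processor is multiplied by $n$ so that, with the same learning rate, the processors move along the negative gradient with a step $n$ times as large, matching the gradient of the summed loss $L$.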

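```python
# DistributedTrainer averages gradients over hvd.size() processes; scaling the
# loss by hvd.size() makes the update follow the gradient of the summed loss.
loss_for_gradient = loss_for_gradient * hvd.size()
```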
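Why the scaling is needed can be seen in a small NumPy sketch, assuming per-processor sum-of-squared-errors losses $L_k$ on random data (the helper `grad_sse` is hypothetical). Averaging the local gradients, as <tt>DistributedTrainer</tt> does, yields $\frac{1}{n}\nabla L$; multiplying each local loss by $n$ first, which multiplies its gradient by $n$, makes the average equal to $\nabla L$.

```python
import numpy as np


def grad_sse(X, y, w):
    """Gradient of the sum-of-squared-errors loss L_k on one partition."""
    return 2.0 * X.T @ (X @ w - y)


rng = np.random.default_rng(3)
n = 4  # number of processors (hvd.size() in the text)
X_shards = [rng.normal(size=(5, 2)) for _ in range(n)]
y_shards = [rng.normal(size=5) for _ in range(n)]
w = rng.normal(size=2)

local_grads = [grad_sse(Xk, yk, w) for Xk, yk in zip(X_shards, y_shards)]
# Gradient of the summed loss L = sum_k L_k.
grad_L = np.sum(local_grads, axis=0)
# What an averaging trainer applies without and with the loss scaled by n.
averaged = np.mean(local_grads, axis=0)
averaged_scaled = np.mean([n * g for g in local_grads], axis=0)
print(np.allclose(averaged_scaled, grad_L), np.allclose(averaged, grad_L))
```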

The next tutorial demonstrates how to use MXFusion's distributed APIs to train probabilistic models.