# Scaling up neural net training in PyTorch using distributed data parallel training

## Neural network training basic setup
- Before covering the distributed data parallel algorithm it's important to understand the basics of neural network training in PyTorch, its bottlenecks, and the natural progression from training on a cpu to training across multiple machines as data and models grow. 

**Before getting started**: This notebook can be run only on CPUs for the most part, and you only need a working installation of python and pytorch, no other dependencies! At some point, support for some operating systems will stop (when the distributed portion starts), later, support for CPU-only training will stop. I have tried my best to make the code as readable as possible, so even if you can't run it, hopefully it still illustrates the concepts.
To run all the code here you will need access to a machine with multiple GPUs. Luckily, UH has an excellent [HPC cluster](https://datascience.hawaii.edu/hpc/). Getting an account is free and easy for students and it's a great resource!


### A gentle introduction on the CPU
- Generally, neural networks can be trained on any hardware, as such, the CPU is the natural starting point that is accessible to anyone.
- We will start with a small network and toy data to introduce the problem and training procedure.

In [39]:
# imports
import time
import torch
from torch import nn, optim
from statistics import mean
from typing import List, Tuple

torch.manual_seed(691)

<torch._C.Generator at 0x10bf930b0>

In [2]:
# Model class
class FFNet(nn.Module):
    """Simple neural net class.

    Goal is not to be comprehensive but rather to provide a simple interface to a toy network that allows to quickly scale the size of the network. 
    """

    def __init__(self,
                 input_size: int = 100,
                 num_hl: int = 0,
                 hl_size: int = 10):
        """
        Args:
            input_size (int): dimensionality of the input. Default is 100.
            num_hl (int): Number of hidden layers (layers between input and output layer) in the network. Default is 0.
            hl_size (int): Number of units per hidden layer. Default is 10.
        """
        super(FFNet, self).__init__()
        # initial fully connected layer that ingests input
        self._fc_in = nn.Linear(input_size, hl_size)
        self._relu = nn.ReLU()

        # intermediate hidden layers, easily allow us to grow the network
        self._hidden_layers = None
        if num_hl > 0:
            hls = []
            for _ in range(num_hl):
                hls += [nn.Linear(hl_size, hl_size), nn.ReLU()]
            self._hidden_layers = nn.Sequential(*hls)

        # output layer, no activation function for simplicity
        self._fc_out = nn.Linear(hl_size, 1)

    def forward(self, x: torch.tensor) -> torch.Tensor:
        """Forward pass of data through network.

        Args:
            x (torch.Tensor): Input data. Shape must be Bxinput_size, where B is the batch size and input_size is the data dimensionality.

        Returns:
            torch.Tensor: Inputs after being processed by the network. Shape will be Bx1 where 1 is the width of the output layer.
        """
        z = self._fc_in(x)
        z = self._relu(z)
        if self._hidden_layers:
            z = self._hidden_layers(z)
        y = self._fc_out(z)
        return y

In [3]:
# Data class
class ToyDataset:
    """Simple dataset class.

    Creates artificial data, goal is to easily stream data without dataloading being a bottleneck. 
    Should be passed to torch.utils.data.DataLoader.
    """

    def __init__(self,
                 input_size: int = 100,
                 num_samples: int = 1_024):
        """
        Args:
            input_size (int): Dimensionality of the data
            num_samples (int): Simulated size of the dataset
        """
        self._input_size = input_size
        self._num_samples = num_samples

    def __len__(self) -> int:
        """
        Returns:
            int: Number of samples in the dataset
        """
        return self._num_samples

    def __getitem__(self,
                    index: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Args:
            index (int): Ignored since data is randomly generated on the fly.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: sample, label data pair
        """
        return torch.randn(self._input_size), torch.randn(1)


#### Set up training on the CPU
- No special hardware required
- Not parallelized at all
- Creating the model and the dataset:
    - In a real-world application, you'd want a training and validation dataset as we've discussed in class before. 
    - However, since we're only interested in performance here and the data is random anyway, a training dataset will suffice.

In [18]:
# Data parameters
batch_size = 8
input_size = 100
num_samples = 1_024

train_ds = ToyDataset(input_size=input_size, num_samples=num_samples)
# Using the torch DataLoader is good practice as it allows easy shuffling and scaling to parallel data loading by increasing the number of workers
train_loader = torch.utils.data.DataLoader(train_ds, batch_size=batch_size, num_workers=0, shuffle=False)

In [17]:
# Model parameters
num_hl = 0
hl_size = 10

nn_mdl = FFNet(input_size=input_size, num_hl=num_hl, hl_size=hl_size)

* Training the model
    * To train the model we need to create a loss function to indicate to the model how "well" it's doing. We will use the mean squared error, but it doesn't really matter here. 
    * We also need to define a optimizer to update the model parameters over the course of training. We will use standard stochastic gradient descent with momentum. More on that in a bit. 

In [6]:
loss_fn = nn.MSELoss()
optimizer = optim.SGD(nn_mdl.parameters(), lr=0.001, momentum=0.9)

In [7]:
# how many times we want to loop over the training data
nb_epochs = 10

for epoch in range(nb_epochs): 
    running_loss = 0.
    for samples, labels in train_loader:
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = nn_mdl(samples)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()

    # we expect loss to remain pretty consistent since data and labels are random
    print(f'Epoch: {epoch + 1} - total loss: {running_loss / 2000:.3f}')

print('Finished Training')

Epoch: 1 - total loss: 0.061
Epoch: 2 - total loss: 0.064
Epoch: 3 - total loss: 0.064
Epoch: 4 - total loss: 0.063
Epoch: 5 - total loss: 0.063
Epoch: 6 - total loss: 0.065
Epoch: 7 - total loss: 0.069
Epoch: 8 - total loss: 0.062
Epoch: 9 - total loss: 0.060
Epoch: 10 - total loss: 0.063
Finished Training


### Benchmarking
* Since we're interested in performance, let's create a function to do the whole training process and only return the time needed to train a given model with a given dataset.

In [8]:
def benchmark_training(nn_mdl: nn.Module,
                       batch_size: int = 32,
                       input_size: int = 100,
                       num_samples: int = 16_384,
                       nb_epochs: int = 10,
                       repeats: int = 10,
                       verbose: bool = True) -> List[float]:
    """Function to benchmark training time for a provided model on toy data.

    Args:
        nn_mdl (nn.Module): model to be trained 
        batch_size (int, optional): Defaults to 32.
        input_size (int, optional): Size of training data samples. Defaults to 100.
        num_samples (int, optional): Number of training data samples. Defaults to 16_384.
        nb_epochs (int, optional): How many epochs to train for. Defaults to 10.
        repeats (int, optional): How many times to repeat the whole training process. Defaults to 10.
        verbose (bool, optional): Whether to print progress between each repeat. Defaults to True.

    Returns:
        List[flaot]: List of times it took for training to complete per repeat. In nanoseconds
    """
    # initialize data
    train_ds = ToyDataset(input_size=input_size, num_samples=num_samples)
    train_loader = torch.utils.data.DataLoader(
        train_ds, batch_size=batch_size, num_workers=0, shuffle=False)

    # initialize optimizer& loss
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(nn_mdl.parameters(), lr=0.001, momentum=0.9)

    # train loop
    train_times = []
    for rep_i in range(repeats):
        if verbose:
            print(f'Repeat number: {rep_i + 1} of {repeats}')
        start = time.perf_counter_ns()
        for epoch in range(nb_epochs):
            for batch_idx, data in enumerate(train_loader):
                # get the inputs; data is a list of [inputs, labels]
                samples, labels = data

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                outputs = nn_mdl(samples)
                loss = loss_fn(outputs, labels)
                loss.backward()
                optimizer.step()
        train_times.append(time.perf_counter_ns()-start)

    return train_times

In [9]:
base_result = benchmark_training(nn_mdl, verbose=False)
print(f'Average time to train: {(mean(base_result) * 1e-9):.3f} seconds')

Average time to train: 1.124 seconds


### Scaling up - more data, bigger models (I)
* Training on a CPU will almost always work. But it will quickly stop being fast. CPUs, because they are built for universailty, sacrifice speed for very specific applications. 
* See below how training times quickly get out of hand as the models or data (or both) grow
    * Notice especially how they don't grow additively but rather (roughly) multiplicatively. If increasing the model or data alone results in a 10x of training time, increasing both roughly results in a 100x training time, not 20x. 
* Rememer from class: large language models (LLMs) are huge, starting in the 10s-of-millions parameter range and scaling up to **trillions** of parameters. And they are trained on massive corpora. Running that kind of computation is impossible. 
    * See below for an illustration of how LLMs have grown

<img src="./images/llm_param_nb.jpeg" width="50%"/>
<!--cite [2] https://huggingface.co/blog/large-language-models-->

In [30]:
# grow the model, keep data the same
print(f'Total number of parameters in original, small model: {sum(p.numel() for p in nn_mdl.parameters())}')
tmp_mdl = FFNet(input_size=input_size, num_hl=3, hl_size=100)
print(f'Total number of parameters in medium-sized model: {sum(p.numel() for p in tmp_mdl.parameters())}')
result = benchmark_training(tmp_mdl, verbose=False, repeats=1)
print(f'Time to train medium-sized model: {(mean(result) * 1e-9):.3f} seconds')

Total number of parameters in original, small model: 1021
Total number of parameters in medium-sized model: 40501
Time to train medium-sized model: 15.118 seconds


In [31]:
# increase the amount of data to roughly 131k (2^17) samples
result = benchmark_training(nn_mdl, num_samples=131_072, verbose=False, repeats=1)
print(f'Time to train small model on a larger amount of data: {(mean(result) * 1e-9):.3f} seconds')

Time to train small model on a larger amount of data: 9.450 seconds


In [33]:
# scaling both model size and amount of data:
tmp_mdl = FFNet(input_size=input_size, num_hl=3, hl_size=100)
result = benchmark_training(tmp_mdl, num_samples=131_072, verbose=False, repeats=1)
print(f'Time to train a larger model on a larger amount of data: {(mean(result) * 1e-9):.3f} seconds')

Time to train a larger model on a larger amount of data: 100.629 seconds


### Scaling up - more data, bigger models (cont.)
* Training a neural network is a computationally expensive, but highly repetitive and parallelizabl task. 
* Instead of using a CPU which is not optimized for this sort of computation, we can use hardware that is more specifically built to excell at the most repetitive componenets of training a neural network:
#### GPUs
* One easy way to scale up is through dedicated hardware that is able to train the neural networks faster/ more efficiently.
* GPUs, intially built for computer graphics applications (hence the name graphical processing unit) are very adapt at quickly computing large matrix operations, which are the basis for neural network training. Furthermore, GPUs have significantly more memory throughput than CPUs, allowing for more data to be processed. 
* In essence, when training on a GPU, the entire model is held in memory, data can be passed through the model quickly and results are stored in the GPU too to efficiently calcuate gradients and make parameter updates. 
* Training on one (or multiple) GPUs is efficient, but it requires enough memory to fit the whole model in the GPU and store the results of forward passes of the data as well. 
* So, when training large models with high-dimensional data and large batch sizes, even a single GPU might not be enough. 
* Usually, we run into one of two problems, that deserve closer examinatino:
    1) **We cannot make our batches of data large enough**
    2) **We can't even fit the entire model into GPU memory**

(Sidenote: there exists even more dedicated hardware specifically for the linear algebra operations needed to train and run neural networks. Modern high-end GPUs often include so-called tensor cores for these applications and there are ever entire accelerators consisting of exclusively these cores called tensor processing units - TPUs, but for all intents and purposes, scaling to this hardware is very similar to scaling to GPU training from our perspective, even though there are many differences on a low level.)

1) We cannot make our batches of data large enough.
    * This is a problem because of how stochastic gradient descent works. 
    * Generally, using gradient descent to fit a model to some data, we calculate the gradients of the loss with respect to parameters in the model. We then use the "direction" of the gradients to change our parameters in a way that will reduce the loss if the exact same data is passed through the model again. 
    * *Stochastic* gradient descent incorporates the notion of mini batches, which are essentially subsets of the data. This has practical reasons (i.e. the entire data can usually not be processed at once), but also modeling reasons. Mini batches provide a noisy estimate of the gradients we would get when passing all of the data through the model. This would be suboptimal if our training data were excatly the same as future data the model will encounter, but that's not usually the case. So, getting noisy gradient estimates and parameter updates can act as regularization, preventing us from overfitting to the training data and enabling better generalization in the future. See below for an illustration
    * However, there is a tradeoff. Larger batchsizes mean our gradient estimate is closer to the "true" gradients of the training data but are harder to process and may lead to overfitting, while smaller batches increase the noise. If the batch is too small (e.g. a single sample), it is easy to see how the gradient estimates now become almost meaningless and training is near impossible. 
    * Furthermore, some applications require very large batch sizes, e.g. contrastive learning frameworks such as [SimCLR](https://arxiv.org/abs/2002.05709).
    * One way to deal with this is gradient accumulation, where mini batches are essentially further broken up into micro batches, and multiple of those are passed through the model before and update to the parameters is made. However, this is still very slow as micro batches cannot be processed in parallel. 
2) We can't even fit the entire model into GPU memory.
    * This is an obvious problem because it defeats the purpose of using a GPU in the first place - doing the matrix computations in as close to one big step as possible. 
    * This can be solved through model-parallelism, splitting the model onto multiple GPUs, each only holding a subset of all model parameters and performing a subset of the whole computation. 

At the end of the tutorial, there will be more info on how to train on GPUs, however, since this requires having access to a GPU, it will be tabled for now. The following examples generally make more sense in the context of scaling to multiple GPUs, but to keep it accessible, they will be kept on CPU to illustrate the basic principles before requiring specific hardware. 

<img src="./images/sgd_illustration.png" width="60%"/>

#### Data parallelism
* One way to solve problem 1) while still allowing for maximum parallel computation is (distributed) data parallelism.
* Data parallelism is also orthogonal to model parallelism, so for large models, both can be applied simultaneously given enough hardware is available. 
* Let's make some assumptions to motivate data parallelism. 
    1) Assume, we're running on a machine with access to multiple GPUs or even have access to multiple machines with one or more GPUs.
    2) Assume we can fit the model on a single GPU (or have enough hardware to apply model parallelism).
    3) Assume we can only process very small batches of data on a single GPU (e.g. 8 examples).
    
**We want to efficiently use all of the available GPUs, possibly across multiple machines, to train the model as efficiently as possible.**

(Sidenote: PyTorch has a data parallel (DP) and a distributed data parallel (DDP) algorithm. While DP is simpler, its performance suffers from language-inherent - GIL - and implementation-specific limitations compared to DDP. Additionally, DDP scales to settings with multiple machines where DP does not. These differences come down to DP chosing an easy-to-implement multithreading approach where DDP uses multiprocessing and IPC. Thus, principles from DDP easily generalize to DP but not vice versa and as such, DDP is more interesting and will be examined here.)

<img src="./images/data_parallel_basic.png" width="60%"/>

In [None]:
# At this point, torch.distributed must be supported for your OS, OS X for example does not currently support this.
# That's why using a docker container is helpful
try: 
    assert torch.distributed.is_available()
except AssertionError:
    raise NotImplementedError('OS does not support torch distributed training.')

## Distributed Data Parallel
* The goal is to have a copy of our model on multiple devices, where each device can process an independent chunk of data and models get synchronizes at some point between learning steps.
* Two approaches to achieving this may come to mind:
    1) **Parameter averaging**: each model goes through a full training step locally, including updating its parameters, and then the synchronization consists of averaging all model parameters. This suffers from multiple drawbacks
        * The synchronization steps and the training steps are completely separate and cannot be parallelized. All computation happens locally per copy of the model and then a separate communication step has to occur to synchroniza all models. Ideally, we would like the overlap these steps for increased efficiency. 
        * Mathematical equivalence to single-machine training is not guaranteed. Ideally, we would like the model resulting from parallelized training to be indetical to the model if it had hypothetically been trained on a single machine without any parallelism. This is not the case with parameter averaging for several reasons, most obviously because of local optimizer states may diverge between machines. Briefly, some optimizers maintain states that depend on gradients from previous epochs. If gradients are calculated locally per-model, they will differ because different copies of the model encounter different data. For instance, momentum, which helps with learning, may quickly diverge between optimizers depending on local gradients, resulting in an ultimately different model from what would have been with a single machine and a single optimizer. 
    2) **Gradient averaging**: After each backward pass, when all parameter gradients are calculated but BEFORE local model copies are updated, we can communicate all local gradients between the models and average them such that the updates done to all local models are the same after the update. 
        * This will take care of the issue with mathematical equivalency because optimizer states no longer diverge and we will see later how it also allows for some parallelized communication and computation. 
### A naive approach
* To start, lets try to implement a first-pass, naive version of DDP which will illustrate the basic principles without getting too hung up on optimization details. 
* This section will use some concepts from high performance computing topics, mainly inter-process communication (IPC). Most of this is abstracted into frameworks (MPI for example) so a detailed knowledge of these concepts is not required. 
* The code here will be shown in the notebook, but will have to be run from scripts because it relies on spawning groups of separate python processes which is tricky (not possible?) from inside IPython. 


#### Setup - initialize the processes:
* First, to set up for DDP training, we need to launch multiple processes that can communicate with each other and all have a copy of the same model
* Adding them to the same process group in the pytorch multiprocessing module allows for easy communication through establishes IPC software
* Different IPC libraries have different advantages
    * We use gloo here because its binaries are included with the pytorch installation

```python
def init_process(rank: int,
                 world_size: int,
                 train_fn: Callable,
                 train_fn_kwargs: Optional[dict] = None,
                 backend: Optional[str] = 'gloo'):
    """Function called to initialize individual processes

    Args:
        rank (int): Rank of this process
        world_size (int): Number of processes in the group
        train_fn (callable): Function that actually runs model training
        backend (optional, str): Backend to use. Options are gloo, mpi, nccl. Default is gloo.
    """
    try:
        val_backends = ['gloo', 'mpi', 'nccl']
        assert backend in ['gloo', 'mpi', 'nccl']
    except AssertionError:
        raise NotImplementedError(
            f'Supported backends: {val_backends} don\'t include: {backend}')
    # There are other ways than environment variables of initializing such as TCP or a shared file system for IPC
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '8899'
    dist.init_process_group(
        backend,
        rank=rank,
        world_size=world_size)
    if train_fn_kwargs:
        train_fn(rank, world_size, **train_fn_kwargs)
    else:
        train_fn(rank, world_size)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', "--world_size", type=int, default=2,
                        help="number of processes to spawn")
    parser.add_argument('-i', "--input_size", type=int, default=100,
                        help="dimensionality of the data")
    parser.add_argument('-n', "--num_samples", type=int, default=1_024,
                        help="size of the dataset")
    parser.add_argument('-d', "--num_hl", type=int, default=0,
                        help="number of hidden layers in the network")
    parser.add_argument('-z', "--hl_size", type=int, default=100,
                        help="size of the hidden layers in the network")
    parser.add_argument('-e', "--epochs", type=int, default=10,
                        help="number of epochs to train for")

    args = parser.parse_args()

    train_fn_kwargs = {
        'input_size': args.input_size,
        'num_samples': args.num_samples,
        'num_hl': args.num_hl,
        'hl_size': args.hl_size,
        'epochs': args.epochs
    }

    processes = []
    for rank in range(args.world_size):
        p = mp.Process(target=init_process, args=(
            rank, args.world_size, run_training, train_fn_kwargs, 'gloo'))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
```


#### Setup - partitioning the data across processes:
* Now we need a way to partition the dataset and distribute it between all processes in our process group.
* The partitioner takes the total dataset and then selects a random, non-overlapping subset for each process.

```python
class DatasetPartition(object):
    """Essentially the same as ToyDataset class.
    Implements basic functionality for torch dataset on partitions.
    """

    def __init__(self,
                 data: Iterable,
                 index: int):
        """
        Args:
            data (Iterable): some local data subset from global dataset
            index (int): some index for local data
        """
        self.data = data
        self.index = index

    def __len__(self) -> int:
        """Returns total size of this dataset

        Returns:
            int: total length
        """
        return len(self.index)

    def __getitem__(self,
                    idx: int) -> Iterable:
        """_summary_

        Args:
            idx (int): index for item to getch

        Returns:
            Iterable: data at index
        """
        local_index = self.index[idx]
        return self.data[local_index]


class DataPartitioner(object):
    """Helper class to go from global dataset to local partitions for each process.
    """

    def __init__(self,
                 data: Iterable,
                 world_size: int,
                 seed: int = 7):
        """        
        Args:
            data (Iterable): Global dataset to be distributed among processes
            world_size (int): Number of processes among which to distribute the data
            seed (int, optional): Seed for random generator to ensure reproducability. Defaults to 7.
        """
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)

        # list of all data indices to shuffle for random distribution
        indices = list(range(0, len(data)))
        rng.shuffle(indices)

        # might drop modulo if not perfectly divisible for simplicity
        part_len = len(data) // world_size
        for _ in range(world_size):
            self.partitions.append(indices[:part_len])
            indices = indices[part_len:]

    def use(self, partition_idx: int) -> DatasetPartition:
        """Return partition at index partition_idx

        Args:
            partitio_idx (int): partition index

        Returns:
            DatasetPartition: Partition for process partition_index
        """
        return DatasetPartition(self.data, self.partitions[partition_idx])


def partition_dataset(world_size: int,
                      batch_size: int,
                      input_size: int = 100,
                      num_samples: int = 1_024) -> torch.utils.data.DataLoader:
    """Function actually splitting the dataset

    Args:
        world_size (int): Number of processes in the group
        batch_size (int): Batch size per process
        input_size (int, optional): Data dimensionality. Defaults to 100.
        num_samples (int, optional): Total dataset size. Defaults to 1_024.

    Returns:
        torch.utils.data.DataLoader: Dataloader for this process' subset of the total data
    """
    batch_size

    train_ds = ToyDataset(input_size=input_size, num_samples=num_samples)
    partition = DataPartitioner(train_ds, world_size)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                            batch_size=batch_size,
                                            shuffle=True)
    return train_set
```

#### Setup - gradient all_reduce:
* Now we need to work on the IPC for gradient averaging
* Luckily, since we already have a process group established we can make use of a standard all_reduce call in the pytorch distributed module to make this process as easy as possible

```python

def reduce_gradients(model: torch.nn.Module):
    """Function to all_reduce gradients from all processes

    First gets and sums all gradients for each parameter and then averages them.
    Stores the updated gradient information directly in the model so no need to return anything.

    Args:
        model (torch.nn.Module): Model being trained
    """
    # number of total processes for averaging
    size = float(dist.get_world_size())

    # repeat this process for each individual parameter in the model
    for param in model.parameters():
        # get this parameter from each process' model copy and them sum them all
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        # divide by number of total processes to get the final gradient
        param.grad.data /= size

```

#### Setup - the main training loop:
* This version of the training loop is very similar to the non-parallelized one with some essential differences:
    * Each process only trains on its subset of the total dataset
    * After the ```loss.backward()``` call - where local gradients are calculated -, we don't immediately call ```optimizer.step()``` - where parameters are updated - but instead call our function to average all gradients across the process group first. 
* Also note that every process still initializes a separate model but we use the same RNG seed in all processes so the models will be identical. Another way this could be accomplished is by having one process (e.g. rank 0) broadcast its parameters to all other processes. This would add a one-time communication overhead but overall be negligible extra effort. 

```python

def run_training(rank: int,
                 world_size: int,
                 input_size: int = 100,
                 num_samples: int = 1_024,
                 num_hl: int = 0,
                 hl_size: int = 10,
                 epochs: int = 10):
    """Main training code

    Args:
        rank (int): Process rank
        world_size (int): Total number processes in the group
        input_size (int, optional): Data dimensionality, must be 1D. Defaults to 100.
        num_samples (int, optional): Total number of samples in the dataset. Defaults to 1_024.
        num_hl (int, optional): Number of hidden layers in the NN. Defaults to 0.
        hl_size (int, optional): Size of the hidden layers in the NN. Defaults to 10.
        epochs(int, optional): Number of epochs to train for.
    """
    # By manually seeding we ensure all models start from the same initial state
    # Another way of accomplishing this is by having one process (e.g. rank 0) broadcast its model at the start
    torch.manual_seed(0)

    batch_size = 8
    train_set = partition_dataset(
        world_size, batch_size, input_size, num_samples)

    nn_mdl = FFNet(input_size=input_size, num_hl=num_hl, hl_size=hl_size)
    optimizer = optim.SGD(nn_mdl.parameters(), lr=0.001, momentum=0.9)
    loss_fn = nn.MSELoss()

    for epoch in range(epochs):
        running_loss = 0.
        for samples, labels in train_set:
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = nn_mdl(samples)
            loss = loss_fn(outputs, labels)
            loss.backward()

            # this time we first have to average the gradients across the process group
            reduce_gradients(nn_mdl)
            optimizer.step()

            # print statistics
            running_loss += loss.item()
        print(
            f'Process {rank} / {world_size} -> Epoch: {epoch + 1} - total loss: {running_loss / 2000:.3f}')

```

#### To run this script, simply run:
* ```python ./ddp_tutorial/naive_ddp.py``` or ```time python ./ddp_tutorial/naive_ddp.py``` to time execution speed.

* You can play around with the model and data sizes using flags like ```time python ./ddp_tutorial/native_ddp.py -d 3``` to increase the number of hidden layers.

#### Several problems with this implementation:
* You may notice that, when running this on a CPU along for larger networks, it actually performs worse than just running in sequence. On one hand, this has to do with the fact that the libraries are already optimized for training on multi-core CPUs, this would be different if we had multiple GPUs available and could distribute between those. But there are also implementation-specific reasons that would hinder performance regardless which are worth looking at. 
* Previously, when discussing parameter averaging, we noted that it removes the opportunity to overlay computation and communication
    1) This implementation also does not take advantage of this opportunity, first calculating all gradients in ```loss.backward()``` then reducing them, and then updating the model
    2) Also, reducing the parameters 1-by-1 is, as it turns out, not optimal because all_reduce is more efficient for large tensors. See below for a detailed analysis on the gloo library.
* At this point, we will forgo implementing solutions for these problems ourselves and just cover them theoretically. While not easy in practice, it should be clear where to insert the optimizations discussed below into the naive framework established above. 
* The actual pytorch DDP package employs these and some more subtle optimizations and provides an easy interface to use them so we don't have to worry about doing it ourselves!

<img src="./images/allreduce_gloo.png" width="40%"/>
<!--cite [1] https://arxiv.org/abs/2006.15704-->

#### Bucketing
* By not waiting for the entire backward pass to complete, we can start communicating gradients as their calculations complete.
* We don't want to communicate every gradient individually however, because we know doing all_reduce on larger tensors is more efficient
* The solution is **bucketing**, grouping parameters into buckets
    * The same parameters contain the same parameters in all processes
    * Once a all gradients for a bucket are calculated in all processes, all_reduce is initiated

<img src="./images/torch_bucketing.png" width="70%"/>
<!--cite [4] https://user-images.githubusercontent.com/16999635/72401724-d296d880-371a-11ea-90ab-737f86543df9.png-->

#### Bucketing (cont.)
* As mentioned above, buckets must be synchronized, bucket 1 must be reduced with bucket 1, not bucket 0. This introduces the challenge of figuring out a bucketing order that is as efficient as possible, otherwise, if P1-bucket1 finishes first, and P0-bucket1 finishes last, we have gained nothing.
* Fortunately, the way gradients are calculated in a deep neural network interacts extremely well with this requirement. 
* Generally, we can think of the gradients as being calcuated in the reverse order of how data flows through the network, with the highest parameters in the network first getting their gradients. In practice, this is much more complicated, especially for networks with separate computational block and residual connections, but this will suffice for an intuition.
* The bucket order can thus simply be established in the order in which parameters appear in the network. Higher parameters are grouped with each other, and similarly for lower parameters. Thus, we somewhat assure that buckets complete computation in similar order across processes and can be reduces quickly.
* An example is shown below, where arrows point in the direction of the ```.backward()``` call, the order in which gradients are calculated. 
* While bucket size only has a moderate performance input, it is an easy-to-adjust performance lever and provides an interesting trade-off:
    * Larger buckets contain larger gradient tensors making all_reduce more efficient.
    * Larger buckets, however, also take longer to become ready, so communication can't be initiated as quickly and opportunities for asynchronous processing are lost. 
    * Ultimately, if performance is critical, this should be tuned empirically, as it depends on hardware and what IPC libraries are used. 

<img src="./images/ddp_full.png" width="70%"/>
<!--cite [1] https://arxiv.org/abs/2006.15704-->

### A working, full DDP example:
* Now that the most important concepts have been covered, let's look at a working example of the full PyTorch DDP implementation in action:
    * At this point, we will train on GPUs, not needlessly split the workload among a single CPU. 
    * Each process will be associated with its own GPU.

```python

def run_training(gpu: int,
                 args: dict):
    """Main training code

    Args:
        gpu (int): index of the gpu for this process on this process' node
        args (dict): dictionary of args
    """
    # By manually seeding we ensure all models start from the same initial state
    # Another way of accomplishing this is by having one process (e.g. rank 0) broadcast its model at the start
    torch.manual_seed(0)

    batch_size = 8
    rank = args.nr * args.gpus + gpu
    dist.init_process_group(
        backend='gloo',
        world_size=args.world_size,
        rank=rank
    )

    # Initialize the model on the GPU
    torch.cuda.set_device(gpu)
    nn_mdl = FFNet(input_size=args.input_size,
                   num_hl=args.num_hl, hl_size=args.hl_size)
    nn_mdl.cuda(gpu)

    # Optimizer and loss (loss has to be put on GPU too)
    optimizer = optim.SGD(nn_mdl.parameters(), lr=0.001, momentum=0.9)
    loss_fn = nn.MSELoss().cuda(gpu)

    # Distributing the model is as easy as that
    model = nn.parallel.DistributedDataParallel(model,
                                                device_ids=[gpu])

    # Initialize the dataset
    train_ds = ToyDataset(input_size=args.input_size,
                          num_samples=args.num_samples)

    # This will communicate to the process which subset of the dataset to draw from
    # Essentially the partition helper from the naive example
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_ds,
        num_replicas=args.world_size,
        rank=rank
    )

    # torch dataloader interfaces nicely with the distributed sampler
    train_set = torch.utils.data.DataLoader(
        dataset=train_ds,
        batch_size=batch_size,
        shuffle=True,
        sampler=train_sampler)

    for epoch in range(args.epochs):
        running_loss = 0.
        for samples, labels in train_set:
            # data has to be loaded to the gpu
            samples.cuda(non_blocking=True)
            labels.cuda(non_blocking=True)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = nn_mdl(samples)
            loss = loss_fn(outputs, labels)
            loss.backward()
            # at this point we previously had to average the gradients
            # torch native DDP uses hooks on the autograd graph to do this in the backend
            optimizer.step()

            # print statistics
            running_loss += loss.item()
        if args.verbose:
            print(
                f'Process {rank} / {args.world_size} -> Epoch: {epoch + 1} - total loss: {running_loss / 2000:.3f}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1,
                        type=int, help='number of nodes')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-s', "--world_size", type=int, default=2,
                        help="number of processes to spawn")
    parser.add_argument('-i', "--input_size", type=int, default=100,
                        help="dimensionality of the data")
    parser.add_argument('-nb', "--num_samples", type=int, default=1_024,
                        help="size of the dataset")
    parser.add_argument('-d', "--num_hl", type=int, default=0,
                        help="number of hidden layers in the network")
    parser.add_argument('-z', "--hl_size", type=int, default=100,
                        help="size of the hidden layers in the network")
    parser.add_argument('-e', "--epochs", type=int, default=10,
                        help="number of epochs to train for")
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('-v', '--verbose', type=int, default=1,
                        help='Print training process. 1=verbose, 0=silent.')

    args = parser.parse_args()

    # one process per GPU, nodes could be scaled too but not on the UH cluster since multi-node-gpu jobs are prohibited
    args.world_size = args.gpus * args.nodes
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '8899'
    mp.spawn(run_training, nprocs=args.gpus, args=(args,))
```

#### A working, full DDP example (cont.):
* This script is obviously much shorts, but note how it still works very similar to the naive approach.
* ```nn.parallal.DistributedDataParallel``` takes care of coordinating the model copies, while the ```DistributedSampler``` provides an elegant way of partitioning the dataset. 
* The only difference is the lack of an explicit all_reduce call in the code. This is because ```DistributedDataParallel``` moves this to the backend, including grouping parameters into buckets and coordinating all_reduce calls accordingly. 

#### To run this script:
* If you are on the UH cluster, you can use the batch script in ```./ddp_tutorial/full_ddp.sh```, simply adapt the paths and you should be good to go
* If you running on a local machine with multiple GPUs, you can invoke it as before using ```python ./ddp_tutorial/full_ddp.py``` or ```time python ./ddp_tutorial/full_ddp.py``` to time execution speed.
* Again, feel free to change the flags and see how it affects performance!

## Additional Resources:
* There is lots more to explore in terms of DDP and scaling deep learning in general, much more than can be mentioned here.
* Some natural extension to what was covered here however:
    * [PyTorch distributed RPC framework](https://pytorch.org/docs/stable/rpc.html): this is a more flexible framework for distributed training that extends well beyond data parallelism alone.
    * [PyTorch automatic mixed precision](https://pytorch.org/tutorials/recipes/recipes/amp_recipe.html): this is an excellent way of further scaling up training. If you have access to a device with tensor cores, this allows for storing some tensors as fp16, saving memory, speeding up computation, and allowing you to process more data at once!
    * [PyTorch Lightning](https://www.pytorchlightning.ai/) is an excellent high-level wrapper around PyTorch (think keras for tensorflow). It's build for research purposes, and, as it happens, also supports [distributed training](https://pytorch-lightning.readthedocs.io/en/stable/accelerators/gpu_intermediate.html). Scaling lightning apps to distributed training is even easier than it is in native PyTorch!

## Sources:
* These are the sources I used throughout this post, a lot of the code is adapted from here and you can find much additional information on there!

Primary paper:
1) https://arxiv.org/abs/2006.15704

Secondary websites/ blog posts:
1) https://pytorch.org/docs/stable/notes/ddp.html
2) https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
3) https://pytorch.org/tutorials/intermediate/dist_tuto.html#collective-communication
4) http://seba1511.net/dist_blog/
5) https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html