# Stochastic Training of GNN with Multiple GPUs

*Note: this tutorial requires a GPU-enabled machine with multiple GPU devices.*

This tutorial will show you how to

* train a GNN model on a single machine with multiple GPUs on a graph of any size with `torch.nn.parallel.DistributedDataParallel`.

At the end of this tutorial you will be able to

* Parallelize model training across multiple GPUs on a single machine.
* Replicate the model on each GPU and keep the replicas in sync using PyTorch DDP.

## Distributed training overview

Training models on very large datasets can take hours or even days to converge.

In deep learning, we can get substantial speed-ups by distributing the training workload across multiple workers.

Typically, workers run in parallel and communicate their updates to each other, either directly or via a central hub.

Workers can be individual machines in a cluster (**not covered in this tutorial**).

On a single machine with multiple GPUs, each GPU device can act as a worker.

### Data Parallelism

For multi-GPU training on a single machine, data parallelism is an easy-to-implement and effective distributed training approach.



Here is how it works:

* The data is divided into `k` partitions, where `k` is the number of GPU workers.
* The model is copied to each of the GPU workers.
* Each worker operates on its own subset of the data.
* Each worker communicates its changes to the other workers so that they can update their copies of the model.
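The four steps above can be sketched without any GPU at all. The following plain-Python toy is an illustration only, not how DDP is implemented: it fits a single scalar weight `w` with a squared-error loss, and the names `local_gradient` and `data_parallel_step` are hypothetical. Each simulated worker computes a gradient on its own partition, the gradients are averaged (the role played by the communication step), and every replica applies the same update.

```python
def local_gradient(w, xs, ys):
    """Gradient of the mean of 0.5 * (w * x - y) ** 2 with respect to w."""
    return sum((w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def data_parallel_step(w, xs, ys, k, lr):
    """Simulate one data-parallel update with k workers (toy example)."""
    size = len(xs) // k  # equal-sized partitions; any remainder is ignored
    grads = []
    for rank in range(k):
        lo, hi = rank * size, (rank + 1) * size
        # Each worker only sees its own partition of the data.
        grads.append(local_gradient(w, xs[lo:hi], ys[lo:hi]))
    # Stand-in for the gradient synchronization between workers.
    avg_grad = sum(grads) / k
    # Every replica applies the same averaged gradient.
    return [w - lr * avg_grad for _ in range(k)]


xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
replicas = data_parallel_step(0.0, xs, ys, k=2, lr=0.1)
single_worker = 0.0 - 0.1 * local_gradient(0.0, xs, ys)
```

With equal-sized partitions, the averaged gradient equals the gradient over all the data, so `replicas` holds identical values that match `single_worker` up to floating-point rounding.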

PyTorch `DistributedDataParallel` (DDP) is the recommended built-in solution for multi-GPU training.

You can use PyTorch DDP for DGL models in the same way you would for any other PyTorch application.

* PyTorch DDP implements data parallelism at the module level, so it wraps the model implementation.

* To use it, your code needs to spawn multiple processes, each with its own DDP instance.

* DDP uses collective communications to synchronize gradients and buffers.

* For machines with NVIDIA GPUs, it is common to use `nccl` as the communications backend.

See https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html for more details.


In [1]:
import numpy as np
import dgl
import torch
import dgl.nn as dglnn
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel
import torch.nn.functional as F
import torch.multiprocessing as mp
import sklearn.metrics
import tqdm

import utils
from utils import thread_wrapped_func

Using backend: pytorch


## Load Dataset

The following code loads the dataset from the first tutorial.

In [2]:
def load_data():
    import pickle

    with open('data.pkl', 'rb') as f:
        data = pickle.load(f)
    graph, node_features, node_labels, train_nids, valid_nids, test_nids = data
    utils.prepare_mp(graph)
    
    num_features = node_features.shape[1]
    num_classes = (node_labels.max() + 1).item()
    
    return graph, node_features, node_labels, train_nids, valid_nids, test_nids, num_features, num_classes

## Customize Neighborhood Sampling

Previously we have seen how to use `NodeDataLoader` together with `MultiLayerNeighborSampler`.  In fact, you can replace `MultiLayerNeighborSampler` with your own sampling strategy.

The customization is simple.  For each GNN layer, you only need to specify the edges involved in the message passing as a graph.  Such a graph will have the same nodes as the original graph.  For example, here is how `MultiLayerNeighborSampler` is implemented:

In [3]:
class MultiLayerNeighborSampler(dgl.dataloading.BlockSampler):
    def __init__(self, fanouts):
        super().__init__(len(fanouts), return_eids=False)
        self.fanouts = fanouts
        
    def sample_frontier(self, layer_id, g, seed_nodes):
        fanout = self.fanouts[layer_id]
        return dgl.sampling.sample_neighbors(g, seed_nodes, fanout)
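To make the idea of a per-layer frontier concrete, here is a plain-Python stand-in for what a fanout-based `sample_frontier` computes. It is a sketch under simplifying assumptions, not DGL's implementation: the graph is a hypothetical dictionary `in_neighbors` mapping each node to the list of its in-neighbors, and the result is a list of `(src, dst)` edges rather than a graph object.

```python
import random


def sample_frontier_edges(in_neighbors, seed_nodes, fanout, rng):
    """Pick at most `fanout` incoming edges for every seed node (toy version)."""
    edges = []
    for dst in seed_nodes:
        candidates = in_neighbors.get(dst, [])
        if len(candidates) <= fanout:
            picked = list(candidates)  # keep every neighbor when there are few
        else:
            picked = rng.sample(candidates, fanout)  # sample without replacement
        edges.extend((src, dst) for src in picked)
    return edges


# Hypothetical toy graph: node 0 has five in-neighbors, node 1 has one, node 2 none.
in_neighbors = {0: [1, 2, 3, 4, 5], 1: [0], 2: []}
edges = sample_frontier_edges(in_neighbors, [0, 1, 2], fanout=2, rng=random.Random(0))
```

Only the edges chosen here would take part in message passing for that layer; a different strategy (for example, one that weights neighbors) would only need to change how `picked` is chosen.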

## Defining Data Loader for Distributed Data Parallel (DDP)

In PyTorch DDP each worker process is assigned an integer *rank*.  

The rank indicates which partition of the dataset the worker process will handle.
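As a minimal sketch of that mapping, the plain-Python function below (the name `partition_for_rank` is hypothetical) gives each rank an equal-sized, contiguous slice of the node IDs. It assumes the IDs are held in a list; slicing a tensor works the same way.

```python
def partition_for_rank(nids, rank, world_size):
    """Return the contiguous, equal-sized slice of nids owned by this rank."""
    partition_size = len(nids) // world_size  # integer division drops the remainder
    offset = partition_size * rank
    return nids[offset:offset + partition_size]


nids = list(range(10))
parts = [partition_for_rank(nids, rank, 4) for rank in range(4)]
leftover = sorted(set(nids) - {n for part in parts for n in part})
```

Because of the integer division, IDs that do not fit evenly (here the entries of `leftover`) are not assigned to any rank. In exchange, every rank gets the same number of IDs and therefore runs the same number of minibatches, which keeps the ranks in step when gradients are synchronized.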

In [4]:
def create_dataloader(rank, world_size, graph, nids):
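    # Give each rank an equal-sized, contiguous slice of the node IDs;
    # the len(nids) % world_size leftover IDs are not used.
    partition_size = len(nids) // world_size
    partition_offset = partition_size * rank
    nids = nids[partition_offset:partition_offset+partition_size]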
    
    sampler = MultiLayerNeighborSampler([4, 4, 4])
    dataloader = dgl.dataloading.NodeDataLoader(
        graph, nids, sampler,
        batch_size=1024,
        shuffle=True,
        drop_last=False,
        num_workers=0
    )
    
    return dataloader

## Defining Model

The model implementation will be exactly the same as what you have seen in the first tutorial.

In [5]:
class SAGE(nn.Module):
    def __init__(self, in_feats, n_hidden, n_classes, n_layers):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
        
    def forward(self, bipartites, x):
        for l, (layer, bipartite) in enumerate(zip(self.layers, bipartites)):
            x = layer(bipartite, x)
            if l != self.n_layers - 1:
                x = F.relu(x)
        return x

## Distributing the Model to GPUs

PyTorch DDP manages the distribution of models and synchronization of the gradients for you.  

For a DGL model, you can simply wrap the model with `torch.nn.parallel.DistributedDataParallel`.

Here we make a simple function that does that:

In [6]:
def init_model(rank, in_feats, n_hidden, n_classes, n_layers):
    model = SAGE(in_feats, n_hidden, n_classes, n_layers).to(rank)
    return DistributedDataParallel(model, device_ids=[rank], output_device=rank)

The recommended way to distribute training is to have one training process per GPU.

During model instantiation we also specify the process rank, which is equal to the GPU ID.

## The Training Loop for one Process

The following is the training loop for a single process driving a single GPU.

In [7]:
@thread_wrapped_func
def train(rank, world_size, data):
    # data is the output of load_data
    torch.distributed.init_process_group(
        backend='nccl',
        init_method='tcp://127.0.0.1:12345',
        world_size=world_size,
        rank=rank)
    torch.cuda.set_device(rank)
    
    graph, node_features, node_labels, train_nids, valid_nids, test_nids, num_features, num_classes = data
    
    train_dataloader = create_dataloader(rank, world_size, graph, train_nids)
    # We only use one worker for validation
    valid_dataloader = create_dataloader(0, 1, graph, valid_nids)
    
    model = init_model(rank, num_features, 128, num_classes, 3)
    opt = torch.optim.Adam(model.parameters())
    torch.distributed.barrier()
    
    best_accuracy = 0
    best_model_path = 'model.pt'
    for epoch in range(10):
        model.train()

        for step, (input_nodes, output_nodes, bipartites) in enumerate(train_dataloader):
            bipartites = [b.to(rank) for b in bipartites]
            inputs = node_features[input_nodes].cuda()
            labels = node_labels[output_nodes].cuda()
            predictions = model(bipartites, inputs)

            loss = F.cross_entropy(predictions, labels)
            opt.zero_grad()
            loss.backward()
            opt.step()

            accuracy = sklearn.metrics.accuracy_score(labels.cpu().numpy(), predictions.argmax(1).detach().cpu().numpy())

            if rank == 0 and step % 10 == 0:
                print('Epoch {:05d} Step {:05d} Loss {:.04f}'.format(epoch, step, loss.item()))

        torch.distributed.barrier()
        
        if rank == 0:
            model.eval()
            predictions = []
            labels = []
            with torch.no_grad():
                for input_nodes, output_nodes, bipartites in valid_dataloader:
                    bipartites = [b.to(rank) for b in bipartites]
                    inputs = node_features[input_nodes].cuda()
                    labels.append(node_labels[output_nodes].numpy())
                    predictions.append(model.module(bipartites, inputs).argmax(1).cpu().numpy())
                predictions = np.concatenate(predictions)
                labels = np.concatenate(labels)
                accuracy = sklearn.metrics.accuracy_score(labels, predictions)
                print('Epoch {} Validation Accuracy {}'.format(epoch, accuracy))
                if best_accuracy < accuracy:
                    best_accuracy = accuracy
                    torch.save(model.module.state_dict(), best_model_path)
                    
        torch.distributed.barrier()
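The two `torch.distributed.barrier()` calls around validation follow a simple pattern: every rank waits until all ranks have finished the epoch, rank 0 validates while the others wait, and then all ranks proceed together. The sketch below reproduces only that ordering, with Python threads and `threading.Barrier` standing in for GPU processes and `torch.distributed.barrier()`. The function `run_epoch` and the event names are hypothetical.

```python
import threading


def run_epoch(world_size):
    """Simulate one epoch of train / validate-on-rank-0 / continue with threads."""
    barrier = threading.Barrier(world_size)
    log = []
    lock = threading.Lock()

    def record(event):
        with lock:
            log.append(event)

    def worker(rank):
        record(("train", rank))
        barrier.wait()  # all ranks have finished training this epoch
        if rank == 0:
            record(("validate", rank))  # only rank 0 validates
        barrier.wait()  # rank 0 has finished validating
        record(("next_epoch", rank))

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


log = run_epoch(4)
```

In `log`, every `"train"` event appears before the single `"validate"` event, and every `"next_epoch"` event appears after it, whichever order the threads happen to be scheduled in.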

## Spawning multiple processes for the Multi GPU training

In [8]:
if __name__ == '__main__':
    procs = []
    data = load_data()
    for proc_id in range(4):    # 4 gpus
        p = mp.Process(target=train, args=(proc_id, 4, data))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

Epoch 00000 Step 00000 Loss 6.7778
Epoch 00000 Step 00010 Loss 3.0505
Epoch 00000 Step 00020 Loss 2.2261
Epoch 00000 Step 00030 Loss 1.8884
Epoch 00000 Step 00040 Loss 1.6211
Epoch 0 Validation Accuracy 0.7000991785977672
Epoch 00001 Step 00000 Loss 1.5559
Epoch 00001 Step 00010 Loss 1.3617
Epoch 00001 Step 00020 Loss 1.3690
Epoch 00001 Step 00030 Loss 1.3224
Epoch 00001 Step 00040 Loss 1.1171
Epoch 1 Validation Accuracy 0.7940645423797777
Epoch 00002 Step 00000 Loss 1.2186
Epoch 00002 Step 00010 Loss 1.1311
Epoch 00002 Step 00020 Loss 1.0952
Epoch 00002 Step 00030 Loss 1.0875
Epoch 00002 Step 00040 Loss 0.9908
Epoch 2 Validation Accuracy 0.8124507285812375
Epoch 00003 Step 00000 Loss 1.0880
Epoch 00003 Step 00010 Loss 1.0708
Epoch 00003 Step 00020 Loss 1.1645
Epoch 00003 Step 00030 Loss 0.9818
Epoch 00003 Step 00040 Loss 0.9260
Epoch 3 Validation Accuracy 0.825267655061923
Epoch 00004 Step 00000 Loss 1.0013
Epoch 00004 Step 00010 Loss 0.8391
Epoch 00004 Step 00020 Loss 0.9800
Epoch 00

## Conclusion

In this tutorial, you have learned how to train a multi-layer GraphSAGE model for node classification on a large dataset that cannot fit into GPU memory.  The method you have learned can scale to a graph of any size, and works on a single machine with *any number of* GPUs.

## What's next?

The next tutorial will be about adapting the mini-batch training procedure for heterogeneous graphs.

## Additional material: caveats in training with DDP

When writing DDP code, you may often run into these two kinds of problems:

* `Cannot re-initialize CUDA in forked subprocess`

  This is because you have initialized the CUDA context before creating subprocesses using `mp.Process`.  Solutions include:
  
  * Remove all the code that could initialize the CUDA context before calling `mp.Process`.  For instance, you cannot get the number of GPUs via `torch.cuda.device_count()` before calling `mp.Process`, since that also initializes the CUDA context.  You can check whether the CUDA context is initialized via `torch.cuda.is_initialized()`.
  
  * Use `torch.multiprocessing.spawn()` to create processes instead of forking with `mp.Process`.  A downside is that Python will duplicate the graph storage for every process spawned this way, so memory consumption grows linearly with the number of processes.
  
* The training process freezes during minibatch iteration.

  This is due to a [long-standing bug in the interaction between GNU OpenMP and `fork`](https://github.com/pytorch/pytorch/issues/17199).  A workaround is to wrap the target function of `mp.Process` with the decorator `utils.thread_wrapped_func`, provided in the tutorial.
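The tutorial's `utils` module is not reproduced in this section, so the following is only a sketch of how a decorator of that kind could be written, not the actual `thread_wrapped_func`. The name `run_in_thread` is hypothetical. It assumes the workaround amounts to running the target in a fresh thread of the forked process instead of its main thread, and handing the result or exception back to the caller.

```python
import functools
import queue
import threading
import traceback


def run_in_thread(func):
    """Run func in a new thread and return its result to the calling thread."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result_queue = queue.Queue()

        def target():
            try:
                result_queue.put((func(*args, **kwargs), None, None))
            except Exception as exc:
                # Capture the traceback text so it is not lost with the thread.
                result_queue.put((None, exc, traceback.format_exc()))

        thread = threading.Thread(target=target)
        thread.start()
        result, exc, trace = result_queue.get()  # blocks until func finishes
        thread.join()
        if exc is not None:
            # Re-raise in the caller, keeping the original exception as the cause.
            raise RuntimeError(trace) from exc
        return result

    return wrapper


@run_in_thread
def add(a, b):
    return a + b


total = add(2, 3)
```

A function decorated this way can be passed as the `target` of `mp.Process` like any other function, because `functools.wraps` preserves its name and the wrapper forwards all arguments.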