# Stochastic Training of GNN with Multiple GPUs
Note: this tutorial requires a GPU-enabled machine with multiple GPU devices.

This tutorial shows how to train a multi-layer GraphSAGE model for node classification on ogbn-arxiv, a dataset provided by the Open Graph Benchmark (OGB), using a single machine with multiple GPUs. The dataset contains around 170 thousand nodes and 1.2 million edges.

By the end of this tutorial, you will be able to:

 * Parallelize model training across multiple GPUs on a single machine.
 * Replicate the model and synchronize its gradients using PyTorch DDP.
 
## Distributed training overview
Training models on very large datasets can take hours or even days to converge. In deep learning, we can get substantial speed-ups by distributing the training workload across multiple workers. The workers run in parallel and exchange their updates. Workers can be individual machines in a cluster (not covered in this tutorial); in this tutorial, each worker is a process on a single machine that drives one GPU.

### Data Parallelism

For multi-GPU training on a single machine, data parallelism is an easy-to-implement and effective training approach.

Here is how it works:

 * The data is divided into k partitions, where k is the number of GPU workers.
 * The model is copied to each of the GPU workers.
 * Each worker computes gradients on its own partition of the data.
 * The workers exchange and average their gradients, so every model copy applies the same update and the copies stay identical.
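
The steps above can be sketched in plain Python. This toy example (not DGL or PyTorch code; all names are hypothetical) fits a one-parameter linear model y = w * x with k simulated workers: each worker keeps its own copy of w, computes the gradient on its shard, and the gradients are averaged before an identical update is applied to every copy.

```python
# Toy data parallelism: fit y = w * x with squared loss using k simulated workers.

def shard(data, k):
    """Split data into k equal contiguous shards (assumes len(data) % k == 0)."""
    size = len(data) // k
    return [data[i * size:(i + 1) * size] for i in range(k)]

def local_gradient(w, samples):
    """Gradient of mean((w*x - y)^2) over one worker's samples."""
    return sum(2 * (w * x - y) * x for x, y in samples) / len(samples)

def data_parallel_step(replicas, shards, lr):
    # Each worker computes a gradient on its own shard with its own copy of w.
    grads = [local_gradient(w, s) for w, s in zip(replicas, shards)]
    avg = sum(grads) / len(grads)            # the "communication" step
    return [w - lr * avg for w in replicas]  # same update on every replica

data = [(float(x), 3.0 * x) for x in range(1, 9)]  # true weight is 3
k = 4
shards = shard(data, k)
replicas = [0.0] * k  # the model is copied to each worker
for _ in range(50):
    replicas = data_parallel_step(replicas, shards, lr=0.01)
print(replicas)
```

With equal-size shards, the average of the per-shard gradients equals the full-batch gradient, so each data-parallel step matches one step of training on all the data at once.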
 
PyTorch DistributedDataParallel (DDP) is the recommended built-in solution for multi-GPU training.

You can use PyTorch DDP for DGL models in the same way as for any other PyTorch application.

 * PyTorch DDP implements data parallelism at the module level, so it wraps the model.
 * To use it, your code needs to fork or spawn multiple processes, each with its own DDP instance.
 * DDP uses collective communication to synchronize gradients (all-reduce) and buffers (broadcast from rank 0).
 * On machines with NVIDIA GPUs, NCCL is the usual communication backend.
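
To make "collective communication" concrete, here is a toy, sequential simulation of a ring all-reduce, one classic algorithm for summing a buffer across workers. It is an illustration only: NCCL selects its own algorithms and runs them on the GPUs, and the function name is hypothetical. As a simplifying assumption, each worker's gradient is split into as many chunks as there are workers, and each chunk is a single number.

```python
def ring_allreduce_mean(buffers):
    """Simulate a ring all-reduce over n workers; buffers[r] holds n scalar chunks."""
    n = len(buffers)
    buf = [list(b) for b in buffers]
    # Phase 1: reduce-scatter. Worker r sends chunk (r - s) % n to its right
    # neighbour, which adds it. Afterwards worker r owns the full sum of chunk (r + 1) % n.
    for s in range(n - 1):
        sends = [((r + 1) % n, (r - s) % n, buf[r][(r - s) % n]) for r in range(n)]
        for dst, c, value in sends:
            buf[dst][c] += value
    # Phase 2: all-gather. Each fully reduced chunk travels once around the ring.
    for s in range(n - 1):
        sends = [((r + 1) % n, (r + 1 - s) % n, buf[r][(r + 1 - s) % n]) for r in range(n)]
        for dst, c, value in sends:
            buf[dst][c] = value
    # DDP averages gradients, so divide the sums by the number of workers.
    return [[v / n for v in b] for b in buf]

grads = [[1.0, 2.0, 3.0, 4.0],
         [5.0, 6.0, 7.0, 8.0],
         [9.0, 10.0, 11.0, 12.0],
         [13.0, 14.0, 15.0, 16.0]]
print(ring_allreduce_mean(grads))
# → every worker gets [7.0, 8.0, 9.0, 10.0]
```

Each worker sends 2(n - 1) chunks of size N/n in total, so its traffic stays close to 2N values no matter how many GPUs take part.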

## Loading Dataset
OGB provides the dataset already prepared as a DGL graph.

In [None]:
import torch
import numpy as np
from ogb.nodeproppred import DglNodePropPredDataset

dataset = DglNodePropPredDataset('ogbn-arxiv')

An OGB dataset is a collection of graphs and their labels. The ogbn-arxiv dataset contains only a single graph, so you can simply get the graph and its node labels like this:

In [None]:
import dgl
graph, node_labels = dataset[0]
# Add reverse edges, since ogbn-arxiv is a directed graph.
graph = dgl.add_reverse_edges(graph)
graph.ndata['label'] = node_labels[:, 0]
print(graph)
print(node_labels)

node_features = graph.ndata['feat']
num_features = node_features.shape[1]
num_classes = (node_labels.max() + 1).item()
print('Number of classes:', num_classes)
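
``dgl.add_reverse_edges`` lets messages flow in both directions along each citation: for every edge (u, v) the result also contains (v, u), so ogbn-arxiv's 1,166,243 edges become 2,332,486. Here is a pure-Python analog on a toy edge list (a hypothetical helper, not DGL's implementation) that appends the reversed edges after the original ones:

```python
def add_reverse_edges(src, dst):
    """Return edge lists containing every (u, v) followed by the reversed (v, u) block."""
    return src + dst, dst + src

src, dst = [0, 1, 2], [1, 2, 0]
new_src, new_dst = add_reverse_edges(src, dst)
print(list(zip(new_src, new_dst)))
# → [(0, 1), (1, 2), (2, 0), (1, 0), (2, 1), (0, 2)]
```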

You can get the training/validation/test split of the nodes with the ``get_idx_split`` method.

In [None]:
idx_split = dataset.get_idx_split()
train_nids = idx_split['train']
valid_nids = idx_split['valid']
test_nids = idx_split['test']

## Defining Neighbor Sampler and Data Loader
We follow the previous **Training GNN with Neighbor Sampling for Node Classification** tutorial and define our data loader with ``dgl.dataloading.NodeDataLoader``, which iterates over the dataset, and ``dgl.dataloading.MultiLayerNeighborSampler``, which randomly picks a fixed number of neighbors for each node. Each process builds a data loader over its own slice of the node IDs.

In [None]:
def create_dataloader(rank, world_size, graph, nids):
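    # Give each process (GPU) an equal, contiguous slice of the node IDs.
    # The last len(nids) % world_size IDs are dropped so every rank gets the same count.
    partition_size = len(nids) // world_size
    partition_offset = partition_size * rank
    nids = nids[partition_offset:partition_offset + partition_size]

    # Sample 4 neighbors per node for each of the two SAGEConv layers.
    sampler = dgl.dataloading.MultiLayerNeighborSampler([4, 4])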
    dataloader = dgl.dataloading.NodeDataLoader(
        graph, nids, sampler,
        batch_size=1024,
        shuffle=True,
        drop_last=False,
        num_workers=0
    )
    
    return dataloader
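
The partitioning in ``create_dataloader`` can be checked on plain lists. The sketch below (hypothetical helper names) reproduces its contiguous slicing and contrasts it with a strided split, ``nids[rank::world_size]``, which also works on 1-D tensors and assigns every node ID to exactly one rank:

```python
def contiguous_partition(nids, rank, world_size):
    """The slicing used in create_dataloader: the remainder is dropped."""
    size = len(nids) // world_size
    return nids[rank * size:(rank + 1) * size]

def strided_partition(nids, rank, world_size):
    """Hypothetical alternative: every node ID goes to exactly one rank."""
    return nids[rank::world_size]

nids = list(range(10))
print([contiguous_partition(nids, r, 4) for r in range(4)])
# → [[0, 1], [2, 3], [4, 5], [6, 7]]   (nodes 8 and 9 are never trained on)
print([strided_partition(nids, r, 4) for r in range(4)])
# → [[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]
```

The equal-size slices have a practical benefit: every rank runs the same number of steps per epoch, which matters because each DDP backward pass is a collective operation that waits for all ranks. A strided split can give ranks different numbers of batches, which would need padding or DDP's ``join()`` context manager. Recent DGL releases also accept a ``use_ddp=True`` argument on the data loader that splits the IDs for you; check the documentation of your DGL version before relying on it.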

## Defining Model
The model implementation will be exactly the same as what you have seen in the previous tutorial.

In [None]:
import torch.nn as nn
import torch.nn.functional as F
from dgl.nn import SAGEConv

class SageModel(nn.Module):
    def __init__(self, in_feats, h_feats, num_classes):
        super(SageModel, self).__init__()
        self.conv1 = SAGEConv(in_feats, h_feats, aggregator_type='mean')
        self.conv2 = SAGEConv(h_feats, num_classes, aggregator_type='mean')
        self.h_feats = h_feats

    def forward(self, mfgs, x):
        h_dst = x[:mfgs[0].num_dst_nodes()]
        h = self.conv1(mfgs[0], (x, h_dst))
        h = F.relu(h)
        h_dst = h[:mfgs[1].num_dst_nodes()]
        h = self.conv2(mfgs[1], (h, h_dst))
        return h
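
The slicing ``x[:mfgs[0].num_dst_nodes()]`` relies on a DGL convention for MFGs: the destination nodes are also the first ``num_dst_nodes()`` source nodes, so a prefix of the input features holds the destination nodes' own features. The scalar sketch below (a hypothetical helper, with weights reduced to single numbers and the bias omitted) roughly mimics what a mean-aggregator SAGE layer computes for each destination node v: w_self * h_v + w_neigh * mean of h_u over the sampled neighbors u.

```python
def sage_mean_layer(mfg_edges, num_dst, h_src, w_self, w_neigh):
    """Scalar sketch of a mean-aggregator SAGE layer on one MFG.

    mfg_edges: (src_index, dst_index) pairs with indices local to the MFG.
    The first num_dst source nodes are the destination nodes themselves.
    """
    h_dst = h_src[:num_dst]  # same idea as x[:mfg.num_dst_nodes()]
    out = []
    for v in range(num_dst):
        neigh = [h_src[u] for u, d in mfg_edges if d == v]
        h_neigh = sum(neigh) / len(neigh) if neigh else 0.0
        out.append(w_self * h_dst[v] + w_neigh * h_neigh)
    return out

# Destination nodes have local IDs 0 and 1; IDs 2 and 3 are extra sampled sources.
h_src = [1.0, 2.0, 3.0, 5.0]
edges = [(2, 0), (3, 0), (3, 1)]
print(sage_mean_layer(edges, num_dst=2, h_src=h_src, w_self=1.0, w_neigh=0.5))
# → [3.0, 4.5]
```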

## Distributing the Model to GPUs
PyTorch DDP manages the distribution of models and synchronization of the gradients for you.

For a DGL model, you can simply wrap it with ``torch.nn.parallel.DistributedDataParallel``.

Here is a small helper function that does this:

In [None]:
from torch.nn.parallel import DistributedDataParallel

def init_model(rank, in_feats, n_hidden, n_classes):
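    # Move the model to GPU `rank`, then wrap it so that DDP all-reduces
    # gradients across processes during backward().
    model = SageModel(in_feats, n_hidden, n_classes).to(rank)
    return DistributedDataParallel(model, device_ids=[rank], output_device=rank)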

The recommended way to distribute training is to run one training process per GPU.

When instantiating the model, we also pass the process rank, which on a single machine equals the GPU ID.

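## The Training Loop for One Process
Each process runs the following training loop on its own GPU. After every epoch, rank 0 also evaluates the model on the validation set and saves the best checkpoint.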

In [None]:
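# `utils` is a local helper module that accompanies this tutorial (not part of DGL);
# the wrapper is meant to make OpenMP work inside forked worker processes.
from utils import fix_openmp as thread_wrapped_func
import sklearn.metrics  # `import sklearn` alone does not guarantee sklearn.metrics is loaded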

@thread_wrapped_func
def train(rank, world_size, data):
    # data is the tuple assembled in the __main__ block below
    torch.distributed.init_process_group(
        backend='nccl',
        init_method='tcp://127.0.0.1:12345',
        world_size=world_size,
        rank=rank)
    torch.cuda.set_device(rank)
    
    graph, node_features, node_labels, train_nids, valid_nids, test_nids, num_features, num_classes = data
    
    train_dataloader = create_dataloader(rank, world_size, graph, train_nids)
    # We only use one worker for validation
    valid_dataloader = create_dataloader(0, 1, graph, valid_nids)
    
    model = init_model(rank, num_features, 128, num_classes)
    opt = torch.optim.Adam(model.parameters())
    torch.distributed.barrier()
    
    best_accuracy = 0
    best_model_path = 'model.pt'
    for epoch in range(10):
        model.train()
        for step, (input_nodes, output_nodes, mfgs) in enumerate(train_dataloader):
            mfgs = [mfg.to(rank) for mfg in mfgs]
            inputs = node_features[input_nodes].cuda()
            labels = mfgs[-1].dstdata['label']
            predictions = model(mfgs, inputs)

            loss = F.cross_entropy(predictions, labels)
            opt.zero_grad()
            loss.backward()
            opt.step()

            accuracy = sklearn.metrics.accuracy_score(labels.cpu().numpy(),
                                                      predictions.argmax(1).detach().cpu().numpy())

            if rank == 0 and step % 10 == 0:
                print('Epoch {:05d} Step {:05d} Train acc {:.04f} Loss {:.04f}'.format(
                    epoch, step, accuracy, loss.item()))

        torch.distributed.barrier()
        
        # GPU 0 will do the evaluation
        if rank == 0:
            model.eval()
            predictions = []
            labels = []
            with torch.no_grad():
                for input_nodes, output_nodes, mfgs in valid_dataloader:
                    mfgs = [mfg.to(rank) for mfg in mfgs]
                    inputs = node_features[input_nodes].cuda()
                    labels.append(mfgs[-1].dstdata['label'].cpu().numpy())
                    # Only rank 0 evaluates, so call the wrapped module directly
                    # to bypass DDP and avoid any collective communication.
                    predictions.append(model.module(mfgs, inputs).argmax(1).cpu().numpy())
                predictions = np.concatenate(predictions)
                labels = np.concatenate(labels)
                accuracy = sklearn.metrics.accuracy_score(labels, predictions)
                print('Epoch {} Validation Accuracy {}'.format(epoch, accuracy))
                if best_accuracy < accuracy:
                    best_accuracy = accuracy
                    torch.save(model.module.state_dict(), best_model_path)
                    
        torch.distributed.barrier()
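
The two barriers around the evaluation step follow a common pattern: all ranks finish the epoch, rank 0 evaluates alone, and the other ranks wait instead of racing ahead into the next epoch. The sketch below imitates that ordering with threads standing in for GPU processes and ``threading.Barrier`` standing in for ``torch.distributed.barrier()``; it is an analogy, not distributed code.

```python
import threading

def run_epochs(world_size=3, epochs=2):
    """Mimic the barrier pattern in train(): all ranks train, then rank 0 evaluates alone."""
    barrier = threading.Barrier(world_size)
    log, lock = [], threading.Lock()

    def worker(rank):
        for epoch in range(epochs):
            with lock:
                log.append(('train', epoch, rank))
            barrier.wait()  # every rank has finished this epoch's training
            if rank == 0:
                with lock:
                    log.append(('eval', epoch, rank))
            barrier.wait()  # the other ranks wait until evaluation is done

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log

log = run_epochs()
print(log)
```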

## Spawning Multiple Processes for Multi-GPU Training
Finally, start one training process per GPU and wait for all of them to finish.

In [None]:
import torch.multiprocessing as mp
if __name__ == '__main__':
    n_gpus = torch.cuda.device_count()  # one training process per visible GPU
    procs = []
    data = (graph, node_features, node_labels, train_nids, valid_nids, test_nids, num_features, num_classes)
    for proc_id in range(n_gpus):
        p = mp.Process(target=train, args=(proc_id, n_gpus, data))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

## Conclusion
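In this tutorial, you have learned how to train a multi-layer GraphSAGE for node classification with data parallelism across multiple GPUs on a single machine. Because each mini-batch is built by neighbor sampling, the full graph never has to fit into GPU memory: the same approach applies to larger graphs, as long as the graph and its features fit in the machine's host memory, and to any number of GPUs on that machine.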