In [5]:
import torch
from torch.nn import DataParallel, Linear
import torchvision
from torch.utils.data.dataset import Dataset
from torch.utils.data.dataloader import DataLoader
import numpy as np


### Load MNIST Data

In [33]:
# Module-level handles populated by loaddata(); they stay None until it runs.
mnist_train, mnist_test, train_ds, test_ds = None,None,None,None
def loaddata(rootdir = './MNIST_data'):
    """Load MNIST (downloading if needed) and publish the datasets as globals.

    Sets ``mnist_train`` / ``mnist_test`` to the torchvision datasets and
    ``train_ds`` / ``test_ds`` to their ``MyDataset`` wrappers, printing a
    short summary along the way.

    Args:
        rootdir: directory handed to torchvision as the dataset root.
    """
    global mnist_train, mnist_test, train_ds, test_ds
    print('loading data')
    mnist_train = torchvision.datasets.MNIST(root = rootdir,train=True,download=True)
    mnist_test = torchvision.datasets.MNIST(root = rootdir,train=False,download=True)

    # The torchvision datasets return a (PIL image, label) pair per __getitem__ call.
    print(mnist_train)
    train_ds = MyDataset(mnist_train)
    test_ds = MyDataset(mnist_test)
    print(len(train_ds), len(test_ds))
    # Fetch the first sample once rather than indexing (and converting) it twice.
    first_image, first_label = train_ds[0]
    print(first_image.shape, first_label)

### Convert Images to array

In [25]:
class MyDataset(Dataset):
    """Dataset adapter that serves each wrapped sample as a flat float32 vector.

    Every item of the wrapped dataset is read as ``(image, label)``. The image
    is converted to a 1-D ``float32`` numpy array divided by 255; the label is
    passed through untouched.
    """

    def __init__(self, mnist_dataset):
        super().__init__()
        self.mnist_dataset = mnist_dataset

    def __len__(self):
        # Same number of samples as the wrapped dataset.
        return len(self.mnist_dataset)

    def __getitem__(self, index):
        sample = self.mnist_dataset[index]
        pixels = np.array(sample[0], dtype=np.float32)
        # Flatten to one dimension, then scale by 1/255.
        return pixels.reshape(-1) / 255, sample[1]

## Model

In [26]:
class LinearModel(torch.nn.Module):
    """Fully connected classifier: input -> 128 -> 64 -> output, ReLU after the first two layers.

    With ``verbose=True`` every forward pass prints the input shape and the
    current CUDA device index (or 'NAN' when CUDA is unavailable).
    """

    def __init__(self, input_size, output_size, verbose = False):
        super().__init__()
        # Layers are created in the order fc1, fc2, relu, fc3.
        self.fc1 = torch.nn.Linear(input_size, 128)
        self.fc2 = torch.nn.Linear(128, 64)
        self.relu = torch.nn.ReLU()
        self.fc3 = torch.nn.Linear(64, output_size)
        self.verbose = verbose

    def forward(self, X):
        if self.verbose:
            device_label = torch.cuda.current_device() if torch.cuda.is_available() else 'NAN'
            print('X shape = {}, current cuda device = {}'.format(X.shape, device_label))

        hidden = self.relu(self.fc1(X))
        hidden = self.relu(self.fc2(hidden))
        # The output layer returns raw scores; no activation is applied here.
        return self.fc3(hidden)

## Train

In [27]:
from tqdm import tqdm
def train(model, optimizer, dataloader, device, epochs = None):
    """Run the training loop and print per-epoch diagnostics.

    Args:
        model: module to optimise (plain or DistributedDataParallel-wrapped).
        optimizer: optimizer already bound to ``model``'s parameters.
        dataloader: iterable yielding ``(x, y)`` mini-batches.
        device: device each batch is moved to before the forward pass.
        epochs: number of passes over ``dataloader``; when omitted the
            module-level ``args.epochs`` is used.
    """
    if epochs is None:
        epochs = args.epochs

    model.train()
    print('using device',device)

    # Build the loss module once instead of once per batch.
    criterion = torch.nn.CrossEntropyLoss()

    # A DistributedSampler must be told the epoch number before each pass,
    # otherwise it replays the same shuffled order every epoch. Other samplers
    # have no set_epoch attribute and are left alone.
    set_epoch = getattr(getattr(dataloader, 'sampler', None), 'set_epoch', None)

    losses = []
    for epoch in range(epochs):
        if callable(set_epoch):
            set_epoch(epoch)

        for x,y in tqdm(dataloader, position=0):
            optimizer.zero_grad()
            x = x.to(device)
            y = y.to(device)

            yhat = model(x)
            loss = criterion(yhat, y)
            loss.backward()
            optimizer.step()

            losses.append(loss.item())

        # An empty dataloader produces no losses; report NaN instead of raising IndexError.
        last_loss = losses[-1] if losses else float('nan')
        print ('loss at epoch {} is {}'.format(epoch, last_loss))
        for param in model.parameters():
            print('shape = {}, mean of values = {}'.format(param.shape,param.mean()) )

### Evaluate

In [28]:
def evaluate(model, dataloader, device):
    """Compute the mean cross-entropy loss and accuracy of ``model`` over ``dataloader``.

    Args:
        model: module to evaluate; it is switched to eval mode.
        dataloader: iterable yielding ``(x, y)`` mini-batches.
        device: device each batch is moved to before the forward pass.

    Returns:
        ``(final_loss, acc)``: the per-observation mean loss and the fraction
        of correct predictions.

    Raises:
        ValueError: if ``dataloader`` yields no observations.
    """
    model.eval()
    criterion = torch.nn.CrossEntropyLoss()

    total_loss, total_seen = 0.0, 0
    pred_chunks, label_chunks = [], []

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        for x,y in tqdm(dataloader, position=0):
            x = x.to(device)
            y = y.to(device)
            yhat = model(x)

            # The criterion returns the mean over the minibatch; weight it by the
            # batch size so a short final batch does not skew the overall mean.
            batch_size = y.shape[0]
            total_loss += criterion(yhat, y).item() * batch_size
            total_seen += batch_size

            pred_chunks.append(yhat.argmax(dim=1).cpu().numpy())
            label_chunks.append(y.cpu().numpy())

    if total_seen == 0:
        raise ValueError('evaluate() received a dataloader with no observations')

    # Concatenate once at the end instead of re-copying the arrays on every batch.
    preds = np.concatenate(pred_chunks)
    labels = np.concatenate(label_chunks)

    final_loss = total_loss / total_seen
    acc = accuracy(preds,labels)
    print ('validation loss is {}'.format(final_loss))
    print ('validation accuracy is {}'.format(acc))

    return final_loss, acc
    

def accuracy (preds, labels):
    """Return the fraction of positions where ``preds`` equals ``labels``.

    Also prints the first ten entries of each input.
    """
    head = slice(None, 10)
    print(preds[head], labels[head])
    matches = preds == labels
    return np.mean(matches)

#### Single GPU

In [29]:
def train_singlegpu():
    """Train a LinearModel on the default CUDA device, then evaluate it on the test split.

    Batch size and learning rate come from the module-level ``args``; the
    datasets come from the ``train_ds`` / ``test_ds`` globals.
    """
    gpu = torch.device('cuda')
    model = LinearModel(784,10).to(gpu)
    optimizer = torch.optim.Adam(params = model.parameters(), lr=args.lr)

    train_loader = DataLoader(train_ds,batch_size=args.batch)
    train(model, optimizer, train_loader, gpu)

    eval_loader = DataLoader(test_ds, batch_size=args.batch)
    evaluate(model, eval_loader, gpu)
    
    
# import argparse
# args = argparse.Namespace(
#     batch = 16,
#     lr = 0.001,
#     epochs =1
# )
#loaddata()
#train_singlegpu()

## Distributed data parallel
Each GPU can be run in its own process.

1. Models are not transferred between processes.
1. Each process has its own model initialized.
1. Only gradients are summed up, using the all_reduce operation.
1. The seed should be the same everywhere for consistency.
    
This should be faster than DataParallel. The only communication cost is the all-reduce.

Tutorial: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Data sampler source : https://pytorch.org/docs/stable/_modules/torch/utils/data/distributed.html#DistributedSampler


Backend https://pytorch.org/docs/stable/distributed.html

In [30]:
#%%writefile dist_training.py
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist
import os,torch,sys
import datetime

def setup(local_rank, global_rank, world_size, master_node_addr = 'localhost', master_node_port = '12355',backend = 'gloo'):
    """Join the default process group and seed torch's RNG.

    Args:
        local_rank: index of this process on its node; used as the CUDA device index.
        global_rank: rank of this process in the whole job.
        world_size: total number of processes in the job.
        master_node_addr: address exported as ``MASTER_ADDR``.
        master_node_port: port exported as ``MASTER_PORT``.
        backend: ``torch.distributed`` backend name.
    """
    print('setting up')
    os.environ['MASTER_ADDR'] = master_node_addr
    # os.environ only accepts strings; coerce so an integer port works too.
    os.environ['MASTER_PORT'] = str(master_node_port)

    # Bind this process to its own GPU before any collective is created, so
    # CUDA work issued without an explicit device does not land on GPU 0.
    if (local_rank is not None and torch.cuda.is_available()
            and 0 <= local_rank < torch.cuda.device_count()):
        torch.cuda.set_device(local_rank)

    # initialize the process group
    dist.init_process_group(backend, rank=global_rank, world_size=world_size, timeout=datetime.timedelta(seconds=300))
    # Explicitly setting seed to make sure that models created in two processes
    # start from same random weights and biases.
    torch.manual_seed(42)


def cleanup():
    """Destroy the default process group if one is active.

    Does nothing when no group has been initialised, so it is safe to call
    from error-handling paths.
    """
    if dist.is_available() and dist.is_initialized():
        dist.destroy_process_group()
    
def train_distrib(local_rank, global_rank, world_size, master_addr = 'localhost', master_port = '12355',backend = 'gloo'):
    """Train with DistributedDataParallel in this process and evaluate on global rank 0.

    Args:
        local_rank: index of this process on its node; used as the device id
            and as the suffix of the per-process data directory.
        global_rank: rank of this process in the whole job.
        world_size: total number of processes in the job.
        master_addr: rendezvous address forwarded to ``setup``.
        master_port: rendezvous port forwarded to ``setup``.
        backend: ``torch.distributed`` backend name forwarded to ``setup``.
    """
    print('inside train_distrib rank {} and world_size {}'.format(global_rank,world_size))
    # Each local rank uses its own data directory.
    loaddata('./MNIST_data/{}/'.format(local_rank))
    # Initialize the process group and set the seed.
    setup(local_rank,global_rank,world_size, master_addr , master_port,backend )

    # The process group exists from here on; always release it, even on failure.
    try:
        device_id = local_rank

        # Each process builds its own model on its own device.
        model = LinearModel(784,10,verbose = False).to(device_id)

        # The wrapper synchronises gradients across processes.
        ddp_model = DistributedDataParallel(model,device_ids=[device_id], output_device=device_id)

        optimizer = torch.optim.Adam(ddp_model.parameters(), lr = args.lr)

        # A sampler is necessary as each process should work only on a subset of data.
        # The rank is passed explicitly rather than looked up from the process group.
        distributed_sampler = DistributedSampler(train_ds, num_replicas = world_size, rank = global_rank)

        dataloader = DataLoader(train_ds,batch_size=args.batch, sampler= distributed_sampler)
        train(ddp_model, optimizer, dataloader, device_id)

        # Only evaluate on the first process, using the unwrapped model.
        if global_rank == 0:
            eval_dataloader = DataLoader(test_ds, batch_size=args.batch)
            eval_loss, acc = evaluate(model,eval_dataloader, device_id)
            if args.isaml:
                # Imported lazily so azureml is only needed for AML runs.
                from azureml.core.run import Run
                run = Run.get_context()
                run.log('eval_loss', eval_loss)
                run.log('acc', acc)
    finally:
        cleanup()
    
#train_distrib(0,1)
    

In [None]:
import argparse
import os
# Script entry point: parse the command line and dispatch to single-GPU,
# AML multi-node, or local single-node distributed training.
if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', type=int, help = 'Rank of the process')
    parser.add_argument('--global_rank', type=int, help = 'Rank of the process')
    parser.add_argument('--size',type=int, help = 'world size')
    parser.add_argument('--issingle',action='store_true')

    parser.add_argument('--batch',type=int, help = 'batch size', default = 16)
    parser.add_argument('--lr',type=float, help = 'learning rate', default = 0.001)
    parser.add_argument('--epochs',type=int, help = 'total epochs', default = 1)
    parser.add_argument('--isaml',action='store_true')
    parser.add_argument('--backend',type=str, default = 'gloo')

    # `args` is read as a module-level global by the training functions.
    args = parser.parse_args()
    print('args = ',args)
    if args.isaml:
        # Log only the variables this script consumes; dumping all of os.environ
        # would write any credentials held in the environment into the job logs.
        consumed_vars = (
            'OMPI_COMM_WORLD_LOCAL_RANK',
            'OMPI_COMM_WORLD_RANK',
            'OMPI_COMM_WORLD_SIZE',
            'AZ_BATCH_MASTER_NODE',
        )
        print('environment variables = ', {name: os.environ.get(name) for name in consumed_vars})

    #runs for both AML and local
    if args.issingle:
        print('Single GPU training')
        loaddata()
        train_singlegpu()

    #AML and distributed multi node
    elif args.isaml:
        local_rank = int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK'])
        global_rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
        size = int(os.environ['OMPI_COMM_WORLD_SIZE'])

        #NCCL environment. Still works without it.
        os.environ['NCCL_SOCKET_IFNAME'] = '^docker0,lo'

        # AZ_BATCH_MASTER_NODE is read as '<address>:<port>'.
        master_node_params = os.environ['AZ_BATCH_MASTER_NODE'].split(':')
        master_node_addr = master_node_params[0]
        master_node_port = master_node_params[1]
        print(local_rank,global_rank,size,master_node_addr,master_node_port)
        train_distrib(local_rank,global_rank, size,master_node_addr, master_node_port, args.backend)

    #distributed without AML. Single node only
    else:
        # Without these flags the ranks would be None and fail later with a
        # far less helpful error.
        missing = [flag for flag, value in (('--local_rank', args.local_rank),
                                            ('--global_rank', args.global_rank),
                                            ('--size', args.size)) if value is None]
        if missing:
            parser.error('distributed training requires ' + ', '.join(missing))

        print('starting distibuted training', args)
        train_distrib(args.local_rank, args.global_rank, args.size,backend = args.backend)

### Failed attempts at Creating subprocess

Now Create 4 processes and start training.



RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method

Sharing CUDA tensors
Sharing CUDA tensors between processes is supported only in Python 3, using a spawn or forkserver start methods. multiprocessing in Python 2 can only create subprocesses using fork, and it’s not supported by the CUDA runtime.

https://pytorch.org/docs/stable/notes/multiprocessing.html

### No logs received in mp spawn