In [None]:
import numpy as np
import os
import torch
import torch.distributed as dist
import torch.optim as optim
from torch.multiprocessing import Process
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
from random import Random

# Number of worker processes launched by the notebook (one rank per process).
WORLD_SIZE = 2
# Number of passes over the training data performed by each worker.
NUM_EPOCHS = 50
# Training accuracy is printed/recorded once every this many batches.
TRAINING_RECORD_INTERVAL = 25

In [None]:
class Partition:
    """Read-only view over a subset of a dataset.

    `data` is the underlying indexable dataset and `index` is the sequence of
    positions in `data` that belong to this view.
    """

    def __init__(self, data, index):
        self.data, self.index = data, index

    def __len__(self):
        """Number of samples exposed by this view."""
        return len(self.index)

    def __getitem__(self, index):
        """Translate a view-local position into the underlying dataset item."""
        return self.data[self.index[index]]

class DataPartitioner:
    """Shuffle a dataset's indices once and cut them into consecutive chunks.

    `sizes` is an iterable of fractions of the dataset length; chunk *i* holds
    `int(sizes[i] * len(data))` shuffled indices. The shuffle is driven by
    `seed`, so the same arguments always give the same chunks.
    """

    def __init__(self, data, sizes, seed = 8675309):
        self.data = data
        total = len(data)

        # Deterministic permutation of every index in the dataset.
        remaining = list(range(total))
        Random(seed).shuffle(remaining)

        # Peel chunks off the front of the permutation, one per fraction.
        self.partions = []
        for frac in sizes:
            cut = int(frac * total)
            head, remaining = remaining[:cut], remaining[cut:]
            self.partions.append(head)

    def use(self, partion):
        """Return a `Partition` view over chunk number `partion`."""
        chosen = self.partions[partion]
        return Partition(self.data, chosen)

def partition_dataset(testing = False, global_batch_size = 128):
    """Build a DataLoader over this rank's share of CIFAR-10.

    The dataset is split into `world_size` equal fractions with
    `DataPartitioner`, and the fraction matching `dist.get_rank()` is wrapped
    in a `DataLoader`. The process group must be initialised before calling.

    Args:
        testing: when False (default) load the training split with random
            crop/flip augmentation; when True load the test split with a
            deterministic resize so evaluation is repeatable.
        global_batch_size: batch size summed over all ranks; each rank gets
            `global_batch_size // world_size` samples per batch (at least 1).

    Returns:
        `(loader, bsz)` where `loader` is the per-rank `DataLoader` and `bsz`
        is the integer per-rank batch size.
    """
    cifar_mean = (0.4914, 0.4822, 0.4465)
    cifar_std = (0.2023, 0.1994, 0.2010)

    if testing:
        # No random augmentation at evaluation time: every epoch must score
        # exactly the same inputs for the accuracy curve to be comparable.
        transform = transforms.Compose([
            transforms.Resize(224),
            transforms.ToTensor(),
            transforms.Normalize(cifar_mean, cifar_std),
        ])
    else:
        # 224 matches the evaluation resolution (the previous 244 was
        # inconsistent with the test pipeline).
        transform = transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(cifar_mean, cifar_std),
        ])

    dataset = datasets.CIFAR10('./data', download = True, train = (not testing),
        transform = transform)

    size = dist.get_world_size()
    # Integer per-rank batch size; never let it collapse to 0 for large worlds.
    bsz = max(1, int(global_batch_size) // size)
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes).use(dist.get_rank())
    loader = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=(not testing))
    return loader, bsz

def average_gradients(model):
    """Replace every parameter gradient with its mean across all ranks.

    Each gradient is summed in place with `dist.all_reduce` and then divided
    by the world size. Parameters without a gradient (`grad is None`, e.g.
    frozen or unused parameters) are skipped; every rank must skip the same
    parameters, otherwise the collective calls will not line up.

    Args:
        model: module whose `parameters()` gradients are averaged in place.
    """
    size = float(dist.get_world_size())
    for param in model.parameters():
        if param.grad is None:
            continue
        # `dist.ReduceOp` is the supported spelling; `dist.reduce_op` is deprecated.
        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
        param.grad.div_(size)

In [None]:
def _evaluate(model, loader):
    """Return `(correct, seen)` prediction counts for `model` over `loader`.

    Runs in eval mode with autograd disabled, then restores the previous
    training/eval mode of the model.
    """
    was_training = model.training
    model.eval()  # disable dropout so the score is deterministic
    correct = 0
    seen = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.cuda(), labels.cuda()
            predicted = model(inputs).argmax(dim=1)
            correct += (predicted == labels).sum().item()
            seen += labels.size(0)
    model.train(was_training)
    return correct, seen


def run(rank, size, model, criterion, optimizer):
    """Train `model` data-parallel on this rank and evaluate after each epoch.

    Every rank trains on its own CIFAR-10 partition, averaging gradients with
    the other ranks before each optimizer step. After each epoch all ranks
    evaluate their share of the test split and the counts are summed across
    ranks. Rank 0 records the accuracy curves and saves them together with the
    model weights when training finishes.

    Args:
        rank: index of this process in the process group.
        size: number of processes in the group (kept for the launcher's
            calling convention; not read here).
        model: network to train; moved to the current CUDA device.
        criterion: loss module; moved to the current CUDA device.
        optimizer: optimizer already bound to `model.parameters()`.
    """
    torch.manual_seed(8675309)
    training_set, _ = partition_dataset()
    testing_set, _ = partition_dataset(testing = True)

    model.cuda()
    criterion = criterion.cuda()

    # Only rank 0 fills and saves these; other ranks keep them empty.
    training_accuracy = []
    testing_accuracy = []

    for epoch_idx in range(NUM_EPOCHS):
        model.train()

        for batch_idx, (data, target) in enumerate(training_set):
            data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, target)
            loss.backward()
            average_gradients(model)
            optimizer.step()

            if batch_idx % TRAINING_RECORD_INTERVAL == 0:
                predicted = outputs.detach().argmax(dim=1)
                correct = (predicted == target).sum().item()
                # Divide by the real batch size: the last batch of an epoch
                # (or a different world size) is not 64 samples.
                batch_accuracy = 100 * correct / target.size(0)
                print('Rank %d\tEpoch: %d\tIterval: %d\tAccuracy : %d%%' % (
                    rank,
                    epoch_idx,
                    batch_idx,
                    batch_accuracy))
                if rank == 0:
                    training_accuracy.append(batch_accuracy)

        # Score this rank's share of the test split, then sum the counts over
        # all ranks; all_reduce also synchronises the ranks at epoch end.
        local_correct, local_seen = _evaluate(model, testing_set)
        totals = torch.tensor([local_correct, local_seen])
        dist.all_reduce(totals, op = dist.ReduceOp.SUM)

        if rank == 0:
            total_correct, total_seen = totals.tolist()
            # max(..., 1) avoids a ZeroDivisionError on an empty test split.
            epoch_accuracy = 100 * total_correct / max(total_seen, 1)
            print('Epoch: %d\tAccuracy: %d %%' % (epoch_idx, epoch_accuracy))
            testing_accuracy.append(epoch_accuracy)

        dist.barrier()

    if rank == 0:
        results_dir = '/content/drive/My Drive/Colab Notebooks/Results/Parallel Control'
        # Create the folder up front so a missing directory cannot discard
        # the results of the whole training run.
        os.makedirs(results_dir, exist_ok = True)
        np.save(results_dir + '/training_accuracy.npy', training_accuracy)
        np.save(results_dir + '/testing_accuracy.npy', testing_accuracy)
        torch.save(model.state_dict(), results_dir + '/model_control.pt')

In [None]:
def init_process(rank, size, model, criterion, optimizer, fn, backend = "gloo"):
    """Join the process group on localhost, run `fn`, then leave the group.

    Args:
        rank: index of this process within the group.
        size: total number of processes in the group.
        model, criterion, optimizer: forwarded unchanged to `fn`.
        fn: callable invoked as `fn(rank, size, model, criterion, optimizer)`.
        backend: `torch.distributed` backend name (default "gloo").

    Any exception raised by `fn` propagates to the caller after the process
    group has been destroyed.
    """
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group(backend, rank = rank, world_size = size)
    try:
        fn(rank, size, model, criterion, optimizer)
    finally:
        # Always release the group's resources, even when fn fails.
        dist.destroy_process_group()

In [None]:
# Build the model, loss and optimizer once; each worker process receives them
# as arguments to init_process.
model = models.alexnet(num_classes = 10)
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=0.0005)

# Launch one worker per rank.
processes = []
for rank in range(WORLD_SIZE):
    p = Process(target = init_process, args = (rank, WORLD_SIZE, model, criterion, optimizer, run))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

# A worker that crashed leaves a non-zero exit code; surface that instead of
# reporting success.
failed = {worker_rank: p.exitcode for worker_rank, p in enumerate(processes) if p.exitcode != 0}
if failed:
    raise RuntimeError("Worker process(es) exited abnormally (rank -> exit code): %s" % failed)

print("Execution Finished")