In [25]:
# -*- coding: utf-8 -*-
"""
Created on Sun Nov  8 18:27:53 2020

@author: Livnat
"""
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torchvision.datasets as datasets
from torchvision import transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from math import ceil
from random import Random

""" Dataset partitioning helper """
class Net(nn.Module):
    """Two-convolution classifier that emits log-probabilities over 10 classes.

    The first convolution takes a single input channel. ``fc1`` expects 9216
    flattened features, which is what 64 channels of 12x12 give after two
    unpadded 3x3 convolutions and a 2x2 max-pool on a 28x28 input.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in the same order as before so that a seeded
        # initialisation consumes the RNG identically.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1)
        self.dropout1 = nn.Dropout(p=0.25)
        self.dropout2 = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(in_features=9216, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        """Return per-class log-probabilities (``log_softmax`` over dim 1)."""
        # Convolutional feature extractor.
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))

        # Fully connected classifier head on the flattened features.
        hidden = F.relu(self.fc1(torch.flatten(features, 1)))
        logits = self.fc2(self.dropout2(hidden))
        return F.log_softmax(logits, dim=1)


class Partition:
    """View over ``data`` restricted to the positions listed in ``index``."""

    def __init__(self, data, index):
        # ``index`` maps positions in this partition to positions in ``data``.
        self.data, self.index = data, index

    def __len__(self):
        """Number of entries in this partition."""
        return len(self.index)

    def __getitem__(self, index):
        """Return the underlying item that partition position ``index`` maps to."""
        return self.data[self.index[index]]


class DataPartitioner:
    """Split a dataset's indices into shuffled, consecutive chunks.

    The index list is shuffled with a ``Random`` instance seeded by ``seed``,
    so two partitioners built with the same seed and data length produce the
    same chunks.
    """

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []

        total = len(data)
        shuffled = list(range(total))
        rng = Random()
        rng.seed(seed)
        rng.shuffle(shuffled)

        # Walk a cursor over the shuffled indices; each chunk's length is the
        # fraction of the full dataset truncated to an integer.
        start = 0
        for frac in sizes:
            stop = start + int(frac * total)
            self.partitions.append(shuffled[start:stop])
            start = stop

    def use(self, partition):
        """Return a ``Partition`` over the chunk numbered ``partition``."""
        chosen = self.partitions[partition]
        return Partition(self.data, chosen)
	
def partition_dataset():
    """Build the DataLoader over this rank's share of the MNIST training set.

    The dataset is split into one equal fraction per process, and this
    process takes the fraction numbered by its rank. The global batch of 128
    is divided by the world size to get the per-process batch size.

    Returns:
        tuple: ``(train_set, bsz)`` - the shuffling DataLoader for this rank's
        partition and the per-process batch size.
    """
    preprocess = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=preprocess)

    world_size = dist.get_world_size()
    bsz = 128 // world_size
    fractions = [1.0 / world_size] * world_size
    shard = DataPartitioner(dataset, fractions).use(dist.get_rank())

    train_set = torch.utils.data.DataLoader(shard, batch_size=bsz, shuffle=True)
    return train_set, bsz
	

def average_gradients(model):
    """Average every parameter gradient of ``model`` across all processes.

    Each gradient is summed over the process group with ``all_reduce`` and
    then divided in place by the world size.

    Args:
        model: object whose ``parameters()`` yields tensors carrying ``.grad``.

    Parameters whose ``.grad`` is ``None`` (e.g. frozen or unused in the last
    backward pass) are skipped instead of raising ``AttributeError``.
    NOTE(review): skipping assumes every rank skips the same parameters,
    otherwise the collective calls would no longer line up - confirm for
    models with conditionally used layers.
    """
    world_size = float(dist.get_world_size())
    for param in model.parameters():
        grad = param.grad
        if grad is None:
            continue
        # ``ReduceOp`` replaces the deprecated ``reduce_op`` alias.
        dist.all_reduce(grad, op=dist.ReduceOp.SUM)
        grad /= world_size
			
			
def run(rank, size):
    """Train ``Net`` for 10 epochs on this process's data partition.

    Gradients are averaged across processes after every backward pass, and
    the mean batch loss and training accuracy are printed once per epoch.

    Args:
        rank: rank of this process (unused here; the rank is read from
            ``dist.get_rank()`` when printing).
        size: world size (unused here).
    """
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    num_samples = len(train_set.dataset)
    num_batches = ceil(num_samples / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        correct = 0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
            # Count correct predictions as a plain int so the printed value
            # is a number rather than a tensor repr.
            pred = output.argmax(dim=1)
            correct += (pred == target).sum().item()
        # Accuracy is a per-sample ratio: divide by the number of samples in
        # this partition, not by the number of batches.
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches, 'Accuracy:', 100. * correct / num_samples)
			


	
def init_process(rank, size, fn, backend='gloo'):
    """Initialize the distributed environment, run ``fn``, then tear down.

    Args:
        rank: rank of this process within the group.
        size: total number of processes (world size).
        fn: callable invoked as ``fn(rank, size)`` once the group is ready.
        backend: backend name passed to ``init_process_group``.

    ``MASTER_ADDR`` and ``MASTER_PORT`` are set to fixed local values before
    the process group is created.
    """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    try:
        fn(rank, size)
    finally:
        # Release the process group even when ``fn`` raises, so the group is
        # not left initialised in this process.
        dist.destroy_process_group()
	

	
# Launcher: start one worker process per rank and wait for all of them.
if __name__ == "__main__":
    # Number of worker processes; also passed to each worker as the world size.
    size = 2
    processes = []
    # NOTE(review): presumably a workaround for multiprocessing in interactive
    # environments where __main__ has no usable __spec__ - confirm it is
    # still needed.
    __spec__ = "ModuleSpec(name='builtins', loader=<class '_frozen_importlib.BuiltinImporter'>)"
    for rank in range(size):
        # Each worker runs init_process(rank, size, run).
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    # Block until every worker process has finished.
    for p in processes:
        p.join()
		
		
		
		
		
		
		
		
		
		

Process Process-25:
Traceback (most recent call last):
Process Process-26:
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-25-43c62ca376a6>", line 135, in init_process
    fn(rank, size)
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "<ipython-input-25-43c62ca376a6>", line 114, in run
    for data, target in train_set:
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-25-43c62ca376a6>", line 135, in init_process
    fn(rank, size)
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 435, in __next__
    data = self._next_data()
  File "<ipython-input-25-43c62ca376a6>", line 114, in run
    fo

KeyboardInterrupt: ignored

  File "/usr/local/lib/python3.6/dist-packages/PIL/Image.py", line 506, in __init__
    self._size = (0, 0)
  File "/usr/local/lib/python3.6/dist-packages/PIL/Image.py", line 2537, in new
    im = Image()
KeyboardInterrupt
  File "/usr/local/lib/python3.6/dist-packages/PIL/Image.py", line 504, in __init__
    self.im = None
KeyboardInterrupt


In [38]:
# -*- coding: utf-8 -*-
"""
Created on Sun Nov  8 18:27:53 2020

@author: Livnat
"""
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torchvision.datasets as datasets
from torchvision import transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from math import ceil
from random import Random
import numpy as np
from torch.utils.data.sampler import SubsetRandomSampler

""" Dataset partitioning helper """
class Net(nn.Module):
    """Two-convolution classifier that emits log-probabilities over 10 classes.

    The first convolution takes a single input channel. ``fc1`` expects 9216
    flattened features, which is what 64 channels of 12x12 give after two
    unpadded 3x3 convolutions and a 2x2 max-pool on a 28x28 input.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in the same order as before so that a seeded
        # initialisation consumes the RNG identically.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1)
        self.dropout1 = nn.Dropout(p=0.25)
        self.dropout2 = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(in_features=9216, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        """Return per-class log-probabilities (``log_softmax`` over dim 1)."""
        # Convolutional feature extractor.
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))

        # Fully connected classifier head on the flattened features.
        hidden = F.relu(self.fc1(torch.flatten(features, 1)))
        logits = self.fc2(self.dropout2(hidden))
        return F.log_softmax(logits, dim=1)


class Partition:
    """View over ``data`` restricted to the positions listed in ``index``."""

    def __init__(self, data, index):
        # ``index`` maps positions in this partition to positions in ``data``.
        self.data, self.index = data, index

    def __len__(self):
        """Number of entries in this partition."""
        return len(self.index)

    def __getitem__(self, index):
        """Return the underlying item that partition position ``index`` maps to."""
        return self.data[self.index[index]]


class DataPartitioner:
    """Split a dataset's indices into shuffled, consecutive chunks.

    The index list is shuffled with a ``Random`` instance seeded by ``seed``,
    so two partitioners built with the same seed and data length produce the
    same chunks.
    """

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []

        total = len(data)
        shuffled = list(range(total))
        rng = Random()
        rng.seed(seed)
        rng.shuffle(shuffled)

        # Walk a cursor over the shuffled indices; each chunk's length is the
        # fraction of the full dataset truncated to an integer.
        start = 0
        for frac in sizes:
            stop = start + int(frac * total)
            self.partitions.append(shuffled[start:stop])
            start = stop

    def use(self, partition):
        """Return a ``Partition`` over the chunk numbered ``partition``."""
        chosen = self.partitions[partition]
        return Partition(self.data, chosen)
	
def partition_dataset():
    """Build train and validation DataLoaders over this rank's MNIST share.

    The dataset is split into one equal fraction per process, and this
    process takes the fraction numbered by its rank. Within that partition,
    10% of the positions are held out for validation and the rest are used
    for training. The global batch of 128 is divided by the world size to get
    the per-process batch size.

    Returns:
        tuple: ``(train_set, validation_set, bsz)`` - the two DataLoaders and
        the per-process batch size.
    """
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))

    size = dist.get_world_size()
    bsz = int(128 / (size))
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())

    # Sampler indices are positions within this rank's partition, so both the
    # index range and the hold-out size must be derived from the partition
    # length. Sizing the hold-out from the full dataset made it world_size
    # times too large and left no training data once world_size reached 10.
    indices = list(range(len(partition)))
    np.random.shuffle(indices)
    split = int(np.floor(0.1 * len(partition)))
    train_sample = SubsetRandomSampler(indices[split:])
    valid_sample = SubsetRandomSampler(indices[:split])
    train_set = torch.utils.data.DataLoader(partition, sampler=train_sample,
                                            batch_size=bsz)
    validation_set = torch.utils.data.DataLoader(partition, sampler=valid_sample,
                                                 batch_size=bsz)
    return train_set, validation_set, bsz
	

def average_gradients(model):
    """Average every parameter gradient of ``model`` across all processes.

    Each gradient is summed over the process group with ``all_reduce`` and
    then divided in place by the world size.

    Args:
        model: object whose ``parameters()`` yields tensors carrying ``.grad``.

    Parameters whose ``.grad`` is ``None`` (e.g. frozen or unused in the last
    backward pass) are skipped instead of raising ``AttributeError``.
    NOTE(review): skipping assumes every rank skips the same parameters,
    otherwise the collective calls would no longer line up - confirm for
    models with conditionally used layers.
    """
    world_size = float(dist.get_world_size())
    for param in model.parameters():
        grad = param.grad
        if grad is None:
            continue
        # ``ReduceOp`` replaces the deprecated ``reduce_op`` alias.
        dist.all_reduce(grad, op=dist.ReduceOp.SUM)
        grad /= world_size
			
			
def run(rank, size):
    """Train ``Net`` for 10 epochs and validate after each epoch.

    Gradients are averaged across processes after every backward pass. After
    each epoch the mean train/validation batch loss and the train/validation
    accuracy for this process are printed.

    Args:
        rank: rank of this process (unused here; the rank is read from
            ``dist.get_rank()`` when printing).
        size: world size (unused here).
    """
    torch.manual_seed(1234)
    train_set, validation_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    train_num_samples = len(train_set.sampler)
    valid_num_samples = len(validation_set.sampler)
    train_num_batches = ceil(train_num_samples / float(bsz))
    valid_num_batches = ceil(valid_num_samples / float(bsz))

    for epoch in range(10):
        epoch_loss = 0.0
        correct = 0
        valid_loss = 0.0
        correct_val = 0

        # Re-enable dropout; the validation pass below switches it off.
        model.train()
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
            pred = output.argmax(dim=1)
            correct += (pred == target).sum().item()

        # Validate with dropout disabled and without recording the autograd
        # graph, so the metrics reflect the deterministic model.
        model.eval()
        with torch.no_grad():
            for data, target in validation_set:
                output = model(data)
                loss = F.nll_loss(output, target)
                valid_loss += loss.item()
                pred = output.argmax(dim=1)
                correct_val += (pred == target).sum().item()

        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / (train_num_batches),
              'Train Accuracy:', 100. * correct / train_num_samples,
              'Valid Loss: ', valid_loss / (valid_num_batches),
              'Valid Accuracy:', 100. * correct_val / valid_num_samples)
			


	
def init_process(rank, size, fn, backend='gloo'):
    """Initialize the distributed environment, run ``fn``, then tear down.

    Args:
        rank: rank of this process within the group.
        size: total number of processes (world size).
        fn: callable invoked as ``fn(rank, size)`` once the group is ready.
        backend: backend name passed to ``init_process_group``.

    ``MASTER_ADDR`` and ``MASTER_PORT`` are set to fixed local values before
    the process group is created.
    """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    try:
        fn(rank, size)
    finally:
        # Release the process group even when ``fn`` raises, so the group is
        # not left initialised in this process.
        dist.destroy_process_group()
	

	
# Launcher: start one worker process per rank and wait for all of them.
if __name__ == "__main__":
    # Number of worker processes; also passed to each worker as the world size.
    size = 2
    processes = []
    # NOTE(review): presumably a workaround for multiprocessing in interactive
    # environments where __main__ has no usable __spec__ - confirm it is
    # still needed.
    __spec__ = "ModuleSpec(name='builtins', loader=<class '_frozen_importlib.BuiltinImporter'>)"
    for rank in range(size):
        # Each worker runs init_process(rank, size, run).
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    # Block until every worker process has finished.
    for p in processes:
        p.join()
		
		
		
		
		
		
		
		
		
		
		
		
		
		
		

Process Process-67:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Process Process-68:
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "<ipython-input-38-03122ed3e5b5>", line 154, in init_process
    fn(rank, size)
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-38-03122ed3e5b5>", line 127, in run
    for data, target in train_set:
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 435, in __next__
    data = self._next_data()
  File "<ipython-input-38-03122ed3e5b5>", line 154, in init_process
    fn(rank, size)
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/d

KeyboardInterrupt: ignored

  File "/usr/local/lib/python3.6/dist-packages/torchvision/transforms/functional.py", line 278, in normalize
    if (std == 0).any():
KeyboardInterrupt
KeyboardInterrupt
