# distributed deep learning sample 

Databricks Runtime 5.1 ML, GPU

## prepare storage
https://docs.azuredatabricks.net/applications/deep-learning/distributed-deep-learning/ddl-storage.html#ddl-fuse

In [3]:
# Root directory under which the per-run checkpoint directory is created.
FUSE_DIR = '/dbfs/horovod_pytorch'

## prepare network with pytorch

### simpleCNN

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    """Small CNN classifier: two conv layers followed by two linear layers.

    The flatten step in `forward` expects 320 (= 20 * 4 * 4) features per
    sample after the second pooling stage, which is what a 1-channel 28x28
    input produces with the 5x5 kernels and 2x2 pooling used here.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Return log-probabilities over the 10 output classes, one row per sample."""
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Name the class dimension explicitly: calling log_softmax without
        # `dim` is deprecated and emits a warning on every forward pass.
        return F.log_softmax(x, dim=1)

### deliberately oversized network to push the limits of GPU memory

In [8]:
class BigNet(nn.Module):
    """Same conv front-end as the small network, followed by nine linear layers.

    The linear stack widens to 10000 units and narrows back down to 10
    outputs; its only purpose is to hold a large number of parameters.
    The flatten step in `forward` expects 320 features per sample.
    """

    def __init__(self):
        super(BigNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 1000)
        self.fc2 = nn.Linear(1000, 5000)
        self.fc3 = nn.Linear(5000, 10000)
        self.fc4 = nn.Linear(10000, 5000)
        self.fc5 = nn.Linear(5000, 1000)
        self.fc6 = nn.Linear(1000, 500)
        self.fc7 = nn.Linear(500, 100)
        self.fc8 = nn.Linear(100, 50)
        self.fc9 = nn.Linear(50, 10)

    def forward(self, x):
        """Return log-probabilities over the 10 output classes, one row per sample."""
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        # Every hidden linear layer is followed by ReLU and dropout; only the
        # final layer (fc9) feeds log_softmax directly.
        hidden_layers = (
            self.fc1, self.fc2, self.fc3, self.fc4,
            self.fc5, self.fc6, self.fc7, self.fc8,
        )
        for layer in hidden_layers:
            x = F.relu(layer(x))
            x = F.dropout(x, training=self.training)
        x = self.fc9(x)
        # Name the class dimension explicitly: calling log_softmax without
        # `dim` is deprecated and emits a warning on every forward pass.
        return F.log_softmax(x, dim=1)

## prepare functions to train

In [10]:
def train_one_epoch(model, device, data_loader, optimizer, epoch):
    """Run one pass over `data_loader`, updating `model` after every batch.

    Each batch is moved to `device`, scored with negative log-likelihood
    loss and back-propagated before `optimizer` takes a step. A progress
    line is printed whenever the batch index is a multiple of the
    module-level `log_interval`.
    """
    progress_template = 'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'

    model.train()
    for step, (inputs, labels) in enumerate(data_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        batch_loss = F.nll_loss(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()

        # Only report on every `log_interval`-th batch.
        if step % log_interval != 0:
            continue

        samples_seen = step * len(inputs)
        percent_done = 100. * step / len(data_loader)
        print(progress_template.format(
            epoch, samples_seen, len(data_loader.dataset),
            percent_done, batch_loss.item()))

In [11]:
from time import time
import os

# Each run gets its own directory named after the current timestamp, so
# checkpoints written by different runs end up in different directories.
LOG_DIR = os.path.join(FUSE_DIR, str(time()), 'MNISTDemo')
# Called without exist_ok, so this raises if the directory already exists.
os.makedirs(LOG_DIR)

In [12]:
def save_checkpoint(model, optimizer, epoch, log_dir=None):
    """Write the model and optimizer state for `epoch` to a checkpoint file.

    The file is named ``checkpoint-<epoch>.pth.tar`` and holds a dict with
    the keys ``'model'`` and ``'optimizer'``.

    Args:
        model: module whose ``state_dict()`` is stored under ``'model'``.
        optimizer: optimizer whose ``state_dict()`` is stored under
            ``'optimizer'``.
        epoch: epoch number; used only to build the file name.
        log_dir: directory to write into. Defaults to the module-level
            ``LOG_DIR`` when omitted.
    """
    if log_dir is None:
        log_dir = LOG_DIR
    filepath = log_dir + '/checkpoint-{epoch}.pth.tar'.format(epoch=epoch)
    state = {
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
    }
    torch.save(state, filepath)

In [13]:
import torch.optim as optim
from torchvision import datasets, transforms

def train(learning_rate, small=True):
    """Train `Net` (or `BigNet`) on the MNIST training set in a single process.

    Reads the module-level `batch_size`, `momentum` and `num_epochs`
    settings, and saves a checkpoint after every epoch.

    Args:
        learning_rate: learning rate passed to the SGD optimizer.
        small: when true train `Net`, otherwise `BigNet`.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    train_dataset = datasets.MNIST(
        'data',
        train=True,
        download=True,
        transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
    data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    model_cls = Net if small else BigNet
    model = model_cls().to(device)

    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)

    for epoch in range(1, num_epochs + 1):
        train_one_epoch(model, device, data_loader, optimizer, epoch)
        save_checkpoint(model, optimizer, epoch)

## HorovodRunner

In [15]:
import horovod.torch as hvd
from sparkdl import HorovodRunner

In [16]:
def train_hvd(learning_rate, small=True):
    """Train `Net` (or `BigNet`) on MNIST with Horovod data parallelism.

    Meant to be executed once per Horovod process (e.g. via
    `HorovodRunner.run`). Reads the module-level `batch_size`, `momentum`
    and `num_epochs` settings. Only the process with rank 0 writes
    checkpoints.

    Args:
        learning_rate: base learning rate; it is multiplied by the number
            of Horovod processes before being passed to SGD.
        small: when true train `Net`, otherwise `BigNet`.
    """
    hvd.init()  # Initialize Horovod.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if device.type == 'cuda':
        # Pin this process to the GPU matching its local rank so that
        # processes sharing a node do not all land on the default GPU.
        torch.cuda.set_device(hvd.local_rank())

    train_dataset = datasets.MNIST(
        root='data-%d' % hvd.rank(),  # Use different root directory for each worker to avoid race conditions.
        train=True,
        download=True,
        transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    )

    from torch.utils.data.distributed import DistributedSampler

    # Configure the sampler such that each worker obtains a distinct sample of input dataset.
    train_sampler = DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    # Use train_sampler to load a different sample of data on each worker.
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

    model_cls = Net if small else BigNet
    model = model_cls().to(device)

    # Effective batch size in synchronous distributed training is scaled by the number of workers.
    # An increase in learning rate compensates for the increased batch size.
    optimizer = optim.SGD(model.parameters(), lr=learning_rate * hvd.size(), momentum=momentum)

    # Wrap the optimizer with Horovod's DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

    # Broadcast initial parameters so all workers start with the same parameters.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)

    for epoch in range(1, num_epochs + 1):
        # DistributedSampler derives its shuffle from the epoch number;
        # without this call every epoch would replay the same ordering.
        train_sampler.set_epoch(epoch)
        train_one_epoch(model, device, train_loader, optimizer, epoch)
        # Only save checkpoints on the first worker.
        if hvd.rank() == 0:
            save_checkpoint(model, optimizer, epoch)

## train

### small network

In [19]:
# Training settings for the small network. The training functions read
# these as module-level globals.
batch_size = 100    # batch size given to each DataLoader
num_epochs = 300    # number of passes over the training data
momentum = 0.5      # SGD momentum
log_interval = 100  # print progress every this many batches

In [20]:

# np: number of Horovod processes to launch (the original note described it
# as the number of nodes; TODO confirm it matches the cluster size).
hr = HorovodRunner(np=8)
# small defaults to True, so this trains the small network.
hr.run(train_hvd, learning_rate = 0.001)

### deliberately oversized network

In [22]:
# Training settings for the big network; these overwrite the globals set
# for the small-network run above.
batch_size = 300    # batch size given to each DataLoader
num_epochs = 30     # number of passes over the training data
momentum = 0.5      # SGD momentum
log_interval = 100  # print progress every this many batches

In [23]:
# NOTE(review): the original comment assumed a two-worker cluster, but np=8
# is requested here; confirm np matches the resources actually available.
hr = HorovodRunner(np=8)
# small=False selects BigNet inside train_hvd.
hr.run(train_hvd, learning_rate = 0.001, small = False)