Installing torchmetrics, which is used for metric calculations

In [0]:
%pip install torchmetrics

Python interpreter will be restarted.
Collecting torchmetrics
  Using cached torchmetrics-0.10.2-py3-none-any.whl (529 kB)
Installing collected packages: torchmetrics
Successfully installed torchmetrics-0.10.2
Python interpreter will be restarted.


In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import os
from time import time  # used by create_log_dir() to timestamp each run's log directory
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import FashionMNIST
from torchmetrics.functional import accuracy

Setting up checkpoint location
The next cell creates a directory for saved checkpoint models. Databricks recommends saving training data under dbfs:/ml, which maps to file:/dbfs/ml on driver and worker nodes.
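As a sketch of the path mapping described above, the helper below rewrites a `dbfs:/` URI into the `/dbfs/` FUSE path that ordinary Python file APIs such as `os.makedirs` and `torch.save` use. The function name `dbfs_uri_to_local_path` is hypothetical and is not a Databricks API; it only illustrates why `PYTORCH_DIR` starts with `/dbfs/ml` rather than `dbfs:/ml`.

```python
# Hypothetical helper (not a Databricks API): translate a dbfs:/ URI into the
# local FUSE path that os, open() and torch.save see on driver and worker nodes.
def dbfs_uri_to_local_path(uri: str) -> str:
    prefix = "dbfs:/"
    if not uri.startswith(prefix):
        raise ValueError(f"not a dbfs:/ URI: {uri!r}")
    # dbfs:/ml/... is exposed as /dbfs/ml/... on the nodes
    return "/dbfs/" + uri[len(prefix):].lstrip("/")


print(dbfs_uri_to_local_path("dbfs:/ml/horovod_pytorch"))  # /dbfs/ml/horovod_pytorch
```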

In [0]:
PYTORCH_DIR = '/dbfs/ml/horovod_pytorch'

https://docs.databricks.com/_static/notebooks/deep-learning/mnist-pytorch.html

Note that we use the same model configuration and optimizer as demo-04, but here we also evaluate the trained model on the test set.

In [0]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()

        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()

        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)
        
    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
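The `320` in `x.view(-1, 320)` follows from the layer sizes. A minimal sketch, assuming 28x28 single-channel FashionMNIST inputs and no padding, traces the spatial size through `Net` with the standard convolution/pooling output-size formula (dilation 1):

```python
# Sketch: trace the spatial size of a 28x28 image through Net's layers
# to recover the flattened feature count used in x.view(-1, 320).
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    # Output size of Conv2d / MaxPool2d along one spatial dimension
    return (size + 2 * padding - kernel) // stride + 1


def flattened_features(image_size: int = 28) -> int:
    s = conv_out(image_size, kernel=5)   # conv1: 28 -> 24
    s = conv_out(s, kernel=2, stride=2)  # max_pool2d(2): 24 -> 12
    s = conv_out(s, kernel=5)            # conv2: 12 -> 8
    s = conv_out(s, kernel=2, stride=2)  # max_pool2d(2): 8 -> 4
    out_channels = 20                    # conv2 produces 20 feature maps
    return out_channels * s * s


print(flattened_features())  # 320
```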


Here we configure the hyperparameters and the per-epoch training loop for single-node training.

In [0]:
batch_size = 32
num_epochs = 20
log_interval = 100

def train_one_epoch(model, device, data_loader, optimizer, epoch):
    model.train()

    for batch_idx, (data, target) in enumerate(data_loader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        
        optimizer.step()
        
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(data_loader) * len(data),
                100. * batch_idx / len(data_loader), loss.item()))
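To see what `train_one_epoch` prints, the sketch below replays its logging condition without any model, assuming the 60,000-image FashionMNIST training set, `batch_size = 32` and `log_interval = 100`. The helper `logged_progress` is hypothetical. It assumes every logged batch is full, which holds here because 60,000 is divisible by 32; in general, `len(data)` in the original format string is the size of the current batch.

```python
import math

# Hypothetical helper: list the progress fragments train_one_epoch would log.
def logged_progress(num_samples: int, batch_size: int, log_interval: int):
    num_batches = math.ceil(num_samples / batch_size)  # len(data_loader)
    lines = []
    for batch_idx in range(num_batches):
        if batch_idx % log_interval == 0:
            seen = batch_idx * batch_size
            # Mirrors len(data_loader) * len(data) in the original format string
            shown_total = num_batches * batch_size
            pct = 100.0 * batch_idx / num_batches
            lines.append(f"[{seen}/{shown_total} ({pct:.0f}%)]")
    return lines


lines = logged_progress(60000, 32, 100)
print(len(lines), lines[0], lines[-1])  # 19 [0/60000 (0%)] [57600/60000 (96%)]
```

With these settings each epoch prints 19 progress lines, at batches 0, 100, ..., 1800.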


Creating helper functions for saving and loading model checkpoints

In [0]:
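def save_checkpoint(log_dir, model, optimizer, epoch):
    filepath = os.path.join(log_dir, 'checkpoint-{epoch}.pth.tar'.format(epoch = epoch))
    
    # Save both model and optimizer state so training could be resumed later
    state = {
      'model': model.state_dict(),
      'optimizer': optimizer.state_dict(),
    }
    
    torch.save(state, filepath)
    
def load_checkpoint(log_dir, epoch = num_epochs):
    filepath = os.path.join(log_dir, 'checkpoint-{epoch}.pth.tar'.format(epoch = epoch))
    
    # Map tensors to CPU so a checkpoint written on a GPU worker also loads on a
    # CPU-only driver; load_state_dict() then copies them onto the model's device
    return torch.load(filepath, map_location = 'cpu')
 
def create_log_dir():
    # Timestamped directory so each run writes its checkpoints to a fresh location
    log_dir = os.path.join(PYTORCH_DIR, str(time()), 'FashionMNISTDemo')
    os.makedirs(log_dir)
    
    return log_dir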
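`load_checkpoint` defaults to `epoch = num_epochs`, so it fails if a run stopped before the last epoch. A hypothetical helper, not part of the original notebook, can instead look up the highest epoch that `save_checkpoint` actually wrote, assuming the `checkpoint-{epoch}.pth.tar` naming used above:

```python
import os
import re
import tempfile

# Hypothetical helper: return the highest epoch N for which a file named
# "checkpoint-N.pth.tar" exists in log_dir, or None if there is none.
_CHECKPOINT_NAME = re.compile(r"^checkpoint-(\d+)\.pth\.tar$")


def latest_checkpoint_epoch(log_dir):
    epochs = []
    for name in os.listdir(log_dir):
        match = _CHECKPOINT_NAME.match(name)
        if match:
            epochs.append(int(match.group(1)))
    # Compare as integers: a string sort would rank "checkpoint-9" above "checkpoint-10"
    return max(epochs) if epochs else None


# Usage with empty placeholder files in a temporary directory
with tempfile.TemporaryDirectory() as demo_dir:
    for epoch in (1, 2, 9, 10):
        open(os.path.join(demo_dir, f"checkpoint-{epoch}.pth.tar"), "w").close()
    print(latest_checkpoint_epoch(demo_dir))  # 10
```

Its result could then be passed as `load_checkpoint(log_dir, epoch = ...)`.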


Running single-node training with PyTorch

In [0]:
from torchvision import datasets, transforms
from time import time
 
single_node_log_dir = create_log_dir()
print('Log directory:', single_node_log_dir)
 
def train(learning_rate):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
 
    train_dataset = datasets.FashionMNIST(
      'data', 
      train = True,
      download = True,
      transform = transforms.Compose([transforms.ToTensor()])
    )

    data_loader = torch.utils.data.DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
 
    model = Net().to(device)
 
    optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)
 
    for epoch in range(1, num_epochs + 1):
        train_one_epoch(model, device, data_loader, optimizer, epoch)
        save_checkpoint(single_node_log_dir, model, optimizer, epoch)
 
    
def test(log_dir):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    loaded_model = Net().to(device)
  
    checkpoint = load_checkpoint(log_dir)
    loaded_model.load_state_dict(checkpoint['model'])
    loaded_model.eval()  # disable dropout for evaluation
 
    test_dataset = datasets.FashionMNIST(
        'data', 
        train = False,
        download = True,
        transform = transforms.Compose([transforms.ToTensor()])
    )
    
    data_loader = torch.utils.data.DataLoader(test_dataset, batch_size = batch_size)
    
    correct = 0
    total = 0
    test_loss = 0.0
    # Gradients are not needed for evaluation; this also avoids building an autograd graph
    with torch.no_grad():
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)
            output = loaded_model(data)
            # Sum per-sample losses so dividing by the dataset size gives the true mean
            test_loss += F.nll_loss(output, target, reduction = 'sum').item()
            predicted = output.argmax(dim = 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    test_loss /= len(data_loader.dataset)

    print('Average test loss: {}'.format(test_loss))
    print('Accuracy of the network on the test images: ', 100 * correct // total, '%')

Log directory: /dbfs/ml/horovod_pytorch/1668000143.9331226/FashionMNISTDemo
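The test loop divides an accumulated loss by `len(data_loader.dataset)`. That yields the true per-sample average only if each batch contributes the sum of its per-sample losses. `F.nll_loss` defaults to `reduction='mean'`, so adding up its outputs gives the right answer only when every batch holds one sample, which is the `DataLoader` default (`batch_size=1`). The pure-Python sketch below uses illustrative per-sample losses, not results from this notebook, to show the difference:

```python
# Illustrative per-sample losses (made up for the example)
losses = [0.5, 1.0, 1.5, 2.0, 2.5]


def batched(values, batch_size):
    return [values[i:i + batch_size] for i in range(0, len(values), batch_size)]


def avg_of_summed_means(values, batch_size):
    # Accumulate each batch's mean loss (like reduction='mean'), divide by dataset size
    total = sum(sum(b) / len(b) for b in batched(values, batch_size))
    return total / len(values)


def avg_of_summed_sums(values, batch_size):
    # Accumulate each batch's summed loss (like reduction='sum'), divide by dataset size
    total = sum(sum(b) for b in batched(values, batch_size))
    return total / len(values)


true_mean = sum(losses) / len(losses)
print(true_mean)                         # 1.5
print(avg_of_summed_means(losses, 1))    # 1.5, correct when batch_size == 1
print(avg_of_summed_means(losses, 2))    # 1.0, underestimates with larger batches
print(avg_of_summed_sums(losses, 2))     # 1.5, correct for any batch size
```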


In [0]:
train(learning_rate = 0.02)

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to data/FashionMNIST/raw/train-images-idx3-ubyte.gz


Extracting data/FashionMNIST/raw/train-images-idx3-ubyte.gz to data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz to data/FashionMNIST/raw/train-labels-idx1-ubyte.gz


Extracting data/FashionMNIST/raw/train-labels-idx1-ubyte.gz to data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz to data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz


Extracting data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz to data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz to data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz


Extracting data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz to data/FashionMNIST/raw



In [0]:
test(single_node_log_dir)

Average test loss: 0.7281309366226196
Accuracy of the network on the test images:  71 %


Migrating to HorovodRunner
HorovodRunner takes a Python method that contains deep learning training code with Horovod hooks. HorovodRunner pickles the method on the driver and distributes it to Spark workers. A Horovod MPI job is embedded as a Spark job using barrier execution mode.

In [0]:
import horovod.torch as hvd
from sparkdl import HorovodRunner

In [0]:
hvd_log_dir = create_log_dir()
print('Log directory:', hvd_log_dir)
 
def train_hvd(learning_rate):
  
    # Initialize Horovod
    hvd.init()  
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  
    if device.type == 'cuda':
        # Pin GPU to local rank
        torch.cuda.set_device(hvd.local_rank())
 
    train_dataset = datasets.FashionMNIST(
        # Use different root directory for each worker to avoid conflicts
        root = 'data-%d' % hvd.rank(),  
        train = True, 
        download = True,
        transform = transforms.Compose([transforms.ToTensor()])
    )
 
    from torch.utils.data.distributed import DistributedSampler
  
    # Configure the sampler so that each worker gets a distinct sample of the input dataset
    train_sampler = DistributedSampler(train_dataset, num_replicas = hvd.size(), rank = hvd.rank())
    
    # Use train_sampler to load a different sample of data on each worker
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size = batch_size, sampler = train_sampler)
 
    model = Net().to(device)
    # Use the learning_rate passed to hr.run() instead of a hard-coded value
    optimizer = optim.Adam(model.parameters(), lr = learning_rate)
 
    # Wrap the local optimizer with hvd.DistributedOptimizer so that Horovod handles the distributed optimization
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters = model.named_parameters())
  
    # Broadcast initial parameters so all workers start with the same parameters
    hvd.broadcast_parameters(model.state_dict(), root_rank = 0)
 
    for epoch in range(1, num_epochs + 1):
        # Reseed the sampler's shuffle; without this every epoch reuses the same order
        train_sampler.set_epoch(epoch)
        
        train_one_epoch(model, device, train_loader, optimizer, epoch)
        
        # Save checkpoints only on worker 0 to prevent conflicts between workers
        if hvd.rank() == 0:
            save_checkpoint(hvd_log_dir, model, optimizer, epoch)

Log directory: /dbfs/ml/horovod_pytorch/1668000582.9829545/FashionMNISTDemo
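The `DistributedSampler` comment above says each worker gets a distinct sample of the dataset. The sketch below mimics how the sampler splits indices with `drop_last=False`: pad the index list so it divides evenly, then give rank `r` every `num_replicas`-th index starting at `r`. It is an illustration, not the library code: PyTorch shuffles with a `torch.Generator` seeded with `seed + epoch`, while this sketch swaps in Python's `random.Random` for the same role. Because the seed includes the epoch, the real sampler only changes its order between epochs when `train_sampler.set_epoch(epoch)` is called.

```python
import math
import random

# Illustrative re-implementation of DistributedSampler's index split
# (drop_last=False); random.Random stands in for PyTorch's torch.Generator.
def shard_indices(dataset_len, num_replicas, rank, shuffle=True, seed=0, epoch=0):
    indices = list(range(dataset_len))
    if shuffle:
        # Every rank uses the same seed + epoch, so all ranks see one shared permutation
        random.Random(seed + epoch).shuffle(indices)
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    # Pad by repeating leading indices until the list divides evenly
    while len(indices) < total_size:
        indices += indices[: total_size - len(indices)]
    # Rank r takes positions r, r + num_replicas, r + 2 * num_replicas, ...
    return indices[rank:total_size:num_replicas]


print([shard_indices(10, 2, r, shuffle=False) for r in range(2)])
# [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```

For the 60,000-image training set and `np = 2`, each worker's shard holds 30,000 indices.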


Now that you have defined a training function with Horovod, you can use HorovodRunner to distribute the work of training the model.

The HorovodRunner parameter np sets the number of processes. This example uses a cluster with two workers, each with a single GPU, so set np=2. (If you use np=-1, HorovodRunner trains using a single process on the driver node.)
Note that distributed training took roughly half the time of single-node training (2.6 min vs. 4.9 min).
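The timings and settings above translate into a few derived numbers. This is plain arithmetic on the quoted 4.9 min and 2.6 min and on `batch_size = 32`, assuming the 60,000-image training set is split evenly by `DistributedSampler` across the two workers:

```python
import math

# Timings quoted in the notebook (minutes) and the cluster size used with np = 2
single_node_min = 4.9
horovod_min = 2.6
num_workers = 2

speedup = single_node_min / horovod_min  # about 1.88x
efficiency = speedup / num_workers       # about 94% of ideal linear scaling

# Per-worker shard and resulting optimizer steps per epoch
train_size, batch_size = 60000, 32
per_worker_samples = math.ceil(train_size / num_workers)      # 30000
steps_per_epoch = math.ceil(per_worker_samples / batch_size)  # 938 (the last batch has 16)
global_batch = batch_size * num_workers                       # images per synchronized step

print(f"speedup {speedup:.2f}x, efficiency {efficiency:.0%}, "
      f"{steps_per_epoch} steps/epoch per worker, global batch {global_batch}")
```

Because `hvd.DistributedOptimizer` averages gradients across workers by default, each synchronized Adam step reflects 64 images, and each worker runs about half as many steps per epoch as the single-node loop's 1,875.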

In [0]:
hr = HorovodRunner(np = 2)

hr.run(train_hvd, learning_rate = 0.02)

HorovodRunner will only stream logs generated by :func:`sparkdl.horovod.log_to_driver` or
:class:`sparkdl.horovod.tensorflow.keras.LogCallback` to notebook cell output. If want to stream all
logs to driver for debugging, you can set driver_log_verbosity to 'all', like `HorovodRunner(np=2,
driver_log_verbosity='all')`.
The global names read or written to by the pickled function are {'hvd': None, 'torch': None, 'datasets': None, 'transforms': None, 'batch_size': None, 'Net': None, 'optim': None, 'range': None, 'num_epochs': None, 'train_one_epoch': None, 'save_checkpoint': None, 'hvd_log_dir': None}.
The pickled object size is 4895 bytes.

### How to enable Horovod Timeline? ###
HorovodRunner has the ability to record the timeline of its activity with Horovod  Timeline. To
record a Horovod Timeline, set the `HOROVOD_TIMELINE` environment variable  to the location of the
timeline file to be created. You can then open the timeline file  using the chrome://tracing
facility of the Chrome bro

In [0]:
test(hvd_log_dir)

Average test loss: 0.6396099925041199
Accuracy of the network on the test images:  74 %


Under the hood, once HorovodRunner has pickled the training method and launched the barrier-mode Spark job described above, the first executor collects the IP addresses of all task executors using BarrierTaskContext and triggers a Horovod job using mpirun. Each Python MPI process then loads the pickled user program, deserializes it, and runs it.
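The address collection and mpirun launch can be sketched as follows. In PySpark, `BarrierTaskContext.get().getTaskInfos()` returns one `BarrierTaskInfo` per task, each with an `address` string of the form `host:port`. The helpers `mpirun_host_arg` and `mpirun_np_and_hosts` are hypothetical and are not HorovodRunner's implementation; they only show how such addresses could become an Open MPI-style `-np N -H host:slots,...` argument list with one slot per task.

```python
from collections import Counter

# Hypothetical: build a "-H host:slots,..." value from task addresses such as
# "10.0.0.5:40123", counting one MPI slot per barrier task on each host.
def mpirun_host_arg(addresses):
    hosts = [addr.rsplit(":", 1)[0] for addr in addresses]
    slots = Counter(hosts)
    # Keep first-seen host order so the host list is deterministic
    ordered = list(dict.fromkeys(hosts))
    return ",".join(f"{h}:{slots[h]}" for h in ordered)


def mpirun_np_and_hosts(addresses):
    # Hypothetical: only the process-count and host arguments; the program
    # that each MPI process would run is deliberately left out
    return ["-np", str(len(addresses)), "-H", mpirun_host_arg(addresses)]


print(mpirun_np_and_hosts(["10.0.0.5:40123", "10.0.0.6:40555"]))
# ['-np', '2', '-H', '10.0.0.5:1,10.0.0.6:1']
```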