In [1]:
import os
import torch
import json
import copy
import numpy as np
from torchvision import datasets, transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import logging
import random
import datetime as dt

In [2]:
_cfg = {
    'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}

def _make_layers(cfg):
    layers = []
    in_channels = 3
    for layer_cfg in cfg:
        if layer_cfg == 'M':
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        else:
            layers.append(nn.Conv2d(in_channels=in_channels,
                                    out_channels=layer_cfg,
                                    kernel_size=3,
                                    stride=1,
                                    padding=1,
                                    bias=True))
            layers.append(nn.BatchNorm2d(num_features=layer_cfg))
            layers.append(nn.ReLU(inplace=True))
            in_channels = layer_cfg
    return nn.Sequential(*layers)

class _VGG(nn.Module):
    """
    VGG module for 3x32x32 input, 10 classes by default.

    Args:
        name: Key into ``_cfg`` selecting the layer layout. An unknown key
            raises ``KeyError``.
        num_classes: Size of the final linear layer's output. Defaults to 10,
            the value that used to be hard-coded.
    """

    def __init__(self, name='VGG11', num_classes=10):
        super().__init__()
        self.layers = _make_layers(_cfg[name])
        # Every layout in _cfg ends with 512 channels and contains five 2x2
        # poolings, so a 32x32 input is reduced to 512 x 1 x 1 features.
        flatten_features = 512
        self.fc1 = nn.Linear(flatten_features, num_classes)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        features = self.layers(x)
        # flatten (rather than view) also accepts non-contiguous activations.
        features = torch.flatten(features, 1)
        return self.fc1(features)

def VGG11():
    """Create a new ``_VGG`` network using the 'VGG11' layer layout."""
    model = _VGG(name='VGG11')
    return model


In [3]:
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

In [4]:
def ddp_setup(rank, world_size):
    """
    Join the NCCL process group and select this process's CUDA device.

    Args:
        rank: Unique identifier of each process; also used as the CUDA
            device index passed to ``torch.cuda.set_device``.
        world_size: Total number of processes
    """
    print("in ddp_setup")

    # Rendezvous address/port picked up by the "env://" init method below.
    os.environ.update(MASTER_ADDR="localhost", MASTER_PORT="12355")

    init_process_group(
        backend="nccl",
        init_method="env://",
        rank=rank,
        world_size=world_size,
    )
    torch.cuda.set_device(rank)

In [5]:
# device = "cpu"
# device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [10]:
class Trainer:
    """
    Trains a model wrapped in DistributedDataParallel on one device/rank.

    Args:
        model: Network to train; moved to ``gpu_id`` and wrapped in DDP.
        gpu_id: Device index of this process. It is also compared with 0 to
            decide which process writes checkpoints.
        train_loader: Iterable yielding ``(data, target)`` batches. If its
            ``sampler`` has a ``set_epoch`` method (e.g. DistributedSampler),
            it is called at the start of every epoch.
        optimizer: Optimizer already bound to the model's parameters.
    """

    def __init__(self, model, gpu_id, train_loader, optimizer):
        self.gpu_id = gpu_id
        # nn.Module.to() moves the parameters in place and returns the same
        # module, so an optimizer built from model.parameters() stays valid.
        self.model = DDP(model.to(gpu_id), device_ids=[gpu_id])
        self.train_loader = train_loader
        self.optimizer = optimizer

    def train(self, total_epochs):
        """Run ``train_model`` for epochs ``0 .. total_epochs - 1``."""
        for epoch in range(total_epochs):
            self.train_model(epoch)
        return None

    def train_model(self, epoch):
        """
        Train for one epoch.

        Every 20 batches the mean loss of those batches is printed and the
        process whose ``gpu_id`` is 0 saves the unwrapped model's state_dict
        to ``checkpoint.pt``.

        Args:
            epoch: Zero-based epoch index (printed as ``epoch + 1``).
        """
        # DistributedSampler seeds its shuffle with the epoch number; without
        # this call every epoch would iterate the data in the same order.
        sampler = getattr(self.train_loader, "sampler", None)
        if hasattr(sampler, "set_epoch"):
            sampler.set_epoch(epoch)

        # Make sure BatchNorm/Dropout layers are in training mode.
        self.model.train()

        # Built once per epoch instead of once per batch.
        criterion = torch.nn.CrossEntropyLoss()
        msg_iteration = 20

        epoch_startTime = dt.datetime.now()
        running_loss = 0.0
        for batch_idx, (data, target) in enumerate(self.train_loader):
            batch_startTime = dt.datetime.now()
            data, target = data.to(self.gpu_id), target.to(self.gpu_id)
            self.optimizer.zero_grad()
            outputs = self.model(data)

            loss = criterion(outputs, target)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()

            if batch_idx % msg_iteration == msg_iteration - 1:    # print every $msg_iteration mini-batches
                batch_endTime = dt.datetime.now()
                batch_time = "{:.2f}".format((batch_endTime - batch_startTime).total_seconds())
                print(f'rank : {self.gpu_id} epoch : {epoch + 1} batch_no:{batch_idx + 1:5d} MeanLoss_last_{msg_iteration}_batches: {running_loss / msg_iteration:.3f} current_batch_time: {batch_time} secs')
                if self.gpu_id == 0:
                    # Save the wrapped module's weights (not the DDP wrapper)
                    # so a plain model can load them without a "module." prefix.
                    ckp = self.model.module.state_dict()
                    PATH = "checkpoint.pt"
                    torch.save(ckp, PATH)
                    print(f"Training checkpoint saved at {PATH}")
                running_loss = 0.0

        epoch_endTime = dt.datetime.now()
        epoch_time = "{:.2f}".format((epoch_endTime - epoch_startTime).total_seconds())
        # Previously `{self.gpu_id}` was passed as a bare print argument, which
        # is a set literal and rendered the rank as "{0}".
        print(f"rank : {self.gpu_id} Time taken for epoch : {epoch + 1} = {epoch_time} secs\n")

        return None


In [7]:
def main(rank, world_size):
    """
    Per-process entry point: train VGG11 on CIFAR-10 with DDP.

    Args:
        rank: Index of this process; also used as its GPU id.
        world_size: Total number of processes taking part in training.
    """
    print("in")
    ddp_setup(rank, world_size)
    try:
        batch_size = 256  # per-process batch size
        # Per-channel mean/std given on a 0-255 scale and rescaled to [0, 1]
        # (presumably the CIFAR-10 training statistics - confirm).
        normalize = transforms.Normalize(mean=[x/255.0 for x in [125.3, 123.0, 113.9]],
                                         std=[x/255.0 for x in [63.0, 62.1, 66.7]])
        transform_train = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ])

        training_set = datasets.CIFAR10(root="./data", train=True,
                                        download=True, transform=transform_train)
        # The sampler decides which samples this rank sees, so shuffle must
        # stay False on the DataLoader itself.
        train_loader = torch.utils.data.DataLoader(training_set,
                                                   batch_size=batch_size,
                                                   sampler=DistributedSampler(training_set),
                                                   shuffle=False,
                                                   pin_memory=True)

        model = VGG11()
        optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=0.0001)
        total_epochs = 20
        trainer = Trainer(model, rank, train_loader, optimizer)
        trainer.train(total_epochs)
    finally:
        # Tear the process group down even when training raises.
        destroy_process_group()


In [8]:
# Show how many CUDA devices torch can see; the launch cell below reads the
# same value as world_size (one training process per device).
torch.cuda.device_count()

1

In [11]:
if __name__ == "__main__":
    # One training process per visible CUDA device.
    world_size = torch.cuda.device_count()
    if world_size < 1:
        # ddp_setup selects the "nccl" backend, which needs a GPU. With
        # nprocs=0 the launcher appears to start nothing and return silently,
        # so fail loudly instead of leaving no checkpoint behind.
        raise RuntimeError("No CUDA devices found: NCCL-based DDP training needs at least one GPU.")
    # 'fork' is kept from the original; presumably chosen because `main` is
    # defined in the notebook and cannot be imported by spawned children.
    mp.start_processes(main, args=(world_size,), nprocs=world_size, join=True, start_method='fork')


in
in ddp_setup
Files already downloaded and verified
rank : 0 epoch : 1 batch_no:   20 MeanLoss_last_20_batches: 6.591 current_batch_time: 0.05 secs
Training checkpoint saved at checkpoint.pt
rank : 0 epoch : 1 batch_no:   40 MeanLoss_last_20_batches: 2.759 current_batch_time: 0.05 secs
Training checkpoint saved at checkpoint.pt
rank : 0 epoch : 1 batch_no:   60 MeanLoss_last_20_batches: 2.602 current_batch_time: 0.05 secs
Training checkpoint saved at checkpoint.pt
rank : 0 epoch : 1 batch_no:   80 MeanLoss_last_20_batches: 2.424 current_batch_time: 0.05 secs
Training checkpoint saved at checkpoint.pt
rank : 0 epoch : 1 batch_no:  100 MeanLoss_last_20_batches: 2.346 current_batch_time: 0.05 secs
Training checkpoint saved at checkpoint.pt
rank : 0 epoch : 1 batch_no:  120 MeanLoss_last_20_batches: 2.316 current_batch_time: 0.05 secs
Training checkpoint saved at checkpoint.pt
rank : 0 epoch : 1 batch_no:  140 MeanLoss_last_20_batches: 2.312 current_batch_time: 0.06 secs
Training checkpo

In [12]:
def testing_model(model, test_loader, criterion, device):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target)
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader)
    print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))

In [13]:
def testing():
    """
    Evaluate the weights stored in ``checkpoint.pt`` on the CIFAR-10 test split.

    Builds the test loader (no augmentation, same normalisation constants as
    training), loads the checkpoint into a fresh VGG11 and hands everything
    to ``testing_model``, which prints the average loss and accuracy.
    """
    batch_size = 256
    normalize = transforms.Normalize(mean=[x/255.0 for x in [125.3, 123.0, 113.9]],
                                     std=[x/255.0 for x in [63.0, 62.1, 66.7]])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])
    test_set = datasets.CIFAR10(root="./data", train=False,
                                download=True, transform=transform_test)

    test_loader = torch.utils.data.DataLoader(test_set,
                                              num_workers=2,
                                              batch_size=batch_size,
                                              shuffle=False,
                                              pin_memory=True)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    criterion = torch.nn.CrossEntropyLoss()

    test_model = VGG11()
    test_model.to(device)

    # The checkpoint is written from GPU tensors during training; remapping
    # to `device` lets it load on a CPU-only host as well.
    checkpoint = torch.load('checkpoint.pt', map_location=device)
    test_model.load_state_dict(checkpoint)

    testing_model(test_model, test_loader, criterion, device)

# Evaluate checkpoint.pt (written by rank 0 during training) on the CIFAR-10 test split.
testing()

Files already downloaded and verified
Test set: Average loss: 0.5737, Accuracy: 8084/10000 (81%)

