In [None]:
# Code written for Python 3.10.
# Check whether the cu116 or cu113 wheels match your CUDA installation and adjust the index URL below accordingly.
!pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
!pip3 install numpy

In [None]:
import numpy as np
import copy
import torch
import torch.nn as nn
from torchvision.models import resnet18, resnet50
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler
import math
from collections import namedtuple
import time

# Device configuration: use CUDA when PyTorch reports it is available,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [161]:
# Code used from https://blog.paperspace.com/writing-resnet-from-scratch-in-pytorch/
def data_loader(data_dir,
                batch_size,
                random_seed=42,
                valid_size=0.1,
                shuffle=True,
                test=False):
    """Build CIFAR-10 DataLoaders.

    Args:
        data_dir: directory CIFAR-10 is read from / downloaded to.
        batch_size: number of samples per batch for every returned loader.
        random_seed: seed for the train/validation index shuffle (used only
            when ``shuffle`` is True and ``test`` is False).
        valid_size: fraction of the training split held out for validation,
            must lie in [0, 1].
        shuffle: for the test loader, whether batches are shuffled; otherwise,
            whether the training indices are shuffled before being split.
        test: if True, return a single loader over the CIFAR-10 test split.

    Returns:
        A test ``DataLoader`` when ``test`` is True, otherwise
        ``(train_loader, valid_loader)`` drawing disjoint index subsets of the
        training split.

    Raises:
        ValueError: if ``valid_size`` is outside [0, 1].
    """
    # Per-channel normalization constants.
    normalize = transforms.Normalize(
        mean=[0.4914, 0.4822, 0.4465],
        std=[0.2023, 0.1994, 0.2010],
    )

    # Every image is resized to 224x224, converted to a tensor and normalized.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize,
    ])

    if test:
        test_dataset = datasets.CIFAR10(
            root=data_dir, train=False,
            download=True, transform=transform,
        )
        test_loader = torch.utils.data.DataLoader(
            test_dataset, batch_size=batch_size, shuffle=shuffle
        )
        return test_loader

    if not 0 <= valid_size <= 1:
        raise ValueError('valid_size must be in [0, 1], got {}'.format(valid_size))

    # Train and validation use the same split and the same transform, so a
    # single dataset object backs both loaders (avoids loading it twice).
    train_dataset = datasets.CIFAR10(
        root=data_dir, train=True,
        download=True, transform=transform,
    )
    valid_dataset = train_dataset

    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        # Private RNG seeded from `random_seed`: the split is reproducible and
        # NumPy's global random state is left untouched. For the same seed this
        # yields the same permutation as np.random.seed(seed) + np.random.shuffle.
        np.random.RandomState(random_seed).shuffle(indices)

    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler)

    valid_loader = torch.utils.data.DataLoader(
        valid_dataset, batch_size=batch_size, sampler=valid_sampler)

    return (train_loader, valid_loader)


# CIFAR10 dataset: train/validation loaders from the training split and a
# separate loader over the test split, all with the same batch size.
batch_size = 256
train_loader, valid_loader = data_loader(
    data_dir='./data',
    batch_size=batch_size,
)
test_loader = data_loader(
    data_dir='./data',
    batch_size=batch_size,
    test=True,
)

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


In [162]:
# https://github.com/JayPatwardhan/ResNet-PyTorch/blob/master/ResNet/ResNet.py
class Bottleneck(nn.Module):
    """ResNet bottleneck residual block: 1x1 -> 3x3 -> 1x1 convolutions.

    The block maps ``in_channels`` to ``out_channels * expansion`` channels and
    applies ``stride`` in the 3x3 convolution. When the residual branch changes
    shape, the caller must pass ``i_downsample`` to project the shortcut so the
    two tensors can be added.

    Args:
        in_channels: channels of the input tensor.
        out_channels: width of the inner 1x1/3x3 convolutions; the block
            outputs ``out_channels * expansion`` channels.
        i_downsample: optional module applied to the input on the shortcut
            path; ``None`` means an identity shortcut.
        stride: stride of the 3x3 convolution.
    """
    # Output channels are `expansion` times the inner width.
    expansion = 4

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Bottleneck, self).__init__()

        # nn.Conv2d defaults to bias=True; those biases are redundant before
        # BatchNorm but are kept so existing checkpoints still load.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(out_channels, out_channels*self.expansion, kernel_size=1, stride=1, padding=0)
        self.batch_norm3 = nn.BatchNorm2d(out_channels*self.expansion)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        # No copy needed: `x` is never modified in place below; the in-place
        # add targets the freshly computed `out` tensor.
        identity = x

        out = self.relu(self.batch_norm1(self.conv1(x)))
        out = self.relu(self.batch_norm2(self.conv2(out)))
        out = self.batch_norm3(self.conv3(out))

        # Project the shortcut when the residual branch changed shape.
        if self.i_downsample is not None:
            identity = self.i_downsample(identity)

        out += identity
        return self.relu(out)

class Block(nn.Module):
    """ResNet basic residual block: two 3x3 convolutions with a shortcut.

    Only the first convolution applies ``stride``; the second keeps the spatial
    size, so the residual output matches a shortcut downsampled once by
    ``i_downsample``.

    Args:
        in_channels: channels of the input tensor.
        out_channels: channels produced by both convolutions (``expansion`` is 1).
        i_downsample: optional module applied to the input on the shortcut
            path; ``None`` means an identity shortcut.
        stride: stride of the first 3x3 convolution.
    """
    expansion = 1

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Block, self).__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        # Stride 1 here: striding twice would shrink the residual branch more
        # than the shortcut and make `out += identity` fail.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, stride=1, bias=False)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        # `x` is not modified in place, so it can serve as the shortcut directly.
        identity = x

        # Each convolution is paired with its own BatchNorm layer.
        out = self.relu(self.batch_norm1(self.conv1(x)))
        out = self.batch_norm2(self.conv2(out))

        if self.i_downsample is not None:
            identity = self.i_downsample(identity)

        out += identity
        return self.relu(out)




class ResNet(nn.Module):
    """Generic ResNet: 7x7 stem, four residual stages, global pooling, linear head.

    Args:
        ResBlock: residual block class. It must expose an ``expansion`` class
            attribute and accept ``(in_channels, planes, i_downsample=None,
            stride=1)``.
        layer_list: number of blocks per stage; entries 0-3 are read.
        num_classes: size of the classifier output.
        num_channels: channels of the input images.
    """
    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super(ResNet, self).__init__()
        # Running channel count, advanced by `_make_layer` as stages are built.
        self.in_channels = 64

        # Stem: strided 7x7 convolution + BN + ReLU, then a strided 3x3 max-pool.
        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Stages 2-4 pass stride 2 to their first block.
        self.layer1 = self._make_layer(ResBlock, layer_list[0], planes=64)
        self.layer2 = self._make_layer(ResBlock, layer_list[1], planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, layer_list[2], planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, layer_list[3], planes=512, stride=2)

        # Head: global average pool to 1x1, then a linear classifier.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * ResBlock.expansion, num_classes)

    def forward(self, x):
        features = self.max_pool(self.relu(self.batch_norm1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        pooled = self.avgpool(features)
        # Flatten (N, C, 1, 1) -> (N, C) for the linear head.
        flat = pooled.reshape(pooled.shape[0], -1)
        return self.fc(flat)

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        """Build one stage of `blocks` residual blocks of width `planes`.

        Only the first block receives `stride` and, when the channel count or
        spatial size changes, a 1x1-conv + BN projection for its shortcut.
        Updates `self.in_channels` to the stage's output channel count.
        """
        stage_out = planes * ResBlock.expansion

        shortcut = None
        if stride != 1 or self.in_channels != stage_out:
            shortcut = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_out, kernel_size=1, stride=stride),
                nn.BatchNorm2d(stage_out),
            )

        stage = [ResBlock(self.in_channels, planes, i_downsample=shortcut, stride=stride)]
        self.in_channels = stage_out
        stage.extend(ResBlock(self.in_channels, planes) for _ in range(blocks - 1))

        return nn.Sequential(*stage)



def ResNet50(num_classes, channels=3):
    """Build a ResNet-50: Bottleneck blocks in four stages of depth 3, 4, 6 and 3.

    Args:
        num_classes: number of output logits.
        channels: number of input image channels.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths, num_classes, num_channels=channels)

In [163]:
# Code inspired by https://github.com/xinyandai/gradient-quantization

# Compressor
class IdenticalCompressor(object):
    """Pass-through "compressor" exposing the same interface as the real ones.

    ``compress`` hands back an independent copy of its input and
    ``decompress`` returns the signature unchanged, so a round trip reproduces
    the input exactly. Constructor arguments exist only for interface
    compatibility and are ignored.
    """

    def __init__(self, size=None, shape=None, args=None):
        # Stateless: nothing to configure.
        return None

    @staticmethod
    def compress(vec):
        """Return a copy of ``vec`` that does not share storage with it."""
        duplicate = torch.clone(vec)
        return duplicate

    @staticmethod
    def decompress(signature):
        """Return ``signature`` itself; no decoding is required."""
        return signature

class PSQuantizer():
    """Parameter-server style gradient aggregator with optional compression.

    One compressor is kept per parameter: ``Compressor(size, shape, args)``
    for parameters with more than 1000 elements, ``IdenticalCompressor``
    otherwise. ``record`` compresses and decompresses one worker's current
    gradients and stores the result. ``apply`` averages the stored gradients
    of all workers into ``param.grad`` and clears the buffers.

    Args:
        Compressor: compressor class constructed as
            ``Compressor(size, shape, args)``.
        parameters: iterable of parameters (e.g. ``model.parameters()``).
        args: options object. It reads ``ef`` (error feedback), ``two_phase``,
            ``scale``, and ``num_users`` when ``ef`` is set.
    """
    def __init__(self, Compressor, parameters, args):
        self.parameters = list(parameters)
        self.num_layers = len(self.parameters)
        self.compressors = list()
        self.compressed_gradients = [list() for _ in range(self.num_layers)]
        self.args = args
        self.error_feedback = args.ef
        self.two_phase = self.args.two_phase
        for param in self.parameters:
            param_size = param.numel()
            self.compressors.append(
                Compressor(param_size, param.shape, args) if param_size > 1000
                else IdenticalCompressor()
            )
            # Error-feedback residuals are attached to the parameter objects:
            # one buffer per worker, plus one for the second (server) phase.
            if self.error_feedback:
                param.error = [torch.zeros_like(param)
                               for _ in range(args.num_users)]
                if self.two_phase:
                    param.server_error = torch.zeros_like(param)

    def record(self, user, epoch):
        """Compress and store worker ``user``'s current gradients.

        With error feedback, the worker's residual is added to the gradient
        before compression, and the new compression error becomes the next
        residual. The residual is scaled by ``float(args.scale)``, or by
        ``2 / (exp(-epoch) + 1) - 1`` when ``args.scale == 'exp'``. Parameters
        whose ``.grad`` is None are skipped.
        """
        if self.args.scale == 'exp':
            scale = (2 / (math.exp(-epoch) + 1) - 1)
        else:
            scale = float(self.args.scale)

        for i, param in enumerate(self.parameters):
            if param.grad is None:
                # Parameter did not contribute to this worker's loss.
                continue
            grad = param.grad.data
            if self.error_feedback:
                grad.add_(scale * param.error[user])
            decompressed_g = self.compressors[i].decompress(
                self.compressors[i].compress(grad)
            )
            if self.error_feedback:
                param.error[user].data = grad - decompressed_g
            self.compressed_gradients[i].append(decompressed_g)

    def apply(self):
        """Write the averaged (optionally re-compressed) gradients and reset the buffers.

        With ``two_phase`` the averaged gradient is compressed once more. When
        ``ef`` is set, that second compression uses its own residual. Parameters
        for which nothing was recorded keep their current gradient.
        """
        for i, param in enumerate(self.parameters):
            recorded = self.compressed_gradients[i]
            if not recorded:
                continue
            g = torch.stack(recorded, dim=0).mean(dim=0)

            # Two-phase compression: compress the averaged (already
            # decompressed) gradient again.
            if self.two_phase:
                if self.error_feedback:
                    g.add_(param.server_error)
                    decompressed_g = self.compressors[i].decompress(
                        self.compressors[i].compress(g))
                    param.server_error = g - decompressed_g
                    g = decompressed_g
                else:
                    g = self.compressors[i].decompress(
                        self.compressors[i].compress(g))

            if param.grad is None:
                param.grad = g
            else:
                param.grad.data = g
        for compressed in self.compressed_gradients:
            compressed.clear()

In [164]:
# Code inspired by https://github.com/xinyandai/gradient-quantization
class QSGDCompressor(object):
    """QSGD-style gradient quantizer using max-abs scaling.

    A tensor ``v`` is encoded as ``[norm, signs, l]`` with ``norm = max|v_i|``,
    ``signs = v > 0`` and integer levels ``l`` (``s = 2 ** n_bit``). It is
    decoded as ``sign * l * norm / s``. Without ``random``, levels are
    truncated to ``[0, s-1]``. With ``random``, each level is rounded up with
    probability equal to its fractional part, so in expectation ``l`` equals
    ``|v_i| / norm * s``.

    Args:
        size: number of elements of the tensors to compress.
        shape: shape of signs/levels and of the reconstructed tensor.
        args: options object; reads ``random``, ``n_bit`` (must be > 0) and
            ``no_cuda``.
    """
    def __init__(self, size, shape, args):
        self.random = args.random
        self.bit = args.n_bit
        assert self.bit > 0

        # Kept for backward compatibility; random draws now follow the
        # input tensor's device instead of this flag.
        self.cuda = not args.no_cuda
        self.s = 2 ** self.bit
        self.size = size
        self.shape = shape

        # Levels can reach `s` after stochastic rounding, so int32 only holds
        # them for n_bit <= 30; wider settings (e.g. 32 bits) need int64.
        self.code_dtype = torch.int32 if self.bit <= 30 else torch.int64

    def compress(self, vec):
        """
        :param vec: torch tensor with ``self.size`` elements
        :return: [norm, signs, l]. ``norm`` is a 1-element tensor holding
            max|vec|; ``signs`` (bool) and ``l`` (integer levels) are shaped
            ``self.shape``.
        """
        vec = vec.reshape(-1)
        norm = torch.max(torch.abs(vec), dim=0, keepdim=True)[0]
        # An all-zero tensor has norm 0: divide by 1 instead to avoid NaNs.
        # Its levels are then 0 and decode back to zeros.
        safe_norm = torch.where(norm > 0, norm, torch.ones_like(norm))
        normalized_vec = vec / safe_norm

        scaled_vec = torch.abs(normalized_vec) * self.s
        l = torch.clamp(scaled_vec, 0, self.s - 1).type(self.code_dtype)

        if self.random:
            # l[i] <- l[i] + 1 with probability |v_i| / ||v|| * s - l[i]
            probabilities = scaled_vec - l.type(torch.float32)
            # Draw on the input's device so this works on CPU-only machines too.
            r = torch.rand(l.shape, device=vec.device)
            l += (probabilities > r).type(self.code_dtype)

        signs = torch.sign(vec) > 0
        return [norm, signs.view(self.shape), l.view(self.shape)]

    def decompress(self, signature):
        """Rebuild a float32 tensor of shape ``self.shape`` from ``[norm, signs, l]``."""
        [norm, signs, l] = signature
        assert l.shape == signs.shape
        scaled_vec = l.type(torch.float32) * (2 * signs.type(torch.float32) - 1)
        compressed = (scaled_vec.view(-1)) * norm / self.s
        return compressed.view(self.shape)

In [165]:
# Code inspired by https://github.com/xinyandai/gradient-quantization
def train(batch_size, nodes, model, device, train_loader, test_loader, optimizer, quantizer, epoch, criterion):
    """Run one training epoch with simulated data-parallel workers.

    Each batch from ``train_loader`` is split into ``nodes`` shards. Workers
    ``0 .. nodes-2`` get ``batch_size`` samples each and the last worker gets
    the remainder, so the loader's batch size is expected to be
    ``nodes * batch_size``. Each step is delegated to ``one_iter``.

    Args:
        batch_size: per-worker shard size.
        nodes: number of simulated workers.
        model, device, optimizer, quantizer, criterion: forwarded to ``one_iter``.
        train_loader: iterable of ``(data, target)`` batches.
        test_loader: unused; kept for call compatibility.
        epoch: epoch index, forwarded to the quantizer.

    Prints the loss of the last step, or ``nan`` if ``train_loader`` was empty.
    """
    model.train()
    num_users = nodes
    last_loss = None
    # here the real batch size is (num_users * batch_size)
    for data, target in train_loader:
        # Worker k gets [k*batch_size, (k+1)*batch_size); the last worker's
        # slice is open-ended and takes the remainder of the batch.
        starts = [user_id * batch_size for user_id in range(num_users)]
        ends = starts[1:] + [None]
        train_data = [(data[s:e], target[s:e]) for s, e in zip(starts, ends)]

        last_loss = one_iter(model, device, criterion, optimizer,
                             quantizer, train_data, num_users, epoch=epoch)

    loss_value = float('nan') if last_loss is None else last_loss.item()
    print('Train Epoch: {} Done.\tLoss: {:.6f}'.format(epoch, loss_value))


def one_iter(model, device, loss_func, optimizer, quantizer, train_data, num_users, epoch):
    """Perform one synchronous step across ``num_users`` simulated workers.

    For each worker, in order: clear gradients, run forward/backward on its
    shard, then call ``quantizer.record`` to capture the gradients. Afterwards
    ``quantizer.apply()`` writes the aggregated gradients and a single
    ``optimizer.step()`` is taken.

    Args:
        train_data: list of ``(data, target)`` shards, one per worker.
        num_users: number of workers; must equal ``len(train_data)``.
        epoch: epoch index forwarded to ``quantizer.record``.

    Returns:
        0-d tensor: mean of the workers' losses, detached from autograd
        (intended for logging only).
    """
    assert num_users == len(train_data)
    model.train()
    user_losses = []
    for user_id, (shard_data, shard_target) in enumerate(train_data):
        optimizer.zero_grad()
        data, target = shard_data.to(device), shard_target.to(device)
        loss = loss_func(model(data), target)
        loss.backward()
        # Detached: the value is only used for logging, so there is no need to
        # keep references into this worker's autograd graph.
        user_losses.append(loss.detach())
        quantizer.record(user_id, epoch=epoch)
    quantizer.apply()
    optimizer.step()
    return torch.stack(user_losses).mean()

In [166]:
def _evaluate(model, loader):
    """Return ``(correct, total)`` top-1 counts of ``model`` over ``loader``.

    Puts the model in eval mode, so BatchNorm uses (and does not update) its
    running statistics. Uses the notebook-level ``device``.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)
            predicted = model(images).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct, total


def _accuracy_percent(correct, total):
    """Accuracy in percent, or ``nan`` when ``total`` is 0."""
    return 100 * correct / total if total else float('nan')


def run_model(num_epochs, batch_size, nodes, model, optimizer, quantizer, criterion):
    """Train for ``num_epochs`` epochs with per-epoch validation, then report test accuracy.

    Uses the notebook-level ``train_loader``, ``valid_loader``, ``test_loader``
    and ``device``. ``batch_size`` is the per-node shard size forwarded to
    ``train``.
    """
    for epoch in range(num_epochs):
        # `train` switches the model back to training mode at its start.
        train(batch_size, nodes, model, device, train_loader, test_loader, optimizer, quantizer, epoch, criterion)

        # Validation
        correct, total = _evaluate(model, valid_loader)
        print('Accuracy of the network on the {} validation images: {} %'.format(total, _accuracy_percent(correct, total)))

    correct, total = _evaluate(model, test_loader)
    print('Accuracy of the network on the {} test images: {} %'.format(total, _accuracy_percent(correct, total)))

In [167]:
### Default model setup ###

# Fixed seed so the model's weight initialisation is reproducible.
torch.manual_seed(42)

# args used for quantization with defaults set
arguments = namedtuple('arguments', ['ef', 'two_phase', 'n_bit', 'c_dim', 'random', 'no_cuda', 'num_users', 'scale'])
error_feedback = True   # add each worker's previous compression error back before compressing
two_phase = False       # if True, also re-compress the averaged gradient
random = True           # stochastic rounding in QSGDCompressor
# Derived from `device` so the quantizer never requests CUDA on a CPU-only run.
no_cuda = device.type != 'cuda'
c_dim = 0               # placeholder; not used by the compressors in this notebook
scale = 1               # error-feedback scale; the string 'exp' selects an epoch-dependent schedule

# Other params
num_classes = 10
num_epochs = 20
learning_rate = 0.01

# ResNet50
model = ResNet50(num_classes, channels=3).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay = 0.001, momentum = 0.9)

In [None]:
# Run model for 2 nodes and 8 quantization bits
num_nodes = 2
num_bits = 8
# Per-node shard size derived from the loader rather than by dividing the
# global `batch_size` in place, so re-running experiment cells (in any order)
# always gives every node the same share of each batch.
node_batch_size = train_loader.batch_size // num_nodes

# Fresh, identically seeded model and optimizer so this experiment starts
# from the same initial weights instead of continuing a previous run.
torch.manual_seed(42)
model = ResNet50(num_classes, channels=3).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=0.001, momentum=0.9)

print(f'###########################################################################\n')
print(f'Running ResNet50 for {num_nodes} node(s) and {num_bits} quantization bits:')
start = time.time()

args = arguments(ef=error_feedback, two_phase=two_phase, n_bit=num_bits, c_dim=c_dim, random=random, no_cuda=no_cuda, num_users=num_nodes, scale=scale)
quantizer = PSQuantizer(QSGDCompressor, model.parameters(), args)

run_model(num_epochs, node_batch_size, num_nodes, model, optimizer, quantizer, criterion)
end = time.time()
print(f'Elapsed time: {(end - start) / 60} minutes')
print(f'\n\n')

In [None]:
# Run model for 2 nodes and 4 quantization bits
num_nodes = 2
num_bits = 4
# Per-node shard size derived from the loader rather than by dividing the
# global `batch_size` in place, so re-running experiment cells (in any order)
# always gives every node the same share of each batch.
node_batch_size = train_loader.batch_size // num_nodes

# Fresh, identically seeded model and optimizer so this experiment starts
# from the same initial weights instead of continuing a previous run.
torch.manual_seed(42)
model = ResNet50(num_classes, channels=3).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=0.001, momentum=0.9)

print(f'###########################################################################\n')
print(f'Running ResNet50 for {num_nodes} node(s) and {num_bits} quantization bits:')
start = time.time()

args = arguments(ef=error_feedback, two_phase=two_phase, n_bit=num_bits, c_dim=c_dim, random=random, no_cuda=no_cuda, num_users=num_nodes, scale=scale)
quantizer = PSQuantizer(QSGDCompressor, model.parameters(), args)

run_model(num_epochs, node_batch_size, num_nodes, model, optimizer, quantizer, criterion)
end = time.time()
print(f'Elapsed time: {(end - start) / 60} minutes')
print(f'\n\n')

In [None]:
# Run model for 2 nodes and 2 quantization bits
num_nodes = 2
num_bits = 2
# Per-node shard size derived from the loader rather than by dividing the
# global `batch_size` in place, so re-running experiment cells (in any order)
# always gives every node the same share of each batch.
node_batch_size = train_loader.batch_size // num_nodes

# Fresh, identically seeded model and optimizer so this experiment starts
# from the same initial weights instead of continuing a previous run.
torch.manual_seed(42)
model = ResNet50(num_classes, channels=3).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=0.001, momentum=0.9)

print(f'###########################################################################\n')
print(f'Running ResNet50 for {num_nodes} node(s) and {num_bits} quantization bits:')
start = time.time()

args = arguments(ef=error_feedback, two_phase=two_phase, n_bit=num_bits, c_dim=c_dim, random=random, no_cuda=no_cuda, num_users=num_nodes, scale=scale)
quantizer = PSQuantizer(QSGDCompressor, model.parameters(), args)

run_model(num_epochs, node_batch_size, num_nodes, model, optimizer, quantizer, criterion)
end = time.time()
print(f'Elapsed time: {(end - start) / 60} minutes')
print(f'\n\n')

In [None]:
# Run baseline model for 2 nodes (i.e. 32 bits, no quantization)
num_nodes = 2
num_bits = 32
# Per-node shard size derived from the loader rather than by dividing the
# global `batch_size` in place, so re-running experiment cells (in any order)
# always gives every node the same share of each batch.
node_batch_size = train_loader.batch_size // num_nodes

# Fresh, identically seeded model and optimizer so this experiment starts
# from the same initial weights instead of continuing a previous run.
torch.manual_seed(42)
model = ResNet50(num_classes, channels=3).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=0.001, momentum=0.9)

print(f'###########################################################################\n')
print(f'Running ResNet50 for {num_nodes} node(s) and {num_bits} quantization bits:')
start = time.time()

# Baseline: IdenticalCompressor passes gradients through unchanged, unlike
# QSGDCompressor, which always quantizes (even with 32 bits). Error feedback
# is disabled only for this run; the global `error_feedback` is left as is so
# other experiment cells are unaffected.
args = arguments(ef=False, two_phase=two_phase, n_bit=num_bits, c_dim=c_dim, random=random, no_cuda=no_cuda, num_users=num_nodes, scale=scale)
quantizer = PSQuantizer(IdenticalCompressor, model.parameters(), args)

run_model(num_epochs, node_batch_size, num_nodes, model, optimizer, quantizer, criterion)
end = time.time()
print(f'Elapsed time: {(end - start) / 60} minutes')
print(f'\n\n')