In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from src.models import ResNet, MLP
from torchvision.utils import make_grid
import matplotlib
import matplotlib.pyplot as plt
import src.utils as utils
%matplotlib inline
import torch.nn.functional as F

In [3]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
stats = ((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
stats = (0.5, 0.5, 0.5), (0.5, 0.5, 0.5)
batch_size = 100

In [4]:
# Data augmentation and normalization for training
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(*stats)
])

# Normalization for testing
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(*stats)
])

# CIFAR-10 dataset
train_dataset = datasets.CIFAR10(root='./data', train=True, transform=transform_train, download=True)
test_dataset = datasets.CIFAR10(root='./data', train=False, transform=transform_test)

# Data loaders
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


Files already downloaded and verified


In [5]:
def _weights_init(m):
    classname = m.__class__.__name__
    #print(classname)
    if isinstance(m, nn.Linear) or isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight)

class LambdaLayer(nn.Module):
    def __init__(self, lambd):
        super(LambdaLayer, self).__init__()
        self.lambd = lambd

    def forward(self, x):
        return self.lambd(x)


class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1, option='A'):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != planes:
            if option == 'A':
                """
                For CIFAR10 ResNet paper uses option A.
                """
                self.shortcut = LambdaLayer(lambda x:
                                            F.pad(x[:, :, ::2, ::2], (0, 0, 0, 0, planes//4, planes//4), "constant", 0))
            elif option == 'B':
                self.shortcut = nn.Sequential(
                     nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                     nn.BatchNorm2d(self.expansion * planes)
                )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNetB(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNetB, self).__init__()
        self.in_planes = 16

        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)
        self.linear = nn.Linear(64, num_classes)

        self.apply(_weights_init)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion

        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, out.size()[3])
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


def resnet20():
    return ResNetB(BasicBlock, [3, 3, 3])

def get_model2(model, learning_rate=1e-3, weight_decay=1e-4):

    # set the first layer not trainable
    # model.features.conv0.weight.requires_grad = False

    # all fc layers
    weights = [
        p for n, p in model.named_parameters()
        if 'weight' in n and 'conv' not in n
    ]

    # all conv layers
    weights_to_be_quantized = [
        p for n, p in model.named_parameters()
        # if 'conv' in n and 'conv0' not in n
        if 'conv' in n and 'weight' in n
    ]

    biases = [
        p for n, p in model.named_parameters()
        if 'bias' in n
    ]    

    params = [
        {'params': weights, 'weight_decay': weight_decay},
        {'params': weights_to_be_quantized, 'weight_decay': weight_decay},
        {'params': biases,  'weight_decay': weight_decay}
    ]
    optimizer = optim.SGD(params, lr=learning_rate, momentum=0.9)

    loss = nn.CrossEntropyLoss().cuda()
    model = model.cuda()  # move the model to gpu
    return model, loss, optimizer
def quantize_bw(kernel : torch.Tensor):
    """
    binary quantization
    Return quantized weights of a layer.
    """
    delta = kernel.abs().mean()
    sign = kernel.sign().float()



    return sign*delta

# RESNET no quantization

In [6]:
# Test the model
model_dict = torch.load('./models/resnet.ckpt')
model = ResNet(in_channels=16, num_classes=10)
model.load_state_dict(model_dict)

loss, acc = utils.evaluate_model(model, test_loader, device,)
print(f'Loss: {loss}, Accuracy: {acc*100}%')
utils.print_model_size(model)
#print('Accuracy of the model on the test images: {} %'.format(100 * correct / total))

Loss: 0.0, Accuracy: 88.25999450683594%
0.81 MB


# RESNET Quantization 4 bits

In [7]:
# Data augmentation and normalization for training
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(*stats)
])

# Normalization for testing
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(*stats)
])

# CIFAR-10 dataset
train_dataset = datasets.CIFAR10(root='./data', train=True, transform=transform_train, download=True)
test_dataset = datasets.CIFAR10(root='./data', train=False, transform=transform_test)

batch_size_test = int(len(test_dataset)/40 )#100
batch_size_train = 100
# Data loaders
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size_train, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size_test, shuffle=False)
#--------------------------------------------------------------------------
    

Files already downloaded and verified


In [9]:
# Test the model
load_dict = torch.load('./models/resnet_4bits.pth')
#print(load_dict.keys())
state_dict = load_dict['state']
#print(state_dict.keys())
model = resnet20()
model.load_state_dict(state_dict)

loss, acc = utils.evaluate_model(model, test_loader, device)
print(f'Loss: {loss}, Accuracy: {acc*100}%')
utils.print_model_size(model)

Loss: 0.0, Accuracy: 68.44999694824219%
1.12 MB


# Quantization 8 bits

In [10]:
# Test the model
load_dict = torch.load('./models/resnet_2bits.pth')
#print(load_dict.keys())
state_dict = load_dict['state']
#print(state_dict.keys())
model = resnet20()
model.load_state_dict(state_dict)

loss, acc = utils.evaluate_model(model, test_loader, device)
print(f'Loss: {loss}, Accuracy: {acc*100}%')
utils.print_model_size(model)

Loss: 0.0, Accuracy: 53.21999740600586%
1.12 MB


In [None]:
from tqdm.notebook import tqdm
from IPython.display import clear_output
# from tqdm.auto import tqdm  # notebook compatible
import time
bar = tqdm(range(5))
bar.refresh()
for i1 in bar:
    
    bar.set_description(f"Training {i1}",refresh=True)
    for i2 in tqdm(range(10), leave=False):
        # do something, e.g. sleep
        bar.write(f"i2={i2}")
        time.sleep(0.5)
    bar.set_description(f"Testing {i1}",refresh=True)
    for i3 in tqdm(range(10), leave=False):
        # do something, e.g. sleep
        bar.write(f"i3={i3}")
        time.sleep(0.5)
    clear_output(wait=True)

  0%|          | 0/10 [00:00<?, ?it/s]

i2=0
i2=1
i2=2
i2=3
i2=4
i2=5
i2=6
i2=7
i2=8
i2=9


  0%|          | 0/10 [00:00<?, ?it/s]

i3=0
i3=1
i3=2
i3=3
i3=4
i3=5


KeyboardInterrupt: 

In [None]:
param = [ 28 * 28, # input
                512, 256, 128, 64,
                10 ] #output

transform = transforms.Compose([
    transforms.ToTensor(), # convert the image to a PyTorch tensor
    transforms.Normalize((0.5,), (0.5,)) # normalize the image with mean=0.5 and std=0.5
])
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
test_dataset = datasets.MNIST(root='data/', train=False, transform=transform, download=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=100, shuffle=False)



In [None]:
# Loading pretrained model
modeldict = torch.load('models/mlp.ckpt')
model = MLP(param)
model.load_state_dict(modeldict)

q_dict = torch.load('models/mlp_dynamicq.ckpt')
print(q_dict.keys())
model_qd = MLP(param)
model_qd.load_state_dict(q_dict)

loss, acc = utils.evaluate_model(model, test_loader, device)
print(f'Loss: {loss}, Accuracy: {acc*100}%')

# model.to(device)
# quantized_model.eval()
# model.eval()
# with torch.no_grad():
#     correctq = 0
#     totalq = 0
#     total = 0
#     correct = 0
#     for images, labels in test_loader:
#         images_cuda = images.to(device)
#         labels_cuda = labels.to(device)

#         outputsq = quantized_model(images)
#         _, predictedq = torch.max(outputsq.data, 1)
#         totalq += labels.size(0)
#         correctq += (predictedq == labels).sum().item()
        
#         outputs = model(images_cuda)
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels_cuda.size(0)
#         correct += (predicted == labels_cuda).sum().item()
        

#     print('Accuracy of the quantized model on the test images: {} %'.format(100 * correctq / totalq))
#     print('Accuracy of the model on the test images: {} %'.format(100 * correct / total))




utils.print_model_size(model)
utils.print_model_size(model_qd)

odict_keys(['linears.0.scale', 'linears.0.zero_point', 'linears.0._packed_params.dtype', 'linears.0._packed_params._packed_params', 'linears.1.scale', 'linears.1.zero_point', 'linears.1._packed_params.dtype', 'linears.1._packed_params._packed_params', 'linears.2.scale', 'linears.2.zero_point', 'linears.2._packed_params.dtype', 'linears.2._packed_params._packed_params', 'linears.3.scale', 'linears.3.zero_point', 'linears.3._packed_params.dtype', 'linears.3._packed_params._packed_params', 'linears.4.scale', 'linears.4.zero_point', 'linears.4._packed_params.dtype', 'linears.4._packed_params._packed_params'])


  device=storage.device,


RuntimeError: Error(s) in loading state_dict for MLP:
	Missing key(s) in state_dict: "linears.0.weight", "linears.0.bias", "linears.1.weight", "linears.1.bias", "linears.2.weight", "linears.2.bias", "linears.3.weight", "linears.3.bias", "linears.4.weight", "linears.4.bias". 
	Unexpected key(s) in state_dict: "linears.0.scale", "linears.0.zero_point", "linears.0._packed_params.dtype", "linears.0._packed_params._packed_params", "linears.1.scale", "linears.1.zero_point", "linears.1._packed_params.dtype", "linears.1._packed_params._packed_params", "linears.2.scale", "linears.2.zero_point", "linears.2._packed_params.dtype", "linears.2._packed_params._packed_params", "linears.3.scale", "linears.3.zero_point", "linears.3._packed_params.dtype", "linears.3._packed_params._packed_params", "linears.4.scale", "linears.4.zero_point", "linears.4._packed_params.dtype", "linears.4._packed_params._packed_params". 