In [0]:
# Mount Google Drive inside the Colab VM; the CIFAR-10 batch files are read
# from '/content/drive/My Drive/' further down in this notebook.
from google.colab import drive
drive.mount('/content/drive/')

# List the Drive root so the presence of the data files can be checked by eye.
!ls "/content/drive/My Drive"


In [0]:
# Install the PyTorch 1.0.0 wheel built for CUDA 8.0 / CPython 3.6 (see the wheel URL).
!pip3 install https://download.pytorch.org/whl/cu80/torch-1.0.0-cp36-cp36m-linux_x86_64.whl
# NOTE(review): torchvision is not version-pinned here, so the resolved version
# may not match torch 1.0.0 — consider pinning it for reproducibility.
!pip3 install torchvision


In [0]:
# Pillow provides the PIL module imported below (used by the ToPILImage-based transforms).
# NOTE(review): unpinned install — consider pinning a version for reproducibility.
!pip install Pillow

In [0]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from torchvision.transforms import transforms
import torch.utils.data as utils
import torchvision.models as models
import torch.optim as optim
from torch.optim import lr_scheduler
import time
import os
import copy
from torch.utils.data import sampler
from skimage.transform import resize
import cv2
from skimage import io, transform
from torch.utils.data.dataset import Dataset
from PIL import Image
from torch.autograd import Variable
from sklearn.model_selection import train_test_split

In [0]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [0]:
def unpickle(file):
    """Load and return the object stored in the pickle file at ``file``.

    The file is opened in binary mode and unpickled with
    ``encoding='latin1'``.

    NOTE: ``pickle.load`` can execute arbitrary code while loading; only call
    this on files from a trusted source.
    """
    import pickle
    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='latin1')

class ChunkSampler(sampler.Sampler):
    """Sampler that yields a contiguous run of indices in increasing order.

    Iterating produces ``start, start + 1, ..., start + num_samples - 1``.

    Args:
        num_samples: how many indices to yield (also the reported length).
        start: first index of the run.
    """

    def __init__(self, num_samples, start=0):
        self.start = start
        self.num_samples = num_samples

    def __len__(self):
        return self.num_samples

    def __iter__(self):
        first = self.start
        return iter(range(first, first + self.num_samples))

# **Data Preparation**

In [0]:
data_path_prefix = '/content/drive/My Drive/'


def _features_and_labels(batch):
    """Split one unpickled batch dict into ``(features, labels)`` arrays.

    ``batch['data']`` is reshaped to ``(N, 3, 32, 32)`` and transposed to
    ``(N, 32, 32, 3)`` (channels last); ``batch['labels']`` becomes an array.
    """
    flat = batch['data']
    images = flat.reshape((len(flat), 3, 32, 32)).transpose(0, 2, 3, 1)
    return np.array(images), np.array(batch['labels'])


# Load the five training batches and the test batch.
batch1 = unpickle(data_path_prefix + 'data_batch_1')
batch2 = unpickle(data_path_prefix + 'data_batch_2')
batch3 = unpickle(data_path_prefix + 'data_batch_3')
batch4 = unpickle(data_path_prefix + 'data_batch_4')
batch5 = unpickle(data_path_prefix + 'data_batch_5')
test_batch = unpickle(data_path_prefix + 'test_batch')

# Convert every batch to channels-last image arrays plus label arrays.
batch1_features, batch1_labels = _features_and_labels(batch1)
batch2_features, batch2_labels = _features_and_labels(batch2)
batch3_features, batch3_labels = _features_and_labels(batch3)
batch4_features, batch4_labels = _features_and_labels(batch4)
batch5_features, batch5_labels = _features_and_labels(batch5)
test_features, test_labels = _features_and_labels(test_batch)

# Stack the five training batches along the sample axis.
train_val_features = np.concatenate(
    (batch1_features, batch2_features, batch3_features, batch4_features, batch5_features),
    axis=0,
)
train_val_labels = np.concatenate(
    (batch1_labels, batch2_labels, batch3_labels, batch4_labels, batch5_labels),
    axis=0,
)

# Hold out 20% of the merged training data for validation (fixed seed).
train_features, val_features, train_labels, val_labels = train_test_split(
    train_val_features,
    train_val_labels,
    test_size=0.20,
    random_state=1,
)

Calculating mean and standard deviation values for normalization

In [0]:
def _per_channel_stats(images):
    """Return ``(mean, std)`` lists reduced over axes (0, 1, 2), each divided by 255."""
    reduce_axes = (0, 1, 2)
    mean = (np.mean(images, axis=reduce_axes) / 255).tolist()
    std = (np.std(images, axis=reduce_axes) / 255).tolist()
    return mean, std


# One (mean, std) pair per split, later passed to transforms.Normalize.
train_mean, train_std = _per_channel_stats(train_features)
val_mean, val_std = _per_channel_stats(val_features)
test_mean, test_std = _per_channel_stats(test_features)

# **Custom Dataset Class for Cifar10**

In [0]:
class Cifar10(Dataset):
    """Map-style dataset pairing ``features[idx]`` with ``labels[idx]``.

    Args:
        features: indexable collection of samples; its length is the dataset length.
        labels: indexable collection of targets, aligned with ``features``.
        transform: optional callable applied to each sample when it is fetched.
    """

    def __init__(self, features, labels, transform = None):
        self.transform = transform
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        sample, target = self.features[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

# **Data Transformations and Creating Dataloaders**

In [0]:
# Training pipeline: upscale to 224 pixels, random horizontal flip for
# augmentation, then convert to a tensor and normalise per channel.
transform_train = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std),
])

# Evaluation pipelines use the TRAINING statistics: held-out data must be
# preprocessed exactly like the data the model was fitted on, and statistics
# of the validation/test splits must not leak into the pipeline.
transform_val = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std),
])

transform_test = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std),
])

trainset = Cifar10(train_features, train_labels, transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size = 128, shuffle = True)

# Shuffling is only useful for SGD; evaluation metrics do not depend on the
# sample order, so the held-out loaders iterate deterministically.
valset = Cifar10(val_features, val_labels, transform_val)
valloader = torch.utils.data.DataLoader(valset, batch_size = 128, shuffle = False)

testset = Cifar10(test_features, test_labels, transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size = 128, shuffle = False)

# **Training function for the model**

In [0]:
def train_model(model, criterion, optimizer, scheduler, num_epochs = 25):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs a training pass over the notebook-level ``trainloader``
    followed by an evaluation pass over ``valloader``; batches are moved to the
    notebook-level ``device``. The state dict of the epoch with the highest
    validation accuracy is kept and loaded back into ``model`` at the end.

    Args:
        model: network to optimise (expected to already live on ``device``).
        criterion: callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor.
        optimizer: optimizer, stepped once per training batch.
        scheduler: learning-rate scheduler, stepped once per epoch *after* the
            training pass.
        num_epochs: number of epochs to run.

    Returns:
        The same ``model`` object, with the best validation weights loaded.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                loader = trainloader
                model.train()  # Set model to training mode
            else:
                loader = valloader
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0
            num_seen = 0  # samples actually iterated in this phase

            # Iterate over data.
            for inputs, labels in loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; track history only while training
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # statistics (the loss is a batch mean, so re-weight by batch size)
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += torch.sum(preds == labels).item()
                num_seen += batch_size

            # Advance the LR schedule only after this epoch's optimizer steps.
            # Stepping it before the first optimizer.step() makes the decay
            # kick in one epoch too early.
            if is_train:
                scheduler.step()

            # Average over the samples that were really seen, so the numbers
            # stay correct for any loader/sampler; guard against an empty loader.
            denominator = max(num_seen, 1)
            epoch_loss = running_loss / denominator
            epoch_acc = running_corrects / denominator

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model whenever validation accuracy improves
            if not is_train and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

# **Optimizing only final layer of pretrained network with Hinge loss**

Training

In [0]:
# Start from torchvision's pretrained ResNet-50 and freeze every existing parameter.
net = models.resnet50(pretrained=True)
for param in net.parameters():
    param.requires_grad = False

# Replace the classifier with a new 10-way linear layer; its freshly created
# parameters are the only ones that still require gradients.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, 10)
net = net.to(device) 

# Multi-class margin (hinge) loss; only the new fc layer is handed to SGD.
criterion = nn.MultiMarginLoss()
optimizer_ft = optim.SGD(net.fc.parameters(), lr = 0.01, momentum = 0.9)

# Decay LR by a factor of 0.1 every 5 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 5, gamma = 0.1)

# Train for 10 epochs; train_model returns the network loaded with the weights
# of the epoch that had the best validation accuracy.
model_ft = train_model(net, criterion, optimizer_ft, exp_lr_scheduler, 10)

Testing

In [0]:
# Evaluate the trained network on the test loader.
# Switch to inference mode explicitly (BatchNorm then uses its running
# statistics) instead of relying on the mode train_model happened to leave.
model_ft.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.long().to(device)
        outputs = model_ft(inputs)
        # The predicted class is the index of the largest score in each row.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

# **Optimizing only final layer of pretrained network with Cross Entropy loss**

Training

In [0]:
# Start from torchvision's pretrained ResNet-50 and freeze every existing parameter.
net = models.resnet50(pretrained=True)
for param in net.parameters():
    param.requires_grad = False

# Replace the classifier with a new 10-way linear layer; its freshly created
# parameters are the only ones that still require gradients.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, 10)
net = net.to(device) 

# Cross-entropy loss; only the new fc layer is handed to SGD.
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(net.fc.parameters(), lr = 0.01, momentum = 0.9)

# Decay LR by a factor of 0.1 every 5 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 5, gamma = 0.1)

# Train for 10 epochs; train_model returns the network loaded with the weights
# of the epoch that had the best validation accuracy.
model_ft = train_model(net, criterion, optimizer_ft, exp_lr_scheduler, 10)

Testing

In [0]:
# Evaluate the trained network on the test loader.
# Switch to inference mode explicitly (BatchNorm then uses its running
# statistics) instead of relying on the mode train_model happened to leave.
model_ft.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.long().to(device)
        outputs = model_ft(inputs)
        # The predicted class is the index of the largest score in each row.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

# **Optimizing all layers of pretrained network with Hinge loss**

Training

In [0]:
# Start from torchvision's pretrained ResNet-50; nothing is frozen in this experiment.
net = models.resnet50(pretrained=True)
# Replace the classifier with a new 10-way linear layer.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, 10)
net = net.to(device) 

# Multi-class margin (hinge) loss; all parameters of the network are handed to SGD.
criterion = nn.MultiMarginLoss()
optimizer_ft = optim.SGD(net.parameters(), lr = 0.01, momentum = 0.9)

# Decay LR by a factor of 0.1 every 5 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 5, gamma = 0.1)

# Train for 10 epochs; train_model returns the network loaded with the weights
# of the epoch that had the best validation accuracy.
model_ft = train_model(net, criterion, optimizer_ft, exp_lr_scheduler, 10)

Testing

In [0]:
# Evaluate the trained network on the test loader.
# Switch to inference mode explicitly (BatchNorm then uses its running
# statistics) instead of relying on the mode train_model happened to leave.
model_ft.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.long().to(device)
        outputs = model_ft(inputs)
        # The predicted class is the index of the largest score in each row.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

# **Optimizing all layers of pretrained network with Cross Entropy loss**

Training

In [0]:
# Start from torchvision's pretrained ResNet-50; nothing is frozen in this experiment.
net = models.resnet50(pretrained = True)
# Replace the classifier with a new 10-way linear layer.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, 10)
net = net.to(device) 

# Cross-entropy loss; all parameters of the network are handed to SGD.
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(net.parameters(), lr = 0.01, momentum = 0.9)

# Decay LR by a factor of 0.1 every 5 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 5, gamma = 0.1)

# Train for 10 epochs; train_model returns the network loaded with the weights
# of the epoch that had the best validation accuracy.
model_ft = train_model(net, criterion, optimizer_ft, exp_lr_scheduler, 10)

Testing

In [0]:
# Evaluate the trained network on the test loader.
# Switch to inference mode explicitly (BatchNorm then uses its running
# statistics) instead of relying on the mode train_model happened to leave.
model_ft.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.long().to(device)
        outputs = model_ft(inputs)
        # The predicted class is the index of the largest score in each row.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

In the parts below, I experimented with my implementation of ResNet-50 rather than using the pretrained model.

# **ResNet-50 implementation for Cifar10**

In [0]:
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions plus a skip path.

    The first two convolutions produce ``out_channels`` feature maps and the
    last one expands to ``out_channels * 4``. ``stride`` is applied by the 3x3
    convolution.

    Args:
        in_channels: number of channels of the block input.
        out_channels: width of the two inner convolutions.
        stride: stride of the 3x3 convolution.
        shortcut: optional callable applied to the block input before the
            residual addition; with ``None`` the input itself is added.
    """

    def __init__(self, in_channels, out_channels, stride = 1, shortcut = None):
        super().__init__()
        expanded = out_channels * 4

        # Layers are created in the order in which forward() applies them.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(out_channels, expanded, kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(expanded)

        self.relu = nn.ReLU(inplace=True)

        self.shortcut = shortcut
        self.stride = stride

    def forward(self, x):
        # Main branch: conv-bn-relu twice, then conv-bn without activation.
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.relu(self.bn2(self.conv2(residual)))
        residual = self.bn3(self.conv3(residual))

        # Skip branch: projected input when a shortcut is configured, else x itself.
        skip = x if self.shortcut is None else self.shortcut(x)

        return self.relu(residual + skip)

class ResNet50(nn.Module):
    """ResNet-50 built from ``Bottleneck`` blocks.

    Layout: 7x7 stride-2 convolution, batch norm, ReLU and 3x3 stride-2 max
    pooling, followed by four stages of 3, 4, 6 and 3 bottleneck blocks
    (widths 64, 128, 256, 512), adaptive average pooling to 1x1 and a linear
    classifier with ``num_classes`` outputs.
    """

    def __init__(self, num_classes = 10):
        super().__init__()
        # Channel count entering the next stage; updated by make_layer().
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages: (width, number of blocks, stride of the first block)
        self.layer1 = self.make_layer(64, 3, 1)
        self.layer2 = self.make_layer(128, 4, 2)
        self.layer3 = self.make_layer(256, 6, 2)
        self.layer4 = self.make_layer(512, 3, 2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(2048, num_classes)

        # Weight initialisation: He (Kaiming) normal for convolutions,
        # unit scale / zero shift for batch-norm layers.
        for module in self.modules():
            if isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')

    def make_layer(self, out_channels, num_of_blocks, stride):
        """Build one stage of ``num_of_blocks`` bottleneck blocks.

        Only the first block receives ``stride`` and, when the stride is not 1
        or the incoming channel count differs from ``out_channels * 4``, a
        1x1-convolution + batch-norm projection shortcut. ``self.in_channels``
        is updated to ``out_channels * 4`` for the following blocks and stages.
        """
        expanded = out_channels * 4

        projection = None  # identity mapping
        if not (stride == 1 and self.in_channels == expanded):
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, expanded, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded),
            )

        blocks = [Bottleneck(self.in_channels, out_channels, stride, projection)]
        self.in_channels = expanded

        # The remaining blocks keep the channel count and resolution, so they
        # need no projection shortcut.
        blocks.extend(
            Bottleneck(self.in_channels, out_channels) for _ in range(1, num_of_blocks)
        )

        return nn.Sequential(*blocks)

    def forward(self, x):
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        pooled = self.avgpool(x)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)



Since our custom implementation accepts inputs smaller than 224x224, there is no need to resize the images.

In [0]:
# Training augmentation follows the CIFAR-10 setup of the original ResNet
# paper: random 32x32 crop after 4-pixel padding plus random horizontal flip.
transform_train = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std),
])

# Evaluation pipelines use the TRAINING statistics: held-out data must be
# preprocessed exactly like the data the model was fitted on, and statistics
# of the validation/test splits must not leak into the pipeline.
transform_val = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std),
])

trainset = Cifar10(train_features, train_labels, transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size = 128, shuffle = True)

# Shuffling is only useful for SGD; evaluation metrics do not depend on the
# sample order, so the held-out loaders iterate deterministically.
valset = Cifar10(val_features, val_labels, transform_val)
valloader = torch.utils.data.DataLoader(valset, batch_size = 128, shuffle = False)

testset = Cifar10(test_features, test_labels, transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size = 128, shuffle = False)

# **Optimizing only final layer of the network (custom implementation) with Hinge loss**

Training

In [0]:
# Custom ResNet-50 with freshly initialised weights (nothing pretrained is loaded);
# every existing parameter is frozen.
net = ResNet50()
for param in net.parameters():
    param.requires_grad = False

# Re-create the 10-way classifier so that its parameters require gradients again.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, 10)
net = net.to(device) 

# Multi-class margin (hinge) loss; only the new fc layer is handed to SGD, so
# the frozen convolution/batch-norm weights are not updated by the optimizer.
criterion = nn.MultiMarginLoss()
optimizer_ft = optim.SGD(net.fc.parameters(), lr = 0.01, momentum = 0.9)

# Decay LR by a factor of 0.1 every 5 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 5, gamma = 0.1)

# Train for 10 epochs; train_model returns the network loaded with the weights
# of the epoch that had the best validation accuracy.
model_ft = train_model(net, criterion, optimizer_ft, exp_lr_scheduler, 10)

Testing

In [0]:
# Evaluate the trained network on the test loader.
# Switch to inference mode explicitly (BatchNorm then uses its running
# statistics) instead of relying on the mode train_model happened to leave.
model_ft.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.long().to(device)
        outputs = model_ft(inputs)
        # The predicted class is the index of the largest score in each row.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

# **Optimizing only final layer of the network (custom implementation) with Cross Entropy loss**

Training

In [0]:
# Custom ResNet-50 with freshly initialised weights (nothing pretrained is loaded);
# every existing parameter is frozen.
net = ResNet50()
for param in net.parameters():
    param.requires_grad = False

# Re-create the 10-way classifier so that its parameters require gradients again.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, 10)
net = net.to(device) 

# Cross-entropy loss; only the new fc layer is handed to SGD, so the frozen
# convolution/batch-norm weights are not updated by the optimizer.
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(net.fc.parameters(), lr = 0.01, momentum = 0.9)

# Decay LR by a factor of 0.1 every 5 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 5, gamma = 0.1)

# Train for 10 epochs; train_model returns the network loaded with the weights
# of the epoch that had the best validation accuracy.
model_ft = train_model(net, criterion, optimizer_ft, exp_lr_scheduler, 10)

Testing

In [0]:
# Evaluate the trained network on the test loader.
# Switch to inference mode explicitly (BatchNorm then uses its running
# statistics) instead of relying on the mode train_model happened to leave.
model_ft.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.long().to(device)
        outputs = model_ft(inputs)
        # The predicted class is the index of the largest score in each row.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

# **Optimizing network's (custom implementation) all layers with Hinge loss**

Training

In [0]:
# Custom ResNet-50 trained from scratch (nothing pretrained is loaded, nothing frozen).
net = ResNet50()
# ResNet50() already ends in a 10-way fc layer; this re-creates it with fresh weights.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, 10)
net = net.to(device) 

# Multi-class margin (hinge) loss; all parameters of the network are handed to SGD.
criterion = nn.MultiMarginLoss()
optimizer_ft = optim.SGD(net.parameters(), lr = 0.01, momentum = 0.9)

# Decay LR by a factor of 0.1 every 5 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 5, gamma = 0.1)

# Train for 10 epochs; train_model returns the network loaded with the weights
# of the epoch that had the best validation accuracy.
model_ft = train_model(net, criterion, optimizer_ft, exp_lr_scheduler, 10)

Testing

In [0]:
# Evaluate the trained network on the test loader.
# Switch to inference mode explicitly (BatchNorm then uses its running
# statistics) instead of relying on the mode train_model happened to leave.
model_ft.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.long().to(device)
        outputs = model_ft(inputs)
        # The predicted class is the index of the largest score in each row.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

# **Optimizing network's (custom implementation) all layers with Cross Entropy loss**

Training

In [0]:
# Custom ResNet-50 trained from scratch (nothing pretrained is loaded, nothing frozen).
net = ResNet50()
# ResNet50() already ends in a 10-way fc layer; this re-creates it with fresh weights.
num_ftrs = net.fc.in_features
net.fc = nn.Linear(num_ftrs, 10)
net = net.to(device) 

# Cross-entropy loss; all parameters of the network are handed to SGD.
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(net.parameters(), lr = 0.01, momentum = 0.9)

# Decay LR by a factor of 0.1 every 5 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 5, gamma = 0.1)

# Train for 10 epochs; train_model returns the network loaded with the weights
# of the epoch that had the best validation accuracy.
model_ft = train_model(net, criterion, optimizer_ft, exp_lr_scheduler, 10)

Testing

In [0]:
# Evaluate the trained network on the test loader.
# Switch to inference mode explicitly (BatchNorm then uses its running
# statistics) instead of relying on the mode train_model happened to leave.
model_ft.eval()

correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.long().to(device)
        outputs = model_ft(inputs)
        # The predicted class is the index of the largest score in each row.
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))