In [None]:
import torch
import torch.nn as nn 
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from tqdm.auto import tqdm
from torch import optim
import time
import os
import shutil

## Overview
This notebook implements the original VGG16 architecture from scratch, trains it on the CIFAR-10 dataset for a few epochs, and then runs inference with the trained model.

## Defining the Network Parameters 

![vgg](https://www.researchgate.net/publication/327070011/figure/fig1/AS:660549306159105@1534498635256/VGG-16-neural-network-architecture.png)

In [None]:
KERNEL = 3   # side length of every (square) convolution kernel
STRIDE = 2   # stride used by the max-pool layers
# Input channels (RGB) followed by the output channels of the 13 conv layers of
# VGG16 (configuration D: 2x64, 2x128, 3x256, 3x512, 3x512).
CHANNEL = [3, 64, 64, 128, 128, 256, 256, 256, 512, 512, 512, 512, 512, 512]
# Flattened feature size (512 maps of 7x7 for a 224x224 input after five 2x poolings),
# followed by the widths of the two hidden fully connected layers.
FC = [512*7*7, 4096, 4096]
# 1-based indices of the conv layers that are followed by a max-pool.
POOL_POS = [2, 4, 7, 10, 13]

## Building the Network

In [None]:
class my_relu(nn.Module):
    """Hand-written ReLU activation: returns ``max(x, 0)`` element-wise."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Clamp every element of tensor ``x`` to be non-negative.

        The Python builtin ``max`` cannot compare a multi-element tensor with a
        scalar, so the element-wise tensor op is used instead.
        """
        return torch.clamp(x, min=0)

In [None]:
import torch
import torch.nn as nn

KERNEL = 3   # side length of every (square) convolution kernel
STRIDE = 2   # stride used by the max-pool layers
# Input channels (RGB) followed by the output channels of the 13 conv layers of
# VGG16 (configuration D: 2x64, 2x128, 3x256, 3x512, 3x512).
CHANNEL = [3, 64, 64, 128, 128, 256, 256, 256, 512, 512, 512, 512, 512, 512]
# Flattened feature size (512 maps of 7x7 for a 224x224 input after five 2x poolings),
# followed by the widths of the two hidden fully connected layers.
FC = [512*7*7, 4096, 4096]
# 1-based indices of the conv layers that are followed by a max-pool.
POOL_POS = [2, 4, 7, 10, 13]


class custom_VGG16(nn.Module):
    """VGG-style network assembled from the configuration lists.

    Args:
        num_of_classes: Number of output logits produced by the classifier.
        CHANNEL: ``[in_channels, out_1, ..., out_n]``; consecutive pairs define
            the conv layers.
        FC: ``[flattened_size, hidden_1, ...]``; consecutive pairs define the
            hidden fully connected layers. ``FC[0]`` must match the flattened
            output of the conv stack for the input size in use.
        KERNEL: Kernel size of every conv layer ('same' padding is used).
        STRIDE: Stride of every max-pool layer (its kernel size is fixed at 2).
        POOL_POS: 1-based conv indices after which a max-pool is inserted.
    """

    def __init__(self, num_of_classes, CHANNEL=CHANNEL, FC=FC, KERNEL=KERNEL, STRIDE=STRIDE, POOL_POS=POOL_POS):
        super().__init__()

        self.layers = nn.ModuleList()
        self.fc = nn.ModuleList()
        self.flatten = nn.Flatten()
        # Size the classifier from the last hidden width so a custom FC list works too.
        self.classifier = nn.Linear(FC[-1], num_of_classes)

        # Convolutional feature extractor: conv -> ReLU (-> max-pool at POOL_POS).
        for i in range(1, len(CHANNEL)):
            self.layers.append(nn.Conv2d(in_channels=CHANNEL[i-1], out_channels=CHANNEL[i], kernel_size=KERNEL, padding='same'))
            self.layers.append(my_relu())

            if i in POOL_POS:
                self.layers.append(nn.MaxPool2d(kernel_size=2, stride=STRIDE))

        # Hidden fully connected layers, each followed by ReLU.
        for i in range(len(FC)-1):
            self.fc.append(nn.Linear(FC[i], FC[i+1]))
            self.fc.append(my_relu())

    def forward(self, x):
        """Run ``x`` through the conv stack, flatten, the FC stack and return class logits."""
        for layer in self.layers:
            x = layer(x)

        x = self.flatten(x)

        for layer in self.fc:
            x = layer(x)

        # Raw logits; no softmax is applied here.
        return self.classifier(x)


## Weight Initialization 
1. Initialize randomly from some distribution.
2. Xavier or Kaiming initialization methods.


In [None]:
# The network class defined in this notebook is `custom_VGG16`; the bare name `VGG16` is undefined.
model = custom_VGG16(num_of_classes=10)

def initialize(m):
    """Kaiming-initialise the weights of a conv/linear layer and zero its bias.

    Intended for ``nn.Module.apply``; modules of any other type are left untouched.

    Args:
        m: The module visited by ``apply``.
    """
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        # The nn.init helpers already run without gradient tracking.
        nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
        # Layers built with bias=False have no bias tensor to reset.
        if m.bias is not None:
            nn.init.zeros_(m.bias)

# Visit every submodule and let `initialize` reset the conv/linear parameters.
model = model.apply(fn=initialize)

In [None]:
!pip install torchsummary
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('Using: ', device)
from torchsummary import summary
summary(model.to(device), (3,224,224))

## Dataset Preparation

I will be using CIFAR10

In [None]:
# Preprocessing shared by the train/validation images and the single inference image:
# scale to 224, take the central 224x224 crop, convert to a tensor, then normalise per channel.
data_transforms = transforms.Compose(
    [
        transforms.Resize(size=224),
        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=(0.485, 0.456, 0.406),
            std=(0.229, 0.224, 0.225),
        ),
    ]
)

# Maps each integer label (0-9) to its human-readable class name.
class_names = dict(enumerate([
    'airplane',
    'automobile',
    'bird',
    'cat',
    'deer',
    'dog',
    'frog',
    'horse',
    'ship',
    'truck',
]))


In [None]:
# NOTE(review): root='/' stores the download in the filesystem root; confirm that is writable/intended.
train_full = torchvision.datasets.CIFAR10(root = '/', download = True,
                                          train = True, transform = data_transforms)

test_full = torchvision.datasets.CIFAR10(root = '/', download = True,
                                         train = False, transform = data_transforms)

# Work on small subsets to keep the demo fast. The full datasets keep their own names so
# this cell can be re-run without subsetting an already-subset dataset.
train_set = torch.utils.data.Subset(train_full, range(5000))
# The validation samples must come from the held-out test split, not from the training subset.
test_set = torch.utils.data.Subset(test_full, range(1000))

print('The size of dataset is :', len(train_set), len(test_set))


train_loader =  torch.utils.data.DataLoader(train_set, batch_size = 32,  shuffle = True)
test_loader = torch.utils.data.DataLoader(test_set, shuffle = False, batch_size = 32)

# Keyed by phase name so the training loop can look up the loader for each phase.
dataloader = {
    'train': train_loader,
    'val': test_loader
}

dataset_sizes = {
    "train": len(train_set),
    "val": len(test_set)
}

## Training the Model

In [None]:
def train(model, optimizer, scheduler , criterion, num_of_epochs = 20, dataloader = dataloader):
    """Train ``model`` and restore the weights of the best validation epoch.

    Every epoch runs a 'train' phase followed by a 'val' phase. The state dict
    with the highest validation accuracy is checkpointed to a temporary
    directory and loaded back into the model before returning.

    Args:
        model: Network to optimise; batches are moved to the device of its parameters.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per epoch, after the train phase.
        criterion: Loss called as ``criterion(outputs, labels)``; the running loss is
            weighted by batch size, which assumes a per-batch mean.
        num_of_epochs: Number of epochs to run.
        dataloader: Mapping with 'train' and 'val' iterables yielding ``(inputs, labels)``.

    Returns:
        ``(model, epoch_data)``, where ``epoch_data`` holds the epoch numbers and
        per-phase lists of loss and accuracy as Python floats.
    """
    import tempfile  # local import: only needed for the scratch checkpoint directory

    since = time.time()

    # Follow the model's own device instead of depending on a notebook-level global.
    device = next(model.parameters()).device

    epoch_data = {
        'epoch': [],
        'train': {'loss': [], 'acc': []},
        'val': {'loss': [], 'acc': []}
    }
    best_acc = 0.0

    # The context manager removes the scratch checkpoint even if training raises.
    with tempfile.TemporaryDirectory() as tempdir:
        best_model_params_path = os.path.join(tempdir, 'best_model_params.pt')
        torch.save(model.state_dict(), best_model_params_path)

        for epoch in range(num_of_epochs):
            print(f'Epoch {epoch+1}/{num_of_epochs}')
            print('-' * 10)
            epoch_data['epoch'].append(epoch+1)

            for phase in ['train', 'val']:
                is_train = phase == 'train'
                model.train(is_train)  # train(False) is equivalent to eval()

                running_loss = 0.0
                running_corrects = 0
                samples_seen = 0

                for inputs, labels in tqdm(dataloader[phase], leave=False):
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    if is_train:
                        optimizer.zero_grad()

                    with torch.set_grad_enabled(is_train):
                        outputs = model(inputs)
                        preds = outputs.argmax(dim=1)
                        loss = criterion(outputs, labels)

                        if is_train:
                            loss.backward()
                            optimizer.step()

                    batch_size = inputs.size(0)
                    # Weight by batch size so a smaller final batch is averaged correctly.
                    running_loss += loss.item() * batch_size
                    running_corrects += (preds == labels).sum().item()
                    samples_seen += batch_size

                if is_train:
                    scheduler.step()

                # Average over the samples actually seen; guard against an empty loader.
                denom = max(samples_seen, 1)
                epoch_loss = running_loss / denom
                epoch_acc = running_corrects / denom
                epoch_data[phase]['loss'].append(epoch_loss)
                epoch_data[phase]['acc'].append(epoch_acc)

                print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), best_model_params_path)

            print()

        time_elapsed = time.time() - since
        print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
        print(f'Best val Acc: {best_acc:.4f}')

        # Restore the best weights before the temporary directory is deleted.
        model.load_state_dict(torch.load(best_model_params_path, map_location=device))

    return model, epoch_data
                

In [None]:
# Loss, optimiser and step-decay schedule (learning rate x0.1 every 10 scheduler steps) for the run.
criterion = nn.CrossEntropyLoss()
model = model.to(device)
optimizer_ft = optim.Adam(params=model.parameters(), lr=1e-3)
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer=optimizer_ft, step_size=10, gamma=0.1)

In [None]:
# Run the training loop; `model` is rebound to the instance returned by `train`.
model, epoch_data = train(
    model=model,
    optimizer=optimizer_ft,
    scheduler=exp_lr_scheduler,
    criterion=criterion,
    num_of_epochs=15,
)

In [None]:
# Persist the trained model object to disk.
# NOTE(review): this saves the whole module object rather than `model.state_dict()`;
# loading it back presumably requires the `custom_VGG16` class to be importable -- confirm intended.
torch.save(obj=model, f='model.pth')

## Transferring weights:

In [None]:
# Optional (disabled): copy the pretrained torchvision VGG16 conv weights into the custom model.
# NOTE(review): the index-by-index pairing below only lines up if `model.layers` has the same
# conv/activation/pool ordering as `vggmodel.features` -- verify before enabling.
# vggmodel = torchvision.models.vgg16(weights = True)
# conv_wts = vggmodel.features.state_dict()

# for i in range(len(model.layers)):
#   if type(model.layers[i]) == nn.Conv2d and type(vggmodel.features[i]) == nn.Conv2d:
#     with torch.no_grad():
#       model.layers[i].weight = vggmodel.features[i].weight
#       model.layers[i].bias = vggmodel.features[i].bias


# print('Weights Copied')

In [None]:
from io import BytesIO

from PIL import Image
import requests

# Download a sample aircraft photo to try the trained model on.
IMAGE_URL = 'https://images.pexels.com/photos/46148/aircraft-jet-landing-cloud-46148.jpeg'

# A timeout stops the cell from hanging; raise_for_status surfaces HTTP errors instead of
# handing an error page to PIL.
response = requests.get(IMAGE_URL, timeout=30)
response.raise_for_status()

# Decode from the fully downloaded (and content-decoded) bytes and force 3 channels,
# which is what the 3-channel normalisation in `data_transforms` expects.
img = Image.open(BytesIO(response.content)).convert('RGB')
img

In [None]:
# Preprocess into a new name so `img` stays the PIL image and this cell can be re-run.
# unsqueeze(0) adds the batch dimension the network expects.
img_tensor = data_transforms(img.convert('RGB')).unsqueeze(0).to(device)

# Inference: evaluation mode, no gradient tracking.
model.eval()
with torch.no_grad():
    logits = model(img_tensor)

predicted_label = class_names[logits.argmax(dim=1).item()]
predicted_label


Thereby, VGG16 was implemented from scratch and trained on the CIFAR-10 dataset.