## Transfer learning for classification
### Data processing

In [72]:
import os
import pandas as pd

def create_csv(root='./data/', out_name='labels.csv'):
    subfolders = [f.path for f in os.scandir(root) if f.is_dir()] #get the path of the subfolders in the data root
    df = pd.DataFrame(columns=['file_path', 'label']) #create empty dataframe with file_path and label columns
    for i, path in enumerate(subfolders):
        files = [f.path for f in os.scandir(path) if f.is_file()]
        for f in files:
            df = df.append({'file_path':f, 'label':i}, ignore_index=True) #add each image as a row to the dataframe
    df.to_csv(root+out_name, index=False) #save the dataframe to a csv file

In [73]:
create_csv()

In [74]:
import numpy as np
from PIL import Image
import torch

class ClassificationDataset():
    def __init__(self, csv='./data/labels.csv', transform=None):
        self.csv = pd.read_csv(csv) #read the data csv
        self.transform = transform #save the transform variable as part of the class object

    def __len__(self):
        return len(self.csv)

    def __getitem__(self, idx):
        filepath, label = self.csv['file_path'][idx], self.csv['label'][idx] #get the image filepath and label from at that index from the csv
        img = Image.open(filepath).convert("RGB") #open with PIL and convert to rgb
        if self.transform:
            img, label = self.transform((img, label)) #apply transforms
        return img, label

class SquareCrop():
    """Adjust aspect ratio of image to make it square and crop it to given size"""
    def __init__(self, output_size):
        assert isinstance(output_size, (int)) # assert output_size is integer
        self.output_size = output_size

    def __call__(self, sample):        
        image, label = sample
        h, w = image.size
        if h>w:
            new_w = self.output_size
            scale = new_w/w
            new_h = scale*h
        elif w>h:
            new_h = self.output_size
            scale = new_h/h
            new_w = scale*w
        else:
            new_h, new_w = self.output_size, self.output_size
        new_h, new_w = int(new_h), int(new_w) # account for non-integer computed dimensions (rounds to nearest int)
        image = image.resize((new_h, new_w))
        crop_start_w=np.random.randint((new_w-self.output_size)+1)
        crop_start_h=np.random.randint((new_h-self.output_size)+1)
        image = image.crop((crop_start_h, crop_start_w, crop_start_h+self.output_size, crop_start_w+self.output_size))
        return image, label

class ImageToTensor():
    def __init__(self):
        pass
    def __call__(self, sample):
        image, label = sample
        image = np.array(image)/255 #convert to numpy array and normalise between 0-1
        image = image.transpose((2, 0, 1)) #swap channel dimension
        return torch.Tensor(image), label

In [76]:
from torchvision import transforms
from torch.utils.data import DataLoader

create_csv()

classnames = [f.name for f in os.scandir('./data/') if f.is_dir()] #get the class names from the folders
classname_to_id = dict(zip(classnames, range(len(classnames)))) #create the mapping from classname to class id
id_to_classname = dict(zip(classname_to_id.values(), classname_to_id.keys())) # create the reverse mapping from class id to classname
n_classes = len(classnames)
print(id_to_classname)

img_crop_size = 224
train_split = 0.8 # percentage that will be training set
val_split = 0.1 #percentage that will be validation set
batch_size = 16

mytransforms = []
mytransforms.append(SquareCrop(img_crop_size)) #add square crop transform
mytransforms.append(ImageToTensor()) #add to tensor transform
mytransforms = transforms.Compose(mytransforms)

mydataset = ClassificationDataset(csv='./data/labels.csv', transform=mytransforms)

data_size=len(mydataset)
train_size = int(train_split * data_size)
val_size = int(val_split * data_size)
test_size = data_size - (val_size + train_size)
train_data, val_data, test_data = torch.utils.data.random_split(mydataset, [train_size, val_size, test_size])

train_samples = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_samples = DataLoader(val_data, batch_size=batch_size)
test_samples = DataLoader(test_data, batch_size=batch_size)

{0: 'hotdog', 1: 'dog'}


### Loading in pretrained model

In [77]:
from torchvision import models
import torch.nn.functional as F

class VGGClassifier(torch.nn.Module):
    def __init__(self, out_size):
        super().__init__()
        self.features = models.vgg11(pretrained=True).features #get the convolutional layers of vgg11. output size is 512x7x7
        self.regressor = torch.nn.Sequential(
            torch.nn.Linear(512*7*7, 4096),
            torch.nn.ReLU(),
            torch.nn.Dropout(),
            torch.nn.Linear(4096, 1024),
            torch.nn.ReLU(),
            torch.nn.Linear(1024, out_size),
            torch.nn.Softmax(dim=1)
            )

    def forward(self, x):
        x = F.relu(self.features(x)).reshape(-1, 512*7*7)
        x = self.regressor(x)
        return x

    def freeze(self):
        for param in self.features.parameters():
            param.requires_grad=False

    def unfreeze(self):
        for param in self.features.parameters():
            param.requires_grad=True

### Training

In [78]:
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

lr = [3e-4, 3e-5] #differential learning rate. #lr[0] is main lr #lr[1] is lr of early layers
weight_decay = 0#1e-4
train_split = 0.8
val_split = 0.9

mymodel = VGGClassifier(out_size=n_classes).to(device)

optimizer = torch.optim.Adam([{'params': mymodel.regressor.parameters()},
                              {'params': mymodel.features.parameters(), 'lr': lr[1]}],
                              lr=lr[0], weight_decay=weight_decay)

In [81]:
import matplotlib.pyplot as plt

def train(epochs):
    plt.close()
    mymodel.train()
    
    bcosts = []
    ecosts = []
    valcosts = []
    plt.ion()
    fig = plt.figure(figsize=(10, 5))
    ax = fig.add_subplot(121)
    ax2 = fig.add_subplot(122)
    
    plt.show()
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Cost')

    ax2.axis('off')
    img_label_text = ax2.text(0, -5, '', fontsize=15)
    
    for e in range(epochs):
        ecost=0
        valcost=0
        for i, (x, y) in enumerate(train_samples):
            x, y = x.to(device), y.to(device)

            h = mymodel.forward(x) #calculate hypothesis
            cost = F.cross_entropy(h, y, reduction='sum') #calculate cost
            
            optimizer.zero_grad() #zero gradients
            cost.backward() # calculate derivatives of values of filters
            optimizer.step() #update parameters

            bcosts.append(cost.item()/batch_size)
            
            y_ind=0
            im = np.array(x[y_ind]).transpose(1, 2, 0)
            predicted_class = id_to_classname[h.max(1)[1][y_ind].item()]
            ax2.imshow(im)
            img_label_text.set_text('Predicted class: '+ predicted_class)
            
            fig.canvas.draw()
            ecost+=cost.item()
        for i, (x, y) in enumerate(val_samples):
            x, y = x.to(device), y.to(device)
            h = mymodel.forward(x) #calculate hypothesis
            cost = F.cross_entropy(h, y, reduction='sum') #calculate cost
            y_ind=0
            im = np.array(x[y_ind]).transpose(1, 2, 0)
            predicted_class = id_to_classname[h.max(1)[1][y_ind].item()]
            ax2.imshow(im)
            img_label_text.set_text('Predicted class: '+ predicted_class)
            fig.canvas.draw()
            valcost+=cost.item()
        ecost/=train_size
        valcost/=val_size
        ecosts.append(ecost)
        valcosts.append(valcost)
        ax.plot(ecosts, 'b', label='Train cost')
        ax.plot(valcosts, 'r', label='Validation cost')
        if e==0: ax.legend()
        fig.canvas.draw()

        print('Epoch', e, '\tCost', ecost)

In [None]:
%matplotlib notebook
mymodel.freeze()
train(20)
#mymodel.unfreeze()
#train(5)

### Testing

In [83]:
def test():
    print('Started evaluation...')
    mymodel.eval() #put model into evaluation mode    
    #calculate the accuracy of our model over the whole test set in batches
    correct = 0
    for x, y in test_samples:
        x, y = x.to(device), y.to(device)
        h = mymodel.forward(x)
        pred = h.data.max(1)[1]
        correct += pred.eq(y).sum().item()
    return round(correct/len(test_data), 4)

In [84]:
acc = test()
print('Test accuracy: ', acc)

Started evaluation...
Test accuracy:  0.9726
