In [1]:
# link colab and drive
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [2]:
from torch.utils.data import Dataset, Sampler, DataLoader, SubsetRandomSampler
import random
import torch
from torch import nn, optim
import torchvision
from torchvision import transforms
from torchvision.utils import save_image
from PIL import Image
import numpy as np
import torch.nn.functional as F
import copy
import matplotlib
from matplotlib.pyplot import *
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.utils import resample
#torch.manual_seed(11)

##Functions

###Utils

In [3]:
class customDataset(Dataset):
  def __init__(self, rootPath, resize_transformation, data_augmentation_transformation=None, augment=True):
    ''' 
      @rootPath: path of the folder containing class subfolders
      @transformation: transformation to be applied to each image
      @data_augmentation: transformations to be applied to train images
      @augment: boolean indicating if data augmentation should be performed
    '''
    self.data = torchvision.datasets.ImageFolder(rootPath)
    self.transformation = resize_transformation
    self.data_augmentation = data_augmentation_transformation
    self.augment = True
    
  def __getitem__(self, key):
    
    true_class = self.data[key][1]
    im = Image.open(self.data.imgs[key][0])
    if self.augment and self.data_augmentation!=None:
      img_tensor = self.data_augmentation(im)
    else:
      img_tensor = self.transformation(im)

    return img_tensor, true_class

  def __len__(self):
    return len(self.data)

  def set_augment(self, value):
    self.augment = value

  def change_transformation(self, new_transformation):
    self.transformation = new_transformation

  def change_augmentation(self, new_augmentation):
    self.data_augmentation = new_augmentation

  def get_keys(self):
    return range(len(self.data))

In [4]:
def split(dataset, val_size):
    '''
    @ dataset: a customDataset object
    @ val_size: % of validation data
    '''
    index = list(dataset.get_keys())
    val_per_class = int(val_size*100)
    validation_index = []
    for i in range(15):
        idx = random.sample(range(100*i,100*(i+1)),val_per_class)
        validation_index= validation_index + idx
    train_index = list(set(index)-set(validation_index))
    bootstrap_index = resample(train_index, replace = True)
  
    train_sampler = SubsetRandomSampler(bootstrap_index)
    validation_sampler = SubsetRandomSampler(validation_index)

    return train_sampler, validation_sampler

In [5]:
def loaders(dataset, val_size, batch_size, num_workers):
  ''' 
    @dataset: a customDataset object
    @val_size: % validation data
    @batch_size: the number of examples in each batch
    @num_workers: number of subprocesses to use in the data loader
  '''

  train_sampler, validation_sampler = split(dataset, val_size)
  train_loader = DataLoader(dataset,
                            batch_size = batch_size,
                            sampler = train_sampler,
                            num_workers = num_workers)
  val_loader = DataLoader(dataset,
                          #batch_size = batch_size,
                          batch_size = int(val_size*len(dataset)),
                          sampler = validation_sampler,
                          num_workers = num_workers)
  return train_loader, val_loader

###CNN

In [6]:
def training_with_scheduler(net, trainLoader,valLoader,lr, optimizer, scheduler, criterion, val_patience, validate_each):

  # here I save loss and accuracy
  train_loss = []
  val_loss = []
  train_accuracy = []
  val_accuracy = []

  # early stopping
  best_net = copy.deepcopy(net.state_dict())
  best_loss = 100.0
  best_acc = 0
  worsening_count = 0
  
  #net.reset
  net.train()
  n_batches = len(trainLoader)

  for e in range(epochs):
      correct_classified = 0
      for i, data in enumerate(trainLoader):
    
        batch = data[0].to(device)
        batch = batch.float()
        labels = data[1].to(device)    

        optimizer.zero_grad() 
        outputs = net(batch)
        
        loss = criterion(outputs, labels) 
        train_loss.append(loss.item())
        predicted_class = torch.argmax(outputs, dim=1)
        correct_classified = correct_classified + sum((predicted_class==labels).int())
        acc = sum((predicted_class==labels).int())/batch.shape[0]
        train_accuracy.append(acc)

        loss.backward()
        optimizer.step()
        
        # parameter to decide how often to validate
        if i % validate_each ==0:
          with torch.no_grad():
            trainingSet.set_augment(False)
            valLoss, valAcc = validate(valLoader,net,criterion)
            # save validation loss and accuracy
            val_loss.append(valLoss)
            val_accuracy.append(valAcc)
            trainingSet.set_augment(True)
            
          # if validation loss increase (at least +1%), increase the counter
          if valLoss>best_loss:
              worsening_count = worsening_count+1
              # if I exceed the patience, early stop
              if worsening_count > val_patience:
                return [best_net, train_loss, val_loss, train_accuracy, val_accuracy]
          # else reset the counter and use actual validation loss as reference, save the net
          else:
             worsening_count = 0
             best_loss = valLoss
             best_acc = valAcc
             best_net = copy.deepcopy(net.state_dict())

          print("[LR]: {:.4f}\n".format(scheduler.get_last_lr()[0]))
          print("[EPOCH]: {}, [BATCH]: {}/{}, [LOSS]: t {}, v {},\t [ACC.]: t {},\t v {}".format(e, i, n_batches, loss.item(), valLoss, acc, valAcc))
      scheduler.step()
  return [best_net, train_loss, val_loss, train_accuracy, val_accuracy]

In [7]:
# validation 
def validate(valLoader, net, criterion):
  correct_count=0
  size = 0
  for i, data in enumerate(valLoader):
    batch = data[0].to(device)
    batch = batch.float()
    labels = data[1].to(device)    

    outputs = net(batch)
    loss = criterion(outputs, labels) 
    predicted_class = torch.argmax(outputs, dim=1)
    correct_count = correct_count + sum((predicted_class==labels).int())
    size = size + batch.shape[0]
  acc = correct_count/size
  return [loss.item(), acc]

In [8]:
def plot_results(train_data, validation_data, color='C0', isAccuracy = False):
    fig = figure(figsize=(20,10))
    xlim(0,len(train_data)+len(trainLoader))
    
    label1 = 'training_{}'.format('accuracy' if isAccuracy else 'loss')
    label2 = 'validation_{}'.format('accuracy' if isAccuracy else 'loss')
    
    for i in range(0,epochs):
        if (i%2 != 0):
            axvspan(i*len(trainLoader), i*len(trainLoader)+len(trainLoader), facecolor='silver', alpha=0.5)
    ax = subplot(1,1,1)
    l1 = ax.plot(range(0,len(train_data)), train_data, color, label=label1)

    l2 = ax.plot(range(0,len(train_loss), 5), validation_data, 'black', linestyle ='--', dashes=(5, 5),  marker='o',label=label2)
    legend(fontsize=18)
    axvline((len(validation_data)-(patience+1))*validate_each, color='red', linestyle = '--')

In [9]:
resize = transforms.Compose([
                                        transforms.Resize([64,64], interpolation=Image.BILINEAR),
                                        transforms.ToTensor()
])

def resize_transformation(img):
  i = resize(img)
  i =i*255
  return i

In [10]:
flip = transforms.Compose([
                                          transforms.Resize([64,64], interpolation=Image.BILINEAR),
                                          transforms.RandomHorizontalFlip(p=0.5),
                                          transforms.ToTensor()
])

def flip_transformation(img):
  i = flip(img)
  i =i*255
  return i

In [11]:
# test
def testing_ensemble(net, testLoader):
  net.eval()

  with torch.no_grad():
    # to save true and predicted classes
    probabilities = []
    true_class = []
    for i, data in enumerate(testLoader):
      image = data[0].to(device)
      image = image.float()
      label = data[1].to(device)
      true_class.append(label)

      output = net(image)
      sm = torch.nn.Softmax(0)
      prob = sm(output) 
      probabilities.append(output)
      
  true_class = torch.cat(true_class)
  probabilities = torch.cat(probabilities)
  return [true_class, probabilities]

##Imprt Data

In [12]:
# if available use GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print('Device: {}'.format(device))

Device: cpu


In [13]:
trainPath = '/content/drive/My Drive/CV_project/Images/train'
testPath =  '/content/drive/My Drive/CV_project/Images/test'
dataTrain = torchvision.datasets.ImageFolder(trainPath)
dataTest = torchvision.datasets.ImageFolder(testPath)

In [14]:
# datasets
batch_size = 32
num_workers = 1
trainingSet = customDataset(trainPath, resize_transformation, flip_transformation)
trainLoader, valLoader = loaders(trainingSet, 0.15, batch_size, num_workers)
testSet = customDataset(testPath, resize_transformation, augment=False)
testLoader = DataLoader(testSet, batch_size = batch_size, shuffle=False, num_workers=num_workers)

In [15]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.input_dim = 1 * 64 * 64
        self.n_classes = 15
        
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, stride=1, padding=1, padding_mode='replicate')  
        torch.nn.init.normal_(self.conv1.weight, mean=0.0, std=0.01)
        torch.nn.init.zeros_(self.conv1.bias)     

        self.BN1 = nn.BatchNorm2d(num_features=8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)

        self.maxpooling = nn.MaxPool2d(kernel_size=2,stride=2)  

        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=5, stride=1, padding=2, padding_mode='replicate')
        torch.nn.init.normal_(self.conv2.weight, mean=0.0, std=0.01)
        torch.nn.init.zeros_(self.conv2.bias)

        self.BN2 = nn.BatchNorm2d(num_features=16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)


        self.conv3 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=7, stride=1, padding=3, padding_mode='replicate')
        torch.nn.init.normal_(self.conv3.weight, mean=0.0, std=0.01)
        torch.nn.init.zeros_(self.conv3.bias)

        self.BN3 = nn.BatchNorm2d(num_features=32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)

        self.fc1 = nn.Linear(16*16*32,15) # 12*12*32 no padding (default), if padding 16*16*32
        torch.nn.init.normal_(self.fc1.weight, mean=0.0, std=0.01)
        torch.nn.init.zeros_(self.fc1.bias)

        self.dropout = nn.Dropout(0.25)


    def forward(self, x, verbose=False):
      x = self.conv1(x)
      x = self.BN1(x)
      x = F.relu(x)
      x = self.maxpooling(x) 
      x = self.conv2(x)
      x = self.BN2(x)
      x = F.relu(x)
      x = self.maxpooling(x)
      x = self.conv3(x)
      x = self.BN3(x)
      x = F.relu(x)
      x = self.dropout(x)
      x = x.view(x.size(0), -1)
      x = self.fc1(x)
      # no softmax because crossentropy loss performs it
      return x

In [16]:
# parameters
lr = 0.001
epochs = 20
criterion = nn.CrossEntropyLoss()

patience = 10
validate_each = 5

In [17]:
nets = 5
ensemble = [0]*nets

for i in range(nets):
  ensemble[i] = CNN()
  ensemble[i].to(device)
  optimizer = optim.Adam(ensemble[i].parameters(), lr = lr, weight_decay=0.2)
  lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.7)
  trained_net, train_loss, valid_loss, train_acc, valid_acc = training_with_scheduler(ensemble[i], trainLoader, valLoader, lr, optimizer, lr_scheduler, criterion, patience, validate_each)
  ensemble[i].load_state_dict(trained_net)
  print("Learned model {}".format(i))

[LR]: 0.0010

[EPOCH]: 0, [BATCH]: 0/40, [LOSS]: t 2.9200611114501953, v 3.4833221435546875,	 [ACC.]: t 0.0625,	 v 0.09333333373069763
[LR]: 0.0010

[EPOCH]: 0, [BATCH]: 5/40, [LOSS]: t 2.9943370819091797, v 2.7118396759033203,	 [ACC.]: t 0.09375,	 v 0.18666666746139526
[LR]: 0.0010

[EPOCH]: 0, [BATCH]: 10/40, [LOSS]: t 2.1466453075408936, v 2.5397026538848877,	 [ACC.]: t 0.34375,	 v 0.24888889491558075
[LR]: 0.0010

[EPOCH]: 0, [BATCH]: 15/40, [LOSS]: t 2.3250789642333984, v 2.234930992126465,	 [ACC.]: t 0.3125,	 v 0.31555554270744324
[LR]: 0.0010

[EPOCH]: 0, [BATCH]: 20/40, [LOSS]: t 2.4173572063446045, v 2.4247047901153564,	 [ACC.]: t 0.34375,	 v 0.29777777194976807
[LR]: 0.0010

[EPOCH]: 0, [BATCH]: 25/40, [LOSS]: t 1.5490407943725586, v 2.451612710952759,	 [ACC.]: t 0.5,	 v 0.2933333218097687
[LR]: 0.0010

[EPOCH]: 0, [BATCH]: 30/40, [LOSS]: t 1.6575456857681274, v 2.26464581489563,	 [ACC.]: t 0.5625,	 v 0.3466666638851166
[LR]: 0.0010

[EPOCH]: 0, [BATCH]: 35/40, [LOSS]: t 1.34

In [18]:
results= [0]*nets
true = []
for i in range(nets):
  true, results[i] = testing_ensemble(ensemble[i], testLoader)
  print('Done predictions for netwok ', i)

Done predictions for netwok  0
Done predictions for netwok  1
Done predictions for netwok  2
Done predictions for netwok  3
Done predictions for netwok  4


In [19]:
classes = torch.argmax(torch.mean(torch.stack(results), dim=0), dim=1)

In [20]:
accuracy_score(true, classes)

0.6006700167504188