In [None]:
import PIL
import time
import math
import copy
import torch
import random
import glob as gb
import torchvision
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
# import resnet50_128_redesign as model
import resnet50_128B8 as model
from PIL import Image
from tqdm import tqdm,trange
from torchsummary import summary
from torch.optim import lr_scheduler
from sklearn.metrics import confusion_matrix
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils import clip_grad_norm_, clip_grad_value_
from torchvision import transforms, utils
from sklearn.metrics import accuracy_score
from DataProcessing import process_dataloder
# Ignore warnings
import warnings
warnings.filterwarnings("ignore")
plt.ion()   # interactive mode

In [None]:
dataloaders, class_names, dataset_sizes, expA = process_dataloder()
print("After augmentation")
print(dataset_sizes)
print(class_names)
print(expA) 

In [None]:
def cyclical_lr(stepsize, min_lr, max_lr):

    # Scaler: we can adapt this if we do not want the triangular CLR
    scaler = lambda x: 1.

    # Lambda function to calculate the LR
    lr_lambda = lambda it: min_lr + (max_lr - min_lr) * relative(it, stepsize)

    # Additional function to see where on the cycle we are
    def relative(it, stepsize):
        cycle = math.floor(1 + it / (2 * stepsize))
        x = abs(it / stepsize - 2 * cycle + 1)
        return max(0, (1 - x)) * scaler(cycle)

    return lr_lambda

In [None]:
def training(phase, model, criterion, optimizer):
    running_loss = 0.0
    running_corrects = 0
    nb_classes = 4
    confusion_matrix = torch.zeros(nb_classes, nb_classes)

    # Iterate over data.        
    for inputs, labels in dataloaders[phase]:            
        inputs = inputs.to(device)            
        labels = labels.to(device)
        
        # zero the parameter gradients
        optimizer.zero_grad()
        
        # forward
        # track history if only in train
        with torch.set_grad_enabled(phase == 'train'):
            
            outputs = model(inputs)            
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)    
            
            # backward + optimize only if in training phase
            if phase == 'train':
                loss.backward()
                clip_grad_norm_(model.parameters(), 5)
                optimizer.step()  
            if phase == 'test':              
                with torch.no_grad():
                    for t, p in zip(labels.view(-1), preds.view(-1)):
                        confusion_matrix[t.long(), p.long()] += 1

        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data) 
        
    if phase == 'test':
        print(confusion_matrix)

    return running_loss, running_corrects     

In [None]:
def epoch_mode(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()
    logs = []
    Acc = {'train':[],'test':[]}
    Los = {'train':[],'test':[]}
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):        
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        
        # Each epoch has a training and validation phase
        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode
            running_loss, running_corrects = training(phase, model, criterion, optimizer)
            if phase == 'train':
                scheduler.step()
                lr_sched_test = scheduler.get_lr()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
            phase, epoch_loss, epoch_acc))
            
            # deep copy the model
            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())      
            Acc[phase].append(epoch_acc)
            Los[phase].append(epoch_loss)
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best test Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, Acc, Los

In [None]:
#download the pre-trained model
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# modelRes50 = model.resnet50_128(weights_path='./model/resnet50_128.pth')
# modelRes50.add_module("feat_extract1",nn.Conv2d(128, 64, kernel_size=[1, 1], stride=(1, 1), bias=False))
# modelRes50.add_module("feat_extract2",nn.Conv2d(64, 4, kernel_size=[1, 1], stride=(1, 1), bias=False))
# modelRes50.add_layers([modelRes50.feat_extract1, modelRes50.feat_extract2])
# modelRes50 = modelRes50.to(device)
# features_layers = 91

In [None]:
#Freeze layers before classifier
def freezing(model, features_layers):
    lay_mark = 0;
    para_list = []
    for param in modelRes50.parameters():
        if lay_mark > features_layers:
            para_list.append(param)
        if lay_mark <= features_layers:
            param.requires_grad = False
        lay_mark += 1
    return model, para_list

In [None]:
#download the pre-trained model
features_layers = 91
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
modelRes50 = model_scr.resnet50_128(weights_path='./model/resnet50_128.pth')
modelRes50.add_module("Add_pool",nn.AvgPool2d(kernel_size=[14, 14], stride=[1, 1], padding=0))
modelRes50.add_module("feat_extract0",nn.Conv2d(1024, 4, kernel_size=[1, 1], stride=(1, 1), bias=False))
modelRes50.add_layers([modelRes50.Add_pool, modelRes50.feat_extract0])
modelRes50, para_list = freezing(modelRes50, features_layers)
modelRes50 = modelRes50.to(device)

In [None]:
#final layer are being optimized 
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(para_list, lr=0.00035, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
modelRes50, Acc, Los = epoch_mode(modelRes50, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=5)