Víctor Cereijo Herranz

César Caramazana Zarzosa

13/05/2023

In [None]:
%matplotlib inline

## Computer Vision
## Master in Information Health Engineering

# LAB SESSION 6: IMAGE CLASSIFICATION WITH CNNs

# AUTOMATIC DIAGNOSTIC SYSTEM OF SKIN LESIONS FROM DERMOSCOPIC IMAGES


### Iván González Díaz


<center><img src='http://www.tsc.uc3m.es/~igonzalez/images/logo_uc3m_foot.jpg' width=400 /></center>

# Imports

In [None]:
from __future__ import print_function, division
import os
import torch
import pandas as pd
from skimage import io, transform, util
from sklearn import metrics
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils, models
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import time
import copy
from PIL import Image
import pdb
import random
import numpy.random as npr

# Seed the Python, NumPy and PyTorch random number generators with the same
# fixed value so that repeated runs start from the same random state.
for _seed_rng in (random.seed, npr.seed, torch.manual_seed):
    _seed_rng(42)
# cuDNN is switched off for the whole session.
torch.backends.cudnn.enabled = False

# Silence every warning
import warnings
warnings.filterwarnings("ignore")

plt.ion()   # interactive plotting

# Use the first CUDA device when one is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

cuda:0


In [None]:
# GOOGLE COLAB ONLY. Run this cell the first time the notebook is executed and
# comment it out afterwards.

import os
import sys
from shutil import copyfile

from google.colab import drive

drive.mount('/content/drive', force_remount=True)

# Copy the two database archives from Drive into the current working folder
for _src, _dst in (
    ('/content/drive/My Drive/MASTER/Computer Vision/Lab6/db1.zip', './db1.zip'),
    ('/content/drive/My Drive/MASTER/Computer Vision/Lab6/db2.zip', './db2.zip'),
):
    copyfile(_src, _dst)

# From here on, work inside the lab folder on Drive
path_to_folder = '/content/drive/My Drive/MASTER/Computer Vision/Lab6'
os.chdir(path_to_folder)


Mounted at /content/drive


In [None]:
#NOTE: Run this only once, on the machine where you want to run your code, and then disable it again (the code below is currently disabled by the surrounding triple quotes)
"""
import zipfile
zipPath='./db1.zip' #path of the 1st zip file
dataFolder='./data' #We extract files to the current folder
with zipfile.ZipFile(zipPath, 'r') as zip_ref:
    zip_ref.extractall(dataFolder)
    
zipPath='./db2.zip' #path of the 2nd zip file
dataFolder='./data' # We extract files to the current folder
with zipfile.ZipFile(zipPath, 'r') as zip_ref:
    zip_ref.extractall(dataFolder)
"""    

"\nimport zipfile\nzipPath='./db1.zip' #path of the 1st zip file\ndataFolder='./data' #We extract files to the current folder\nwith zipfile.ZipFile(zipPath, 'r') as zip_ref:\n    zip_ref.extractall(dataFolder)\n    \nzipPath='./db2.zip' #path of the 2nd zip file\ndataFolder='./data' # We extract files to the current folder\nwith zipfile.ZipFile(zipPath, 'r') as zip_ref:\n    zip_ref.extractall(dataFolder)\n"

#Class Dataset





In [None]:
class DermoscopyDataset(Dataset):
    """Dermoscopy dataset built from a CSV annotation file plus image and mask folders."""

    def __init__(self, csv_file, root_dir, transform=None, maxSize=0):
        """
        Args:
            csv_file (string): Path to the csv file with the annotations. It is read
                with a header row; the 'id' column is parsed as str and 'label' as int.
            root_dir (string): Root directory that contains the 'images' and 'masks' folders.
            transform (callable, optional): Optional transformation applied to every sample.
            maxSize (int): When greater than 0, keep only that many samples, chosen with a
                fixed-seed permutation so the subset is the same on every run.
        """
        self.dataset = pd.read_csv(csv_file, header=0, dtype={'id': str, 'label': int})

        if maxSize > 0:
            # Fixed seed: the reduced dataset is always the same subset
            idx = np.random.RandomState(seed=42).permutation(range(len(self.dataset)))
            reduced_dataset = self.dataset.iloc[idx[0:maxSize]]
            self.dataset = reduced_dataset.reset_index(drop=True)

        self.root_dir = root_dir
        self.img_dir = os.path.join(root_dir, 'images')
        self.mask_dir = os.path.join(root_dir, 'masks')
        self.transform = transform
        self.classes = ['nevus', 'melanoma', 'keratosis']

    def __len__(self):
        """Return the number of samples."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            dict with keys 'image' (read from '<id>.jpg'), 'mask' (read from '<id>.png')
            and 'label' (NumPy 64-bit integer), after applying ``self.transform`` if set.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()
        # Read the image
        img_name = os.path.join(self.img_dir, self.dataset.id[idx] + '.jpg')
        image = io.imread(img_name)
        # Read the mask
        mask_name = os.path.join(self.mask_dir, self.dataset.id[idx] + '.png')
        mask = io.imread(mask_name)

        # np.int64 instead of the `np.long` alias, which is missing from several NumPy releases
        label = np.int64(self.dataset.label[idx])

        sample = {'image': image, 'mask': mask, 'label': label}
        if self.transform:
            sample = self.transform(sample)
        return sample

# Transformations

In [None]:
from skimage.util import random_noise

class CropByMask(object):
    """Crop the image using the lesion mask.

    Args:
        border (tuple or int): Border surrounding the mask. We dilate the mask as the skin surrounding
        the lesion is important for dermatologists.
        If it is a tuple, then it is (bordery,borderx)
    """

    def __init__(self, border):
        assert isinstance(border, (int, tuple))
        if isinstance(border, int):
            self.border = (border, border)
        else:
            self.border = border

    def __call__(self, sample):
        """Crop image and mask to the bounding box of the non-zero mask pixels plus the border.

        The box is clipped to the image limits. A sample whose mask has no
        non-zero pixel is returned uncropped.
        """
        image, mask, label = sample['image'], sample['mask'], sample['label']
        h, w = image.shape[:2]
        # Coordinates of the non-zero mask pixels
        sidx = np.nonzero(mask)
        if sidx[0].size == 0:
            # Nothing to anchor the crop on: keep the full frame
            return {'image': image, 'mask': mask, 'label': label}

        border_y, border_x = self.border
        minx = np.maximum(sidx[1].min() - border_x, 0)
        maxx = np.minimum(sidx[1].max() + 1 + border_x, w)
        miny = np.maximum(sidx[0].min() - border_y, 0)
        # The vertical border applies to both the top and the bottom edge
        maxy = np.minimum(sidx[0].max() + 1 + border_y, h)
        # Crop the image
        image = image[miny:maxy, minx:maxx, ...]
        mask = mask[miny:maxy, minx:maxx]

        return {'image': image, 'mask': mask, 'label': label}
    
class Rescale(object):
    """Resize image and mask to a predefined size.

    Args:
        output_size (tuple or int): Target size. A tuple is used directly as
            (height, width). An int is the length given to the smallest side,
            the other side being scaled to keep the original aspect ratio.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']

        height, width = image.shape[:2]
        target = self.output_size
        if not isinstance(target, int):
            out_h, out_w = target
        elif height > width:
            # Portrait: the width is the short side
            out_h, out_w = target * height / width, target
        else:
            # Landscape or square: the height is the short side
            out_h, out_w = target, target * width / height

        # Fractional sizes are truncated
        new_shape = (int(out_h), int(out_w))

        resized_image = transform.resize(image, new_shape)
        resized_mask = transform.resize(mask, new_shape)
        return {'image': resized_image, 'mask': resized_mask, 'label': label}

class RandomCrop(object):
    """Randomly crop the image (and its mask) to a fixed size.

    Args:
        output_size (tuple or int): Crop size as (height, width). If int, square crop.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        if isinstance(output_size, int):
            self.output_size = (output_size, output_size)
        else:
            assert len(output_size) == 2
            self.output_size = output_size

    def __call__(self, sample):
        """Crop image and mask at the same random offset.

        A dimension that is not larger than the requested size is left uncropped.
        """
        image, mask, label = sample['image'], sample['mask'], sample['label']

        h, w = image.shape[:2]
        new_h, new_w = self.output_size

        # np.random.randint excludes its upper bound, hence the +1: without it the
        # last valid offset (crop touching the bottom/right edge) is never drawn.
        top = np.random.randint(0, h - new_h + 1) if h > new_h else 0
        left = np.random.randint(0, w - new_w + 1) if w > new_w else 0

        image = image[top: top + new_h,
                      left: left + new_w]

        mask = mask[top: top + new_h,
                    left: left + new_w]

        return {'image': image, 'mask': mask, 'label': label}
    
class CenterCrop(object):
    """Keep the central region of the image and of its mask.

    Args:
        output_size (tuple or int): Crop size as (height, width). If int, square crop.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        if isinstance(output_size, tuple):
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = (output_size, output_size)

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']
        height, width = image.shape[:2]
        crop_h, crop_w = self.output_size

        # Half of the surplus goes before the crop; no offset when there is no surplus
        top = (height - crop_h) // 2 if height > crop_h else 0
        left = (width - crop_w) // 2 if width > crop_w else 0

        rows = slice(top, top + crop_h)
        cols = slice(left, left + crop_w)

        return {'image': image[rows, cols], 'mask': mask[rows, cols], 'label': label}


class ToTensor(object):
    """Turn the ndarrays of a sample into PyTorch tensors."""

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']

        # Move the channel axis to the front:
        #   numpy image: H x W x C
        #   torch image: C x H x W
        image_t = torch.from_numpy(image.transpose(2, 0, 1))
        # The mask gets a leading dummy dimension
        mask_t = torch.from_numpy(mask).unsqueeze(0)
        label_t = torch.tensor(label, dtype=torch.long)

        return {'image': image_t, 'mask': mask_t, 'label': label_t}
    
class Normalize(object):
    """Standardize the image channel by channel: subtract the mean, divide by the std.

    Args:
        mean: Vector with one mean per channel.
        std: Vector with one standard deviation per channel.
    """

    def __init__(self, mean, std):
        assert len(mean)==len(std),'Length of mean and std vectors is not the same'
        self.mean, self.std = np.array(mean), np.array(std)

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']
        n_channels, _, _ = image.shape
        assert n_channels==len(self.mean), 'Length of mean and image is not the same'

        # Build the statistics with the dtype and device of the image
        tensor_kwargs = {'dtype': image.dtype, 'device': image.device}
        channel_mean = torch.as_tensor(self.mean, **tensor_kwargs).view(-1, 1, 1)
        channel_std = torch.as_tensor(self.std, **tensor_kwargs).view(-1, 1, 1)

        # In-place: the tensor held by the incoming sample is modified
        image.sub_(channel_mean)
        image.div_(channel_std)

        return {'image': image, 'mask': mask, 'label': label}

class TVCenterCrop(object):
    """Center crop done with the torchvision transform.

    Serves as an example of wrapping a torchvision method: every array is
    converted to an 8-bit PIL Image, cropped, and converted back to a float array.

    Args:
        size (tuple or int): Crop size handed to ``transforms.CenterCrop``.
    """

    def __init__(self, size):
        self.CC = transforms.CenterCrop(size)

    def _crop(self, array):
        """Round-trip one array through PIL and return the cropped float array."""
        as_pil = Image.fromarray(util.img_as_ubyte(array))
        cropped = self.CC(as_pil)
        return util.img_as_float(np.asarray(cropped))

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']
        image = self._crop(image)
        mask = self._crop(mask)
        return {'image': image, 'mask': mask, 'label': label}

### Put your code here....

class HorizontalFlip(object):
    """Randomly flip image and mask horizontally, both or neither.

    Args:
        p (float): Probability of flipping.
    """

    def __init__(self, p=0.5):
        self.p = p
        # Kept so that existing code reading `.hf` keeps working
        self.hf = transforms.RandomHorizontalFlip(p=p)

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']

        pil_image = Image.fromarray(util.img_as_ubyte(image))
        pil_mask = Image.fromarray(util.img_as_ubyte(mask))

        # Draw the decision ONCE and apply it to both. Seeding once and then calling
        # the random transform twice gives two independent draws, so the image could
        # be flipped while the mask was not.
        if torch.rand(1).item() < self.p:
            pil_image = transforms.functional.hflip(pil_image)
            pil_mask = transforms.functional.hflip(pil_mask)

        image = util.img_as_float(np.asarray(pil_image))
        mask = util.img_as_float(np.asarray(pil_mask))

        return {'image': image, 'mask': mask, 'label': label}


class VerticalFlip(object):
    """Randomly flip image and mask vertically, both or neither.

    Args:
        p (float): Probability of flipping.
    """

    def __init__(self, p=0.5):
        self.p = p
        # Kept so that existing code reading `.vf` keeps working
        self.vf = transforms.RandomVerticalFlip(p=p)

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']

        pil_image = Image.fromarray(util.img_as_ubyte(image))
        pil_mask = Image.fromarray(util.img_as_ubyte(mask))

        # Draw the decision ONCE and apply it to both. Seeding once and then calling
        # the random transform twice gives two independent draws, so the image could
        # be flipped while the mask was not.
        if torch.rand(1).item() < self.p:
            pil_image = transforms.functional.vflip(pil_image)
            pil_mask = transforms.functional.vflip(pil_mask)

        image = util.img_as_float(np.asarray(pil_image))
        mask = util.img_as_float(np.asarray(pil_mask))

        return {'image': image, 'mask': mask, 'label': label}


class Jitter(object):
    """Apply torchvision's ``ColorJitter`` to the image; mask and label pass through.

    Args:
        brightness, contrast, saturation, hue: Forwarded unchanged to
            ``transforms.ColorJitter``.
    """

    def __init__(self, brightness = 0.03, contrast=0.05, saturation=0.03, hue=0.03):
        self.jitter = transforms.ColorJitter(
            brightness=brightness,
            contrast=contrast,
            saturation=saturation,
            hue=hue,
        )

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']

        # ColorJitter is applied to an 8-bit PIL image; the result goes back to float
        jittered = self.jitter(Image.fromarray(util.img_as_ubyte(image)))
        image = util.img_as_float(np.asarray(jittered))

        return {'image': image, 'mask': mask, 'label': label}

class Rotation(object):
    """Rotate image and mask by the same random angle.

    Args:
        degrees: Range of angles, forwarded to ``transforms.RandomRotation``.
    """

    def __init__(self, degrees=30):
        self.rotate = transforms.RandomRotation(degrees)

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']

        # Sample the angle ONCE and reuse it. Calling the random transform twice after
        # a single seed draws two different angles, which misaligns image and mask.
        angle = transforms.RandomRotation.get_params(self.rotate.degrees)

        pil_image = Image.fromarray(util.img_as_ubyte(image))
        pil_image = transforms.functional.rotate(pil_image, angle)
        image = util.img_as_float(np.asarray(pil_image))

        pil_mask = Image.fromarray(util.img_as_ubyte(mask))
        pil_mask = transforms.functional.rotate(pil_mask, angle)
        mask = util.img_as_float(np.asarray(pil_mask))

        return {'image': image, 'mask': mask, 'label': label}


class GaussianBlur(object):
    """Blur the image with torchvision's ``GaussianBlur``; mask and label pass through.

    Args:
        kernel: Kernel size forwarded to ``transforms.GaussianBlur``.
        sigma: Upper end of the sigma range; the range passed on is (0.1, sigma).
    """

    def __init__(self, kernel=5, sigma=1):
        self.blur = transforms.GaussianBlur(kernel_size=kernel, sigma=(0.1, sigma))

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']

        # Only the image is transformed, so there is nothing to keep in sync with the
        # mask and no reason to reseed the global random number generators here.
        pil_image = Image.fromarray(util.img_as_ubyte(image))
        pil_image = self.blur(pil_image)
        image = util.img_as_float(np.asarray(pil_image))

        return {'image': image, 'mask': mask, 'label': label}

class GaussianNoise(object):
    """Add zero-mean Gaussian noise to the image; mask and label pass through.

    Args:
        sigma: Standard deviation of the noise.
    """

    def __init__(self, sigma=1):
        self.sigma = sigma
        self.mean = 0

    def __call__(self, sample):
        image, mask, label = sample['image'], sample['mask'], sample['label']

        # The noise comes from NumPy's generator. The `random` and `torch` generators
        # play no part in it, so they are left alone instead of being reseeded.
        gauss = np.random.normal(self.mean, self.sigma, image.shape)
        image = image + gauss

        return {'image': image, 'mask': mask, 'label': label}

    
        

# The dataset

## Data augmentation

In [None]:
# Per-channel pixel means and stds expected by the torchvision models
pixel_mean = [0.485, 0.456, 0.406]
pixel_std = [0.229, 0.224, 0.225]

rescale = 256   # size handed to Rescale
crop = 224      # size handed to RandomCrop / CenterCrop

# Training pipeline: lesion crop, rescale, random crop and random flips
# (Rotation and Jitter are available but currently switched off)
train_transform = transforms.Compose([
    CropByMask(15),
    Rescale(rescale),
    #Rotation(90),
    RandomCrop(crop),
    #Jitter(brightness=0.05, contrast=0.1, hue=0, saturation=0),
    HorizontalFlip(p=0.5),
    VerticalFlip(p=0.5),
    ToTensor(),
    Normalize(mean=pixel_mean, std=pixel_std),
])

# Validation pipeline: lesion crop, rescale and center crop, no random steps
val_transform = transforms.Compose([
    CropByMask(15),
    Rescale(rescale),
    CenterCrop(crop),
    ToTensor(),
    Normalize(mean=pixel_mean, std=pixel_std),
])

# Test pipeline: same steps as validation
test_transform = transforms.Compose([
    CropByMask(15),
    Rescale(rescale),
    CenterCrop(crop),
    ToTensor(),
    Normalize(mean=pixel_mean, std=pixel_std),
])

#Train Dataset
# maxSize caps the number of training samples (a small value, e.g. 500, speeds up training)
train_dataset = DermoscopyDataset(
    csv_file='data/dermoscopyDBtrain.csv',
    root_dir='data',
    maxSize=2000,
    transform=train_transform,
)

#Val dataset
val_dataset = DermoscopyDataset(
    csv_file='data/dermoscopyDBval.csv',
    root_dir='data',
    transform=val_transform,
)

#Test dataset
test_dataset = DermoscopyDataset(
    csv_file='data/dermoscopyDBtest.csv',
    root_dir='data',
    transform=test_transform,
)
    

## Dataloaders

In [None]:
# Training loader: batches of 128 samples, reshuffled, loaded by 3 workers
train_dataloader = DataLoader(
    train_dataset,
    batch_size=128,
    shuffle=True,
    num_workers=3,
)

# Validation loader: batches of 32 samples, no shuffling, 2 workers
val_dataloader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=2,
)

# Test loader: batches of 32 samples, no shuffling, 3 workers
test_dataloader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=3,
)


In [None]:
# Per-phase lookup tables used by the training loop
image_datasets = dict(train=train_dataset, val=val_dataset)

dataloaders = dict(train=train_dataloader, val=val_dataloader)

dataset_sizes = {phase: len(phase_dataset) for phase, phase_dataset in image_datasets.items()}
class_names = train_dataset.classes


# Auxiliary functions

## Compute AUCs

In [None]:
def computeAUCs(scores,labels):
    """Compute two one-vs-all AUCs: melanoma vs all and keratosis vs all.

    Args:
        scores: Array of shape (n, 3) with the class scores of the n samples.
        labels: Array of n class indices.

    Returns:
        Array with two elements: [AUC melanoma vs all, AUC keratosis vs all].
    """
    aucs = np.zeros((2,))
    # Class index 1 is scored as melanoma and class index 2 as keratosis
    for position, class_idx in enumerate((1, 2)):
        # Builtin int replaces the `np.int` alias, which NumPy has removed
        binary_labels = (labels == class_idx).astype(int)
        aucs[position] = metrics.roc_auc_score(binary_labels, scores[:, class_idx])

    return aucs

## Train model

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25, output_dir='./Checkpoints/', model_name='custom'):
    """Train ``model`` and return it loaded with the weights of its best validation epoch.

    Every epoch runs a 'train' phase and then a 'val' phase. The best epoch is
    the one with the highest average of the two validation AUCs. A checkpoint
    with the model and optimizer state is written once per epoch.

    Relies on the module-level ``image_datasets``, ``dataloaders``,
    ``dataset_sizes``, ``device`` and ``computeAUCs``.

    Args:
        model: Network to train.
        criterion: Loss function, called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        scheduler: Learning rate scheduler, stepped once per epoch after training.
        num_epochs: Number of epochs.
        output_dir: Folder for the checkpoints; created when it does not exist.
        model_name: Checkpoint file name prefix; files are '<model_name><epoch>.pt'.

    Returns:
        ``model`` with the best validation weights loaded.
    """
    since = time.time()

    numClasses = len(image_datasets['train'].classes)

    # torch.save does not create missing folders
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    best_model_wts = copy.deepcopy(model.state_dict())
    best_aucs = np.zeros((2,)) #AUCs melanoma vs all, and keratosis vs all
    best_auc = 0
    best_epoch = -1

    #Loop of epochs (each iteration involves train and val datasets)
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Every epoch has a training and a validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set the model in training mode
            else:
                model.eval()   # Set the model in evaluation mode

            #Dataset size
            numSamples = dataset_sizes[phase]

            # Buffers for the outputs and labels of the whole phase.
            # Explicit 64-bit dtypes replace the removed `np.float` / `np.int` aliases.
            outputs_m = np.zeros((numSamples, numClasses), dtype=np.float64)
            labels_m = np.zeros((numSamples,), dtype=np.int64)
            running_loss = 0.0

            contSamples = 0

            # Iterate (loop of batches)
            for sample in dataloaders[phase]:
                inputs = sample['image'].to(device).float()
                labels = sample['label'].to(device)

                #Batch Size
                batchSize = labels.shape[0]

                # Set grads to zero
                optimizer.zero_grad()

                # Forward; operations are recorded for autograd only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    # backward & parameters update only in train
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Accumulate the running loss, weighted by the batch size
                running_loss += loss.item() * inputs.size(0)

                # Turn the outputs into probabilities, detached from the graph
                probs = F.softmax(outputs.detach(), dim=1)
                # Store outputs and labels
                outputs_m[contSamples:contSamples + batchSize, ...] = probs.cpu().numpy()
                labels_m[contSamples:contSamples + batchSize] = labels.cpu().numpy()
                contSamples += batchSize

            #At the end of the training phase, update the lr scheduler
            if phase == 'train':
                scheduler.step()

            #Accumulated loss by epoch
            epoch_loss = running_loss / dataset_sizes[phase]

            #Compute the AUCs at the end of the epoch
            aucs = computeAUCs(outputs_m, labels_m)

            #And the Average AUC
            epoch_auc = aucs.mean()

            print('{} Loss: {:.4f} AUC mel: {:.4f} sk: {:.4f} avg: {:.4f}'.format(
                phase, epoch_loss, aucs[0], aucs[1], epoch_auc))

            # Save the checkpoint once per epoch, right after training: the
            # validation phase does not update the parameters, so a second write
            # to the same file is not needed.
            if phase == 'train':
                path = os.path.join(output_dir, str(model_name) + str(epoch) + '.pt')
                torch.save({
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'epoch': epoch,
                    }, path)

            # Deep copy of the best model
            if phase == 'val' and epoch_auc > best_auc:
                best_auc = epoch_auc
                best_aucs = aucs.copy()
                best_model_wts = copy.deepcopy(model.state_dict())
                best_epoch = epoch
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best model in epoch {:d} val AUCs: mel {:4f} sk {:4f} avg {:4f}'.format(best_epoch,best_aucs[0],best_aucs[1],best_auc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

## Test model

In [None]:
def test_model(model):
    """Run ``model`` over the test set and return its softmax scores.

    Relies on the module-level ``test_dataset``, ``test_dataloader`` and ``device``.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.

    Returns:
        Array of shape (number of test samples, number of classes) with one row
        of class probabilities per sample, in the order given by the dataloader.
    """
    numClasses = len(test_dataset.classes)

    model.eval()   # Evaluation mode

    # Dataset size
    numSamples = len(test_dataset)

    # Buffer for all the outputs.
    # np.float64 replaces the `np.float` alias, which NumPy has removed.
    outputs_m = np.zeros((numSamples, numClasses), dtype=np.float64)
    contSamples = 0

    # No gradients are needed for inference
    with torch.no_grad():
        for sample in test_dataloader:
            inputs = sample['image'].to(device).float()

            # Batch size
            batchSize = inputs.shape[0]

            # Forward pass followed by a softmax
            probs = F.softmax(model(inputs), dim=1)
            outputs_m[contSamples:contSamples + batchSize, ...] = probs.cpu().numpy()
            contSamples += batchSize

    return outputs_m

# PART 1) CUSTOM NETWORK



## Model architecture

In [None]:
class CustomNet(nn.Module):
    """Small CNN: four conv + batch-norm stages and a two-layer classifier.

    Each stage is conv 3x3 -> batch norm -> ReLU -> 2x2 max pooling. The first
    fully connected layer expects 256 * 14 * 14 flattened features and the last
    one produces 3 outputs.
    """

    def __init__(self):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, stride=1, padding=1)
        self.conv1 = nn.Conv2d(3, 64, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(64, 128, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(128)
        self.conv3 = nn.Conv2d(128, 212, **conv_kwargs)
        self.bn3 = nn.BatchNorm2d(212)
        self.conv4 = nn.Conv2d(212, 256, **conv_kwargs)
        self.bn4 = nn.BatchNorm2d(256)
        self.fc1 = nn.Linear(256 * 14*14, 512)
        self.fc2 = nn.Linear(512, 3)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        # Every stage halves the spatial resolution
        for conv, bn in stages:
            x = F.max_pool2d(F.relu(bn(conv(x))), 2)

        # Flatten everything but the batch dimension
        x = torch.flatten(x, 1)
        x = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(x)

## Training

In [None]:
def weights_init_custom(m):
    """Initialize a Conv2d or Linear module: Xavier-normal weights (gain 1.0), zero bias.

    Modules of any other type are left untouched. Meant to be passed to
    ``Module.apply``.
    """
    if not isinstance(m, (torch.nn.Conv2d, torch.nn.Linear)):
        return

    torch.nn.init.xavier_normal_(m.weight, 1.0)
    if m.bias is not None:
        nn.init.constant_(m.bias.data, 0)


customNet = CustomNet() #we initialize the network
customNet.to(device) #copy the network to the device (gpu)
load = False

if load:
    # Resume from a saved checkpoint. map_location moves the stored tensors to
    # the current device, so a checkpoint written on a GPU also loads on a
    # CPU-only machine.
    path = './Checkpoints/custom_another18.pt'
    checkpoint = torch.load(path, map_location=device)
    customNet.load_state_dict(checkpoint['model_state_dict'])
else:
    #Weight init
    customNet.apply(weights_init_custom)

criterion = nn.CrossEntropyLoss()

# Adam with weight decay (SGD with momentum is the commented alternative)
#optimizer_ft = optim.SGD(customNet.parameters(), lr=1e-3, momentum=0.9)
optimizer_ft = optim.Adam(customNet.parameters(), lr=1e-3, weight_decay=1e-3) #optimizer

# An lr strategy which decreases lr by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
# Reset every random number generator, then train the custom network for 30 epochs
for _seed_rng in (random.seed, npr.seed, torch.manual_seed):
    _seed_rng(42)

customNet = train_model(
    customNet,
    criterion,
    optimizer_ft,
    exp_lr_scheduler,
    num_epochs=30,
    output_dir='./Checkpoints/',
    model_name='custom_another',
)

## Test

In [None]:
import csv

# Scores of the custom network on the test set, one row per sample
outputs=test_model(customNet)

# newline='' lets the csv writer control the line endings itself; without it
# some platforms end up with an empty line after every row.
with open('output_test_custom.csv', mode='w', newline='') as out_file:
    csv_writer = csv.writer(out_file, delimiter=',')
    csv_writer.writerows(outputs)

# PART 2) PRETRAINED NETWORK



## Model architecture

In [None]:
def weights_init(m):
    """Initialize a Linear module: Xavier-normal weights (gain 1.0), zero bias.

    Any module that is not a Linear layer is left untouched. Meant to be passed
    to ``Module.apply``.
    """
    if not isinstance(m, torch.nn.Linear):
        return

    torch.nn.init.xavier_normal_(m.weight, 1.0)
    if m.bias is not None:
        nn.init.constant_(m.bias.data, 0)

In [None]:
# RESNET
# NOTE(review): recent torchvision releases prefer the `weights=` argument over
# `pretrained=True`; confirm against the installed version before switching.
model_ft = models.resnet101(pretrained=True)

num_classes = len(train_dataset.classes)
# Replace the classifier. The input width is read from the layer being replaced
# instead of being hard-coded, so it always matches the backbone.
in_features = model_ft.fc.in_features
model_ft.fc = nn.Sequential(
    nn.Dropout(0.3),
    nn.Linear(in_features, num_classes)
)


#Resume training
load = False
if load:
    # map_location moves the stored tensors to the current device, so a
    # checkpoint written on a GPU also loads on a CPU-only machine.
    path = './Checkpoints/final_pretrained.pt'
    checkpoint = torch.load(path, map_location=device)
    model_ft.load_state_dict(checkpoint['model_state_dict'])
else:
    #Weight init (weights_init only touches Linear modules)
    model_ft.apply(weights_init)

model_ft.to(device)

## Training

In [None]:
# Freeze the whole network ...
for param in model_ft.parameters():
    param.requires_grad = False

# ... then unfreeze layer4 and the classifier
for trainable_part in (model_ft.layer4, model_ft.fc):
    for param in trainable_part.parameters():
        param.requires_grad = True

# Trainable parameters of each part, so each can get its own learning rate
p1 = [param for param in model_ft.layer4.parameters() if param.requires_grad]
p2 = [param for param in model_ft.fc.parameters() if param.requires_grad]

# Class weights: reciprocals of 0.686, 0.187 and 0.127, normalized to sum to one
inverse_values = np.array([1/0.686, 1/0.187, 1/0.127])
class_weights = inverse_values / inverse_values.sum()
class_weights = torch.from_numpy(class_weights).float().to(device)

criterion = nn.CrossEntropyLoss(weight=class_weights)

# Adam with one parameter group per part: 1e-4 for layer4, 1e-3 for the classifier
param_groups = [
    {'params': p1, 'lr': 1e-4},
    {'params': p2, 'lr': 1e-3},
]
optimizer_ft = optim.Adam(param_groups, lr=1e-3, weight_decay=1e-2)

# Multiply the learning rates by 0.1 every 10 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=10, gamma=0.1)

In [None]:
# Reset every random number generator for reproducibility, then fine-tune for 25 epochs
for _seed_rng in (random.seed, npr.seed, torch.manual_seed):
    _seed_rng(42)

model_ft = train_model(
    model_ft,
    criterion,
    optimizer_ft,
    exp_lr_scheduler,
    num_epochs=25,
    model_name='pre_new',
)

## Test


In [None]:
import csv

# Scores of the pretrained network on the test set, one row per sample
outputs=test_model(model_ft)

# newline='' lets the csv writer control the line endings itself; without it
# some platforms end up with an empty line after every row.
with open('output_test_pretrained.csv', mode='w', newline='') as out_file:
    csv_writer = csv.writer(out_file, delimiter=',')
    csv_writer.writerows(outputs)