I have mounted the google drive and mentioned the path where the dataset is present

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%cd /content/drive/MyDrive/Assignment/

I have installed Augmentor so that I can fetch augmentation modules

In [None]:
!pip install Augmentor

I have installed pytorch-metric-learning for required modules

In [None]:
!pip install pytorch-metric-learning

I have loaded the dataset and implemented augmentation to increase the dataset size

In [5]:
import os
import pandas as pd
import numpy as np
import torch
import Augmentor
torch.manual_seed(1)

from PIL import Image
from torch.utils.data import Dataset


class SkinLessionDataset(Dataset):
    def __init__(self, csv_path, image_dir, transform = None):
        df = pd.read_csv(csv_path)
        self.image_dir = image_dir
        self.image_names = df["image_train"]
        self.image_labels = df["label_train"]
        self.transform = transform

    def __getitem__(self, index):
        image = Image.open(os.path.join(self.image_dir, self.image_names[index]))

        p = Augmentor.DataPipeline([[np.array(image)]])
        p.flip_random(probability=0.7)
        p.rotate(probability=0.7, max_left_rotation=25, max_right_rotation=25)
        p.rotate_without_crop(probability=0.7, max_left_rotation=10, max_right_rotation=10, expand=True)
        p.zoom(probability=0.7, min_factor=1.1, max_factor=1.7)
        p.random_distortion(probability=0.7, grid_width=10, grid_height=10, magnitude=20)
        p.scale(probability=0.6, scale_factor=1.3)
        p.random_brightness(probability=0.5, min_factor=0.35, max_factor=0.65)
        p.crop_random(probability=0.7, percentage_area=0.5)


        p.resize(probability=1.0, width=224, height=224)


        image_aug = p.sample(1)

        image = Image.fromarray(image_aug[0][0])
        # print(image)

        if not self.transform is None:
            image = self.transform(image)

        return image, self.image_labels[index]

    def __len__(self):
        return self.image_labels.shape[0]

Defined class for contrastive loss and performed normalization and transformation

In [6]:
import torch.nn as nn
import torch.nn.functional as F
from pytorch_metric_learning import losses

class SupervisedContrastiveLoss(nn.Module):
  def __init__(self, temperature=0.1):
    super(SupervisedContrastiveLoss, self).__init__()
    self.temperature = temperature

  def forward(self, feature_vectors, labels):
    # Normalize feature vectors
    feature_vectors_normalized = F.normalize(feature_vectors, p=2, dim=1)
    # Compute logits
    logits = torch.div(
        torch.matmul(
            feature_vectors_normalized, torch.transpose(feature_vectors_normalized, 0, 1)
        ),
        self.temperature,
    )
    return losses.NTXentLoss(temperature=0.07)(logits, torch.squeeze(labels))

Loaded the train and test dataset

In [7]:
from torchvision import transforms
from torch.utils.data import DataLoader

bs = 16

skin_lession_transform_train = transforms.Compose([
                                                # transforms.Resize(224),
                                                transforms.ToTensor(),
                                                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
                                          ])


skin_lession_transform_test = transforms.Compose([
                                                # transforms.Resize(224),
                                                transforms.ToTensor(),
                                                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
                                          ])

train_dataset = SkinLessionDataset(
                                    csv_path = "/content/drive/MyDrive/Assignment/train_final.csv",
                                    image_dir = "/content/drive/MyDrive/Assignment/train_images/",
                                    transform = skin_lession_transform_train
                                  )

train_loader = DataLoader(
                            dataset = train_dataset,
                            batch_size = bs,
                            drop_last = True,
                            shuffle = True,
                            num_workers = 2
                        )

test_dataset = SkinLessionDataset(
                                    csv_path = "/content/drive/MyDrive/Assignment/test.csv",
                                    image_dir = "/content/drive/MyDrive/Assignment/test_images/",
                                    transform = skin_lession_transform_test
                                )
test_loader = DataLoader(
                            dataset = test_dataset,
                            batch_size = bs,
                            drop_last = True,
                            shuffle = False,
                            num_workers = 2
                        )

GPU allocation

In [None]:
from __future__ import print_function
from __future__ import division

import time
import os
import copy
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from pathlib import Path

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

import torchvision
from torchvision import datasets, models, transforms

# from pytorch_metric_learning import losses
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

Defined the operations to be performed on the dataset

In [None]:
!pip install -U scikit-learn scipy matplotlib
!pip install sklearn


from sklearn.linear_model import LogisticRegression
from sklearn import metrics
import matplotlib.pyplot as plt
import seaborn as sns



def trainer(model, dataloaders, criterion, optimizer, scheduler, eval = True, num_epochs=10):
  since = time.time()

  val_acc_history = []
  train_losses = []
  train_accuracy = []
  test_losses = []
  test_accuracy = []

  best_model_wts = None

  best_acc = float('-inf')

  writer = SummaryWriter()

  for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch, num_epochs - 1))
    running_loss = 0
    correct = 0
    total = 0
    running_corrects = 0

    # iterate over the dataset
    train_iterator = tqdm(enumerate(dataloaders['train']),
                          'Training',
                          total=len(dataloaders['train']))
    for i, (images, labels) in train_iterator:

      # images and labels to the device
      images = images.to(device)
      labels = labels.to(device)

      # zero the optimizer gradients
      optimizer.zero_grad()

      # One forward pass
      outputs = model(images)

      # calculate loss
      if (len(criterion)==1):
        loss = criterion[0](outputs, labels)
      elif (len(criterion)==2):
        loss = criterion[0](outputs, labels) + criterion[1](outputs, labels)
      loss.backward() # one backward pass
      running_loss += loss.item()

      optimizer.step() # update the optimizer parameters

      # get the output
      _, preds = torch.max(outputs, 1)

      # statistics
      running_corrects += torch.sum(preds == labels.data)

    # train loss and accuracy
    train_loss = running_loss / len(dataloaders['train'].dataset)
    train_acc = running_corrects.double() / len(dataloaders['train'].dataset)

    print('{} Loss: {:.4f} Acc: {:.4f}'.format('train', train_loss, train_acc))

    writer.add_scalar('Loss/train', train_loss,epoch)
    writer.add_scalar('Accuracy/train', train_acc, epoch)

    if eval:
      # set the model evaluation mode
      model.eval()

      with torch.no_grad():
        test_correct = 0
        running_loss = 0

        test_iterator = tqdm(enumerate(dataloaders['val']),
                             'Validation',
                             total=len(dataloaders['val']))
        for j, (images,labels) in test_iterator:
          images = images.to(device)
          labels = labels.to(device)

          test_outputs = model(images)

          if (len(criterion)==1):
            loss = criterion[0](outputs, labels)
          elif (len(criterion)==2):
            loss = criterion[0](outputs, labels) + criterion[1](outputs, labels)

          running_loss += loss.item()
          _, test_predicted = torch.max(test_outputs, 1)

          test_correct += torch.sum(test_predicted == labels.data)
          pred=torch.tensor(test_predicted).detach().cpu().numpy()
          labeltest=torch.tensor(labels.data).detach().cpu().numpy()
          cm=metrics.confusion_matrix(pred,labeltest)



      test_loss = running_loss / len(dataloaders['val'].dataset)
      test_acc = test_correct.double() / len(dataloaders['val'].dataset)
      plt.figure(figsize=(9,9))
      sns.heatmap(cm,annot=True,fmt='0.3f',linewidth=0.5,square=True,cbar=False)
      #plt.ylabel('actual values')
      #plt.xlabel('predicted values')
      #plt.show()
      print(metrics.classification_report(pred,labeltest))
      print('{} Loss: {:.4f} Acc: {:.4f}'.format('val', test_loss, test_acc))

      writer.add_scalar('Loss/test', test_loss, epoch)
      writer.add_scalar('Accuracy/test',test_acc, epoch)

      # deep copy the model
      if train_acc > best_acc:
        best_acc = train_acc
        best_model_wts = copy.deepcopy(model.state_dict())

    val_acc_history.append(test_acc)
    train_accuracy.append(train_acc)
    train_losses.append(train_loss)
    test_accuracy.append(test_acc)
    test_losses.append(test_loss)


    # scheduler.step()
  time_elapsed = time.time() - since
  print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
  print('Best val Acc: {:4f}'.format(best_acc))
  #print(metrics.classification_report(pred,labeltest))

  Path('model_store/').mkdir(parents=True, exist_ok=True)
  if (len(criterion)==1):
    torch.save(best_model_wts, 'model_store/'+'resnet50-best-model-parameters_without_scl.pth')
  elif (len(criterion)==2):
    torch.save(best_model_wts, 'model_store/'+'resnet50-best-model-parameters_with_scl.pth')
  return model, train_accuracy, train_losses, test_accuracy, test_losses

In [10]:
def set_parameter_requires_grad(model, feature_extracting):
  if feature_extracting:
    for param in model.parameters():
      param.requires_grad = False

Mentioned number of classes, number of epochs and feature extract

In [11]:
# Number of classes in the dataset
num_classes = 7



# Number of epochs to train for
num_epochs = 10


#   Feature extract is True for updating the reshaped layer params
feature_extract = True

Allocating the Resnet50 model and mentioned the image size

In [None]:
model_ft = models.resnet50(pretrained=True)
set_parameter_requires_grad(model_ft, feature_extract)
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, num_classes)
input_size = 224

model_ft = model_ft.to(device)

Defined different types of optimizers

In [None]:
import torch.optim as optim


params_to_update = model_ft.parameters()
print("Params to learn:")
if feature_extract:
    params_to_update = []
    for name,param in model_ft.named_parameters():
        if param.requires_grad == True:
            params_to_update.append(param)
            print("\t",name)
else:
    for name,param in model_ft.named_parameters():
        if param.requires_grad == True:
            print("\t",name)

#optimizer_ft = optim.SGD(params_to_update, lr=0.004, momentum=0.9)
optimizer_ft = optim.Adam(params_to_update, lr=0.001)
#optimizer_ft = optim.RMSprop(params_to_update, lr=0.004)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer_ft)

Running the model and evaluating the loss and accuracy

In [None]:
# Setup the loss fxn
criterion = [nn.CrossEntropyLoss()]

# Train and evaluate
model, train_accuracy, train_losses, test_accuracy, test_losses = trainer(
                          model_ft,
                          {"train" : train_loader, "val" : test_loader},
                          criterion,
                          optimizer_ft, scheduler, num_epochs=num_epochs
                            )

Evaluating the loss and accuracy adding Contrastive loss

In [None]:
# Setup the loss fxn
criterion = [nn.CrossEntropyLoss(), SupervisedContrastiveLoss()]

# Train and evaluate
model, train_accuracy, train_losses, test_accuracy, test_losses = trainer(
                          model_ft,
                          {"train" : train_loader, "val" : test_loader},
                          criterion,
                          optimizer_ft, scheduler, num_epochs=num_epochs
                            )

Plotting the graphs

In [None]:
plt.plot(train_losses, label='training loss')
plt.plot(test_losses, label='validation loss')
plt.title('Loss at the end of each epoch')
plt.legend();

Converting tensor list to numpy array

In [None]:
test_accuracy

In [17]:
t=torch.tensor(test_accuracy).detach().cpu().numpy()

In [18]:
d=torch.tensor(train_accuracy).detach().cpu().numpy()

In [None]:
plt.plot(d, label='training accuracy')
plt.plot(t, label='validation accuracy')
plt.title('Accuracy at the end of each epoch')
plt.legend();