<a href="https://colab.research.google.com/github/aghazahedim/How-Deep-is-Your-Art/blob/main/How_Deep_is_Your_Art.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# This script borrows heavily from Ben Trevett's PyTorch tutorials
# https://github.com/bentrevett/pytorch-image-classification/tree/master?tab=readme-ov-file

# Importing dependencies
import copy
import os
import random
import shutil
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn import decomposition
from sklearn import manifold
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix
from torch.optim.lr_scheduler import _LRScheduler

# Constants
EPOCHS = 10
BATCH_SIZE = 8
# START_LR / END_LR / NUM_ITER are only referenced by the commented-out
# learning-rate-finder code in the process function
START_LR = 1e-7
END_LR = 10
NUM_ITER = 100
# Learning rate of the classifier head; the feature layers use FOUND_LR / 10
FOUND_LR = 5e-4
# NOTE: despite the name, this is the fraction of the Train folder that is
# KEPT for training; the remaining 1 - VALID_RATIO becomes the validation set
VALID_RATIO = 0.9
# Only used as a .format() argument for the output directory name below
TRAIN_RATIO = 0.8
ROOT = 'replace with your own parent directory'
# Pool of seeds; every run removes the one it used so no seed is repeated
min_seed = 1
max_seed = 10000
seed_list = list(range(min_seed, max_seed + 1))
# Input size and per-channel normalisation statistics fed to the transforms
# (these are the commonly used ImageNet values)
pretrained_size = 224
pretrained_means = [0.485, 0.456, 0.406]
pretrained_stds = [0.229, 0.224, 0.225]
# Integers are conv-layer output channels, 'M' is a max-pooling layer
vgg11_config = [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M']


# Global Variables
# One entry per run, appended by the process function
results = []
best_valid_loss = float('inf')

# Creating paths for (1) Dataset, (2) Train set, (3) Test set, (4) Classes
data_dir = ROOT
images_dir = os.path.join(data_dir, 'Dataset')
train_dir = os.path.join(data_dir, 'Train')
test_dir = os.path.join(data_dir, 'Test')
# Entries of the Dataset folder; fails at start-up until ROOT is a real path
classes = os.listdir(images_dir)
# Output directory template. The placeholder text contains no {j} / {ROOT}
# fields, so .format() is a no-op until the string is replaced.
os.makedirs('replace wirh your own directoy'.format(j = TRAIN_RATIO, ROOT = ROOT), exist_ok = True)



# Functions

# In-training Accuracy
def calculate_accuracy(y_pred, y):
  """Return the fraction of rows of `y_pred` whose arg-max equals the label.

  Args:
    y_pred: tensor of per-class scores; the arg-max is taken over dimension 1.
    y: tensor of target class indices with one entry per row of `y_pred`.

  Returns:
    A 0-dim float tensor: number of correct predictions / `y.shape[0]`.
  """
  predicted = y_pred.argmax(1, keepdim = True)
  # Reshape the targets like the predictions so the comparison is element-wise
  n_correct = predicted.eq(y.view_as(predicted)).sum()
  return n_correct.float() / y.size(0)

# Training Function
def train(model, iterator, optimizer, criterion, device):
  """Train `model` for one pass over `iterator`.

  Args:
    model: network whose forward pass returns a `(predictions, features)` pair.
    iterator: iterable of `(x, y)` batches; must support `len()`.
    optimizer: optimizer stepped once per batch.
    criterion: loss callable applied to `(predictions, y)`.
    device: device every batch is moved to before the forward pass.

  Returns:
    `(loss, accuracy)`, each summed over the batches and divided by
    `len(iterator)`.
  """
  model.train()

  running_loss = 0
  running_acc = 0

  for inputs, targets in iterator:
    inputs, targets = inputs.to(device), targets.to(device)

    optimizer.zero_grad()

    # The second output of the model (the flattened features) is not needed
    outputs, _ = model(inputs)
    batch_loss = criterion(outputs, targets)
    batch_acc = calculate_accuracy(outputs, targets)

    batch_loss.backward()
    optimizer.step()

    running_loss += batch_loss.item()
    running_acc += batch_acc.item()

  n_batches = len(iterator)
  return running_loss / n_batches, running_acc / n_batches

# Evaluating Function
def evaluate(model, iterator, criterion, device, seed = False):
  """Compute the mean per-batch loss and accuracy of `model` without gradients.

  Args:
    model: network whose forward pass returns a `(predictions, features)` pair.
    iterator: iterable of `(x, y)` batches; must support `len()`.
    criterion: loss callable applied to `(predictions, y)`.
    device: device every batch is moved to before the forward pass.
    seed: falsy (the default) -> return `(loss, acc)` only. `True` -> also
      return the seed torch's default generator was last seeded with
      (`torch.initial_seed()`). Any other truthy value is passed through
      unchanged as the third element.

  Returns:
    `(loss, acc)` or `(loss, acc, seed)`; loss and accuracy are summed over
    the batches and divided by `len(iterator)`.
  """
  epoch_loss = 0
  epoch_acc = 0

  model.eval()

  with torch.no_grad():

    for (x, y) in iterator:

      x = x.to(device)
      y = y.to(device)

      y_pred, _ = model(x)

      loss = criterion(y_pred, y)

      acc = calculate_accuracy(y_pred, y)

      epoch_loss += loss.item()
      epoch_acc += acc.item()

  mean_loss = epoch_loss / len(iterator)
  mean_acc = epoch_acc / len(iterator)

  if not seed:
    return mean_loss, mean_acc

  # No module-level seed variable exists, so a plain `True` flag is resolved
  # through torch itself; an explicit seed value is simply echoed back.
  run_seed = torch.initial_seed() if seed is True else seed
  return mean_loss, mean_acc, run_seed

# Calculating the Epoch Time
def epoch_time(start_time, end_time):
  """Split the interval `end_time - start_time` (seconds) into minutes and seconds.

  Returns:
    `(minutes, seconds)` as ints; both are truncated with `int()`.
  """
  duration = end_time - start_time
  whole_minutes = int(duration / 60)
  leftover_seconds = int(duration - 60 * whole_minutes)
  return whole_minutes, leftover_seconds

# Getting the model predictions
def get_predictions(model, iterator):
  """Run `model` over every batch and collect inputs, labels and probabilities.

  Args:
    model: network whose forward pass returns a `(predictions, features)` pair.
    iterator: iterable of `(x, y)` batches.

  Returns:
    `(images, labels, probs)`: the batch inputs, the targets and the softmax
    of the model outputs, each concatenated along dimension 0 and moved to
    the CPU.
  """

  model.eval()

  # Send the batches to wherever the model's weights live instead of relying
  # on a module-level `device` name; a parameter-free model falls back to CPU.
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device('cpu')

  images = []
  labels = []
  probs = []

  with torch.no_grad():

    for (x, y) in iterator:

      x = x.to(device)

      y_pred, _ = model(x)

      y_prob = F.softmax(y_pred, dim = -1)

      images.append(x.cpu())
      labels.append(y.cpu())
      probs.append(y_prob.cpu())

  images = torch.cat(images, dim = 0)
  labels = torch.cat(labels, dim = 0)
  probs = torch.cat(probs, dim = 0)

  return images, labels, probs

# Defining the VGG Model Class:
# a feature extractor, a 7x7 adaptive average pool and a
# three-layer fully connected classifier
class VGG(nn.Module):
  """VGG-style network returning both the class scores and the pooled features.

  NOTE: a function that is also called `VGG` is defined further down in this
  file and rebinds the name at module level.

  Args:
    features: module applied to the input before pooling; the classifier
      expects it to produce 512 channels.
    output_dim: number of outputs of the last linear layer.
  """
  def __init__(self, features, output_dim):
    super().__init__()

    self.features = features
    self.avgpool = nn.AdaptiveAvgPool2d(7)

    hidden_units = 4096
    head = [
        nn.Linear(512 * 7 * 7, hidden_units),
        nn.ReLU(inplace = True),
        nn.Dropout(0.5),
        nn.Linear(hidden_units, hidden_units),
        nn.ReLU(inplace = True),
        nn.Dropout(0.5),
        nn.Linear(hidden_units, output_dim),
    ]
    self.classifier = nn.Sequential(*head)

  def forward(self, x):
    """Return `(scores, h)` where `h` is the flattened pooled feature map."""
    pooled = self.avgpool(self.features(x))
    # Keep the batch dimension and flatten everything else
    h = pooled.view(pooled.size(0), -1)
    scores = self.classifier(h)
    return scores, h

def get_vgg_layers(config, batch_norm):
  """Build the convolutional part of a VGG network from a layer list.

  Args:
    config: sequence whose entries are either 'M' (a 2x2 max-pool) or an int
      (a 3x3, padding-1 convolution with that many output channels followed
      by an in-place ReLU).
    batch_norm: when truthy, a BatchNorm2d is inserted between every
      convolution and its ReLU.

  Returns:
    An `nn.Sequential` of the generated layers; the first convolution takes
    3 input channels.
  """
  modules = []
  channels_in = 3

  for entry in config:
    assert entry == 'M' or isinstance(entry, int)

    if entry == 'M':
      modules.append(nn.MaxPool2d(kernel_size = 2))
      continue

    modules.append(nn.Conv2d(channels_in, entry, kernel_size = 3, padding = 1))
    if batch_norm:
      modules.append(nn.BatchNorm2d(entry))
    modules.append(nn.ReLU(inplace = True))
    # The next convolution consumes what this one produced
    channels_in = entry

  return nn.Sequential(*modules)

# Learning Rate Finder
class LRFinder:
  """Train while raising the learning rate exponentially and record the loss.

  The model weights present at construction time are written to
  'init_params.pt' (current working directory) and loaded back into the
  model at the end of `range_test`.
  NOTE: only the model is restored; the optimizer state is left as it is
  after the sweep.
  """
  def __init__(self, model, optimizer, criterion, device):
    """Store the training objects and snapshot the model's initial weights."""

    self.optimizer = optimizer
    self.model = model
    self.criterion = criterion
    self.device = device

    torch.save(model.state_dict(), 'init_params.pt')

  def range_test(self, iterator, end_lr = 10, num_iter = 100,
                   smooth_f = 0.05, diverge_th = 5):
    """Run the learning-rate sweep.

    Args:
      iterator: iterable of `(x, y)` batches; it is restarted when exhausted.
      end_lr: learning rate reached after `num_iter` iterations.
      num_iter: maximum number of training batches.
      smooth_f: weight of the newest loss in the exponential smoothing.
      diverge_th: stop once the smoothed loss exceeds `diverge_th` times the
        best smoothed loss seen so far.

    Returns:
      `(lrs, losses)`: the learning rate used for each executed iteration
      and the matching smoothed loss.
    """

    lrs = []
    losses = []
    best_loss = float('inf')

    lr_scheduler = ExponentialLR(self.optimizer, end_lr, num_iter)

    iterator = IteratorWrapper(iterator)

    for iteration in range(num_iter):

      loss = self._train_batch(iterator)

      # Record the rate this batch was trained with, then advance it
      lrs.append(lr_scheduler.get_last_lr()[0])

      #update lr
      lr_scheduler.step()

      if iteration > 0:
        loss = smooth_f * loss + (1 - smooth_f) * losses[-1]

      if loss < best_loss:
        best_loss = loss

      losses.append(loss)

      if loss > diverge_th * best_loss:
        print("Stopping early, the loss has diverged")
        break

    #reset model to initial parameters
    # Restore the finder's own model (not a module-level `model` name) and
    # map the snapshot onto the device the finder trains on.
    self.model.load_state_dict(torch.load('init_params.pt',
                                          map_location = self.device))

    return lrs, losses

  def _train_batch(self, iterator):
    """Train on one batch from `iterator.get_batch()` and return its loss as a float."""

    self.model.train()

    self.optimizer.zero_grad()

    x, y = iterator.get_batch()

    x = x.to(self.device)
    y = y.to(self.device)

    y_pred, _ = self.model(x)

    loss = self.criterion(y_pred, y)

    loss.backward()

    self.optimizer.step()

    return loss.item()
# Exponential Learning-Rate Scheduler
class ExponentialLR(_LRScheduler):
  """Scheduler that moves each base learning rate geometrically towards `end_lr`.

  At step `t` (the scheduler's `last_epoch`) the rate of a parameter group is
  `base_lr * (end_lr / base_lr) ** (t / num_iter)`.
  """
  def __init__(self, optimizer, end_lr, num_iter, last_epoch=-1):
    # Both attributes are assigned before the base-class constructor runs
    # (order kept as is; presumably the base class may call get_lr() while
    # initialising - verify against the installed torch version)
    self.end_lr = end_lr
    self.num_iter = num_iter
    super().__init__(optimizer, last_epoch)

  def get_lr(self):
    """Return the current learning rate of every parameter group."""
    progress = self.last_epoch / self.num_iter
    rates = []
    for start_lr in self.base_lrs:
      rates.append(start_lr * (self.end_lr / start_lr) ** progress)
    return rates
# Iterator Wrapper
class IteratorWrapper:
  """Endless batch source that restarts the wrapped iterable once it is exhausted.

  Every batch is reduced to its first two elements `(inputs, labels)`; any
  additional elements are ignored.
  """
  def __init__(self, iterator):
    # Keep the iterable itself so a fresh iterator can be created on restart
    self.iterator = iterator
    self._iterator = iter(iterator)

  def __next__(self):
    try:
      batch = next(self._iterator)
    except StopIteration:
      self._iterator = iter(self.iterator)
      batch = next(self._iterator)

    # Unpack in one place so the first pass and every restart accept the
    # same batch layouts (two elements, or two plus extras)
    inputs, labels, *_ = batch

    return inputs, labels

  def get_batch(self):
    """Return the next `(inputs, labels)` pair."""
    return next(self)
# Plotting Function for Learning Rate Finder
def plot_lr_finder(lrs, losses, skip_start = 5, skip_end = 5):
  """Plot loss against learning rate on a logarithmic x-axis.

  Args:
    lrs: learning rates, one per iteration.
    losses: losses matching `lrs`.
    skip_start: number of leading points left out of the plot.
    skip_end: number of trailing points left out of the plot (0 keeps all).
  """
  # A stop of None keeps the tail; slicing with -0 would drop everything
  stop = None if skip_end == 0 else -skip_end
  shown_lrs = lrs[skip_start:stop]
  shown_losses = losses[skip_start:stop]

  fig = plt.figure(figsize = (16,8))
  ax = fig.add_subplot(111)
  ax.plot(shown_lrs, shown_losses)
  ax.set_xscale('log')
  ax.set_xlabel('Learning rate')
  ax.set_ylabel('Loss')
  ax.grid(True, 'both', 'x')
  plt.show()




# Process Function
# (1) Wrapping one full train / validate / test run as a unique random process - VGG()
# (2) `num` is the run index supplied by the driver loop at the bottom

# The process function reuses the name `VGG`, which hides the model class of
# the same name. Keep a handle on the class before it is shadowed; the guard
# stops a re-run of this code from capturing the function instead.
if isinstance(VGG, type):
  _VGGNet = VGG

# Use GPU if available. Kept at module level so helpers that look up a global
# `device` name can find it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def _safe_rate(numerator, denominator):
  """Element-wise ratio rounded to 2 decimals; empty denominators yield NaN/inf without warnings."""
  with np.errstate(divide = 'ignore', invalid = 'ignore'):
    return np.round(numerator / denominator, 2)


def VGG(num):
  """Fine-tune a pre-trained VGG-11 (batch norm) with a fresh seed and score it on the test set.

  Side effects: removes the used seed from `seed_list`, writes
  'best_weights.pt' to the working directory, prints the per-epoch and test
  results and appends `(test_loss, test_acc, seed)` to `results`.

  Args:
    num: index of the run. It is accepted for the caller's benefit and is
      not used inside the function.

  Returns:
    A pandas DataFrame with one column per class and the rows
    'class', 'TPR', 'TNR', 'PPV', 'NPV', 'FPR', 'FNR', 'FDR', 'ACC'.
  """

  # Feeding a unique random number from the list to the seed variable.
  # randrange() yields a valid index 0 .. len-1 (randint's upper bound is
  # inclusive and could point one past the end of the list).
  SEED = seed_list.pop(random.randrange(len(seed_list)))
  # Synchronizing all the random processes using the unique random number
  random.seed(SEED)
  np.random.seed(SEED)
  torch.manual_seed(SEED)
  torch.cuda.manual_seed(SEED)
  torch.backends.cudnn.deterministic = True

  # Data transformation for both sets
  # (1) Data Augmentation (training only)
  #     (a)Resize
  #     (b)Random Rotation
  #     (c)Random Horizontal Flip
  #     (d)Random Crop
  # (2) Normalization
  train_transforms = transforms.Compose([
                            transforms.Resize(pretrained_size),
                            transforms.RandomRotation(5),
                            transforms.RandomHorizontalFlip(0.5),
                            transforms.RandomCrop(pretrained_size, padding = 10),
                            transforms.ToTensor(),
                            transforms.Normalize(mean = pretrained_means,
                                                  std = pretrained_stds)
                        ])

  test_transforms = transforms.Compose([
                            transforms.Resize(pretrained_size),
                            transforms.CenterCrop(pretrained_size),
                            transforms.ToTensor(),
                            transforms.Normalize(mean = pretrained_means,
                                                  std = pretrained_stds)
                        ])

  # Loading the data
  train_data = datasets.ImageFolder(root = train_dir,
                                    transform = train_transforms)

  test_data = datasets.ImageFolder(root = test_dir,
                                  transform = test_transforms)

  # Create the validation split
  # (VALID_RATIO is the share of the Train folder kept for training)
  n_train_examples = int(len(train_data) * VALID_RATIO)
  n_valid_examples = len(train_data) - n_train_examples

  train_data, valid_data = data.random_split(train_data,
                                            [n_train_examples, n_valid_examples])

  # Copy the validation subset so that switching it to the deterministic
  # test transforms does not also change the training subset's dataset
  valid_data = copy.deepcopy(valid_data)
  valid_data.dataset.transform = test_transforms

  # Uncomment to Verify the three sets
  #print(f'Number of training examples: {len(train_data)}')
  #print(f'Number of validation examples: {len(valid_data)}')
  #print(f'Number of testing examples: {len(test_data)}')

  # Get the classes
  classes = test_data.classes
  # Get the iterators
  train_iterator = data.DataLoader(train_data,
                                  shuffle = True,
                                  batch_size = BATCH_SIZE)

  valid_iterator = data.DataLoader(valid_data,
                                  batch_size = BATCH_SIZE)

  test_iterator = data.DataLoader(test_data,
                                  batch_size = BATCH_SIZE)

  # Configure the layers for VGG-11
  vgg11_layers = get_vgg_layers(vgg11_config, batch_norm = True)

  # Set the output dimension to the NUMBER of classes in the dataset
  OUTPUT_DIM = len(classes)
  # Build the Model (through the class handle saved above)
  model = _VGGNet(vgg11_layers, OUTPUT_DIM)
  # Load the pre-trained weights
  # NOTE(review): newer torchvision releases prefer the `weights=` argument
  # over `pretrained=`; kept as is for compatibility with older versions.
  pretrained_model = models.vgg11_bn(pretrained = True)
  # Size of the input of the pre-trained output layer
  IN_FEATURES = pretrained_model.classifier[-1].in_features
  # Create a fully connected layer for the new output layer
  final_fc = nn.Linear(IN_FEATURES, OUTPUT_DIM)
  # Replace the pre-trained output layer with the new one
  pretrained_model.classifier[-1] = final_fc
  # Load the state dictionary to the model
  model.load_state_dict(pretrained_model.state_dict())

  # Get Cross Entropy Loss Function
  criterion = nn.CrossEntropyLoss()
  # Load the model and criterion to the device
  model = model.to(device)
  criterion = criterion.to(device)

  # Uncomment to use Learning Rate Finder (starting from START_LR)
  #optimizer = optim.Adam(model.parameters(), lr = START_LR)
  #lr_finder = LRFinder(model, optimizer, criterion, device)
  #lrs, losses = lr_finder.range_test(train_iterator, END_LR, NUM_ITER)
  # Uncomment to inspect the Learning Rate V.S. Loss plot
  #plot_lr_finder(lrs, losses)

  # Discriminative Fine-tuning
  # (1) model features at one order of magnitude lower
  # (2) classification layers at the found learning rate
  params = [
            {'params': model.features.parameters(), 'lr': FOUND_LR / 10},
            {'params': model.classifier.parameters()}
          ]
  # Get Adam Optimizer with the found learning rate
  optimizer = optim.Adam(params, lr = FOUND_LR)

  # Uncomment to Verify the model
  #print(f'Model: {model}')
  # Uncomment to  Verify the pre-trained model
  #print(f'Pretrained Model:{pretrained_model}')
  # Uncomment to Verify the classifier
  #print(f'Pretrained Classifier:{pretrained_model.classifier}')

  # Best validation loss of THIS run; starting from infinity for every run
  # keeps a checkpoint of an earlier seed from being reused
  best_valid_loss = float('inf')

  # Training the Model for the number of epochs
  for epoch in range(EPOCHS):

    start_time = time.monotonic()

    train_loss, train_acc = train(model, train_iterator, optimizer, criterion, device)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion, device, False)

    if valid_loss < best_valid_loss:
      best_valid_loss = valid_loss
      torch.save(model.state_dict(), 'best_weights.pt')

    end_time = time.monotonic()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

  # Load the best model's parameters for inference on the test set
  model.load_state_dict(torch.load('best_weights.pt', map_location = device))
  # Evaluate once and store the result together with the seed of this run
  test_loss, test_acc = evaluate(model, test_iterator, criterion, device, False)
  results.append((test_loss, test_acc, SEED))
  # Print the Test Loss and Accuracy
  print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

  # Get the predictions (the returned images are not needed here)
  _, labels, probs = get_predictions(model, test_iterator)
  pred_labels = torch.argmax(probs, 1)

  # Get the Confusion Matrix (rows: true class, columns: predicted class);
  # passing `labels` keeps one row/column per class even if a class never occurs
  cmatrix = confusion_matrix(labels.numpy(), pred_labels.numpy(),
                             labels = np.arange(len(classes)))
  FP = cmatrix.sum(axis=0) - np.diag(cmatrix)
  FN = cmatrix.sum(axis=1) - np.diag(cmatrix)
  TP = np.diag(cmatrix)
  TN = cmatrix.sum() - (FP + FN + TP)

  # Sensitivity, hit rate, recall, or true positive rate
  TPR = _safe_rate(TP, TP + FN)

  # Specificity or true negative rate
  TNR = _safe_rate(TN, TN + FP)

  # Precision or positive predictive value
  PPV = _safe_rate(TP, TP + FP)

  # Negative predictive value
  NPV = _safe_rate(TN, TN + FN)

  # Fall out or false positive rate
  FPR = _safe_rate(FP, FP + TN)

  # False negative rate
  FNR = _safe_rate(FN, TP + FN)

  # False discovery rate
  FDR = _safe_rate(FP, TP + FP)

  # Per-class accuracy
  ACC = _safe_rate(TP + TN, TP + FP + FN + TN)

  # Aggregate the Metrics (class names are cut to their first 6 characters)
  tablevalue = np.vstack(([a[:6] for a in classes],TPR,TNR,PPV,NPV,FPR,FNR,FDR,ACC))
  all_metrics = pd.DataFrame(tablevalue, index=['class','TPR','TNR','PPV','NPV','FPR','FNR','FDR','ACC'])

  return all_metrics



# Iteration loop for running VGG with a unique seed
# `all_metrics` holds the metrics table of the most recent run
all_metrics = None
for i in range(10):
  all_metrics = VGG(i)


# Printing the Metrics and Train/Test Results
print(f'All Metrics DataFrame: {all_metrics}')
print(f'All Iteration results: {results}')
