Install and import **Optuna**

In [None]:
!pip install optuna
import optuna

In [None]:
import torch.optim as optim
import os
import shutil
import cv2
import math
import random
from PIL import Image
import glob
import time, copy
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import lr_scheduler, Adam
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.models import ResNet152_Weights
from torchvision import datasets, models, transforms
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report

In [None]:
# Mount Google Drive into the Colab runtime; the dataset and result paths used
# in the following cells all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#save paths
# Prefix used for every file this notebook writes (checkpoints, report).
save_name = 'optuna_hyp_optimization'
# All results are stored below one Drive folder, one sub-folder per kind.
_results_root = '/content/drive/MyDrive/resnet152/risultati/'
path_graph = _results_root + 'grafici/'
path_model = _results_root + 'modelli/'
path_pd = _results_root + 'acc_loss/'

In [None]:
# Root of the dataset; it is expected to contain train/, test/ and val/
# folders, each holding one sub-folder per class.
path_folder = '/content/drive/MyDrive/backups/wikiAum_resized2.0_splitted'

train_folder_path = path_folder + '/train'
test_folder_path = path_folder + '/test'
val_folder_path = path_folder + '/val'


# One list of image paths per class folder, for each split.
train_foto_paths = []
test_foto_paths = []
val_foto_paths = []
# Class names, taken from the sub-folder names of the train split.
classes = []

# Collect the image paths of every class folder and record the class names.
# glob returns entries in arbitrary (file-system dependent) order, so the
# class folders are sorted: the position of a name in `classes` becomes its
# numeric label and must be the same on every run.
for data_path in sorted(glob.glob(train_folder_path + '/*')):
    classes.append(data_path.split('/')[-1])
    train_foto_paths.append(glob.glob(data_path + '/*'))

for data_path in sorted(glob.glob(test_folder_path + '/*')):
    test_foto_paths.append(glob.glob(data_path + '/*'))

for data_path in sorted(glob.glob(val_folder_path + '/*')):
    val_foto_paths.append(glob.glob(data_path + '/*'))

classes

['non_spurious', 'spurious']

In [None]:
# Merge the per-class lists of each split into one flat list of image paths.
# Iterating over every class list (instead of indexing [0] and [1]) keeps this
# correct for any number of class folders.
train_paths = [path for class_paths in train_foto_paths for path in class_paths]
test_paths = [path for class_paths in test_foto_paths for path in class_paths]
val_paths = [path for class_paths in val_foto_paths for path in class_paths]


# Shuffle in place so the classes are interleaved. No seed is set here, so the
# resulting order differs from run to run.
random.shuffle(train_paths)
random.shuffle(val_paths)
random.shuffle(test_paths)

In [None]:

# Lookup tables between a class name and its integer label; the label of a
# class is its position in `classes`.
idx_to_class = dict(enumerate(classes))
class_to_idx = {name: index for index, name in idx_to_class.items()}
class_to_idx

{'non_spurious': 0, 'spurious': 1}

In [None]:
# Pre-processing pipelines, one per split. Every pipeline converts the image
# to a tensor and normalizes each channel with the mean/std below (the values
# conventionally used with ImageNet-pretrained torchvision models).
# Only the training pipeline adds augmentation: random horizontal and vertical
# flips.
train_transforms = transforms.Compose([transforms.ToTensor(),
                                       transforms.RandomHorizontalFlip(),
                                       transforms.RandomVerticalFlip(),
                                       transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                                       ])#resize if needed

val_transforms = transforms.Compose([transforms.ToTensor(),
                                     transforms.Normalize([0.485, 0.456, 0.406],
                                                          [0.229, 0.224, 0.225])
                                    ])#resize if needed

test_transforms = transforms.Compose([transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406],
                                                           [0.229, 0.224, 0.225])
                                      ])#resize if needed

In [None]:


#   Define Dataset Class


class Spuriedata(Dataset):
    """Dataset of images addressed by file path.

    The label of an image is derived from the name of the folder that
    directly contains it, looked up in the module-level ``class_to_idx``.
    """

    def __init__(self, image_paths, transform):
        """Store the image paths and the transform applied to each image.

        Args:
            image_paths: Sequence of image file paths of the form
                ``.../<class_name>/<file>``.
            transform: Callable applied to the RGB image array in
                ``__getitem__``.
        """
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load one sample.

        Args:
            idx: Index into ``image_paths``.

        Returns:
            Tuple ``(image, label)``: the transformed image and the integer
            label of its class.

        Raises:
            OSError: If the image file cannot be read.
            KeyError: If the parent folder name is not in ``class_to_idx``.
        """
        image_filepath = self.image_paths[idx]
        image = cv2.imread(image_filepath)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than with an opaque
        # assertion error inside cvtColor.
        if image is None:
            raise OSError('could not read image file: {}'.format(image_filepath))
        # OpenCV loads images in BGR channel order; convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # The parent folder name is the class name.
        label = class_to_idx[image_filepath.split('/')[-2]]
        image = self.transform(image)

        return image, label
    
#######################################################
#                  Create Dataset
#######################################################

# One dataset per split, each paired with the transform pipeline of that split.
train_dataset = Spuriedata(image_paths=train_paths, transform=train_transforms)
val_dataset = Spuriedata(image_paths=val_paths, transform=val_transforms)
test_dataset = Spuriedata(image_paths=test_paths, transform=test_transforms)



In [None]:
# Validation and test batches are only evaluated, so they share the same
# settings: larger batches and a fixed order.
_eval_loader_kwargs = dict(batch_size=64, shuffle=False, num_workers=2)

# Training batches are smaller and re-shuffled every epoch.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, **_eval_loader_kwargs)
test_loader = DataLoader(test_dataset, **_eval_loader_kwargs)

In [None]:
#In order to get a pretrained model by its name, we will add a function get_model:

# Dropout probabilities of the three dropout layers of the classifier head,
# listed from the input side of the head to its output side, for every
# supported model name.
_HEAD_DROPOUT_RATES = {
    "resnet152_droput5": (0.5, 0.5, 0.5),
    "resnet152_droputDownScaled": (0.5, 0.4, 0.3),
    "resnet152_droput2": (0.2, 0.2, 0.2),
    "resnet152_droputUpScaled": (0.3, 0.4, 0.5),
}


def get_model(model_name: str = "resnet152_droput5"):
  """Build a pretrained ResNet-152 with a new, trainable classifier head.

  All variants share the same architecture and differ only in the dropout
  probabilities of the head (see ``_HEAD_DROPOUT_RATES``).

  Args:
    model_name: One of the keys of ``_HEAD_DROPOUT_RATES``.

  Returns:
    The ResNet-152 whose pretrained parameters are frozen and whose ``fc``
    layer is replaced by a 4-layer MLP with 2 outputs.

  Raises:
    ValueError: If ``model_name`` is not a supported name.
  """
  # Validate the name before building the backbone, so a typo fails
  # immediately and with a message listing the valid names.
  if model_name not in _HEAD_DROPOUT_RATES:
    raise ValueError(
        "unknown model_name {!r}; expected one of {}".format(
            model_name, sorted(_HEAD_DROPOUT_RATES)))
  p_first, p_second, p_third = _HEAD_DROPOUT_RATES[model_name]

  model = torchvision.models.resnet152(weights=ResNet152_Weights.DEFAULT)
  # Freeze the pretrained parameters. The head is created afterwards, so its
  # parameters keep the default requires_grad=True and are the only ones
  # that get trained.
  for param in model.parameters():
    param.requires_grad = False

  # Size the first linear layer from the backbone instead of hard-coding it.
  in_features = model.fc.in_features
  model.fc = nn.Sequential(
      nn.Dropout(p=p_first, inplace=False),
      nn.Linear(in_features=in_features, out_features=1024, bias=True),
      nn.ReLU(inplace=True),
      nn.Dropout(p=p_second, inplace=False),
      nn.Linear(in_features=1024, out_features=512, bias=True),
      nn.ReLU(),
      nn.Dropout(p_third),
      nn.Linear(512, 256),
      nn.ReLU(),
      nn.Linear(256, 2)
  )

  return model

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

The following function will be used to train the model:

In [None]:
def train_model(trial, model, criterion, optimizer,scheduler, num_epochs=30):
  """Train ``model`` and report its validation accuracy to an Optuna trial.

  Every epoch runs one pass over ``train_loader`` followed by an evaluation
  on ``val_loader``. The validation accuracy is reported to ``trial`` so the
  study's pruner can stop the run early.

  Two checkpoints are written under ``path_model + save_name``:
  ``_acc.pth`` whenever the validation accuracy improves and ``_loss.pth``
  whenever the validation loss improves.

  NOTE: the checkpoint names do not depend on the trial, and the best
  accuracy/loss are tracked per call, so every trial overwrites the files of
  the previous ones.

  Args:
    trial: Optuna trial used for ``report`` / ``should_prune``.
    model: Model to train; expected to already be on ``device``.
    criterion: Loss function called as ``criterion(outputs, labels)``.
    optimizer: Optimizer over the model parameters.
    scheduler: Learning-rate scheduler, stepped once per epoch.
    num_epochs: Maximum number of epochs.

  Returns:
    Tuple ``(model, best_acc)``: the model loaded with the weights of the
    epoch with the highest validation accuracy, and that accuracy in percent
    as a Python float.

  Raises:
    optuna.TrialPruned: If the pruner decides to stop this trial.
  """
  start_time = time.time()
  best_acc = 0.
  best_loss = math.inf
  # Start from the initial weights so that there is always something to
  # restore at the end, even if no epoch ever beats the initial best_acc.
  best_model_wts = copy.deepcopy(model.state_dict())

  for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch, num_epochs - 1))

    # ---- Training phase ----
    model.train()
    running_loss = 0.
    running_corrects = 0
    for images, labels in train_loader:
      images = images.to(device)
      labels = labels.to(device)
      optimizer.zero_grad()
      outputs = model(images)
      _, preds = torch.max(outputs, 1)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      # Weight the batch loss by the batch size (assumes the criterion
      # returns the batch mean) so the epoch loss is a per-sample mean.
      running_loss += loss.item() * images.size(0)
      # .item() keeps the counter a plain int instead of a device tensor.
      running_corrects += torch.sum(preds == labels).item()
    scheduler.step()
    epoch_loss = running_loss / len(train_dataset)
    epoch_acc = running_corrects / len(train_dataset) * 100.
    print('[Train #{}] Loss: {:.4f} Acc: {:.4f}% Time: {:.4f}m'.format(epoch, epoch_loss, epoch_acc, (time.time() -start_time)//60))

    # ---- Validation phase ----
    model.eval()
    running_loss = 0.
    running_corrects = 0
    with torch.no_grad():
      for images, labels in val_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * images.size(0)
        running_corrects += torch.sum(preds == labels).item()
    epoch_loss = running_loss / len(val_dataset)
    epoch_acc = running_corrects / len(val_dataset) * 100.
    print('[Val #{}] Loss: {:.4f} Acc: {:.4f}% Time: {:.4f}m'.format(epoch, epoch_loss, epoch_acc, (time.time()- start_time)//60))

    # The weights returned to the caller follow the best accuracy only.
    if epoch_acc > best_acc:
      best_acc = epoch_acc
      best_model_wts = copy.deepcopy(model.state_dict())
      torch.save(model, path_model + save_name + '_acc.pth')
    # The loss checkpoint is tracked independently of the accuracy one, so
    # that '_loss.pth' really is the epoch with the lowest validation loss.
    if epoch_loss < best_loss:
      best_loss = epoch_loss
      torch.save(model, path_model + save_name + '_loss.pth')

    print()

    # Let the pruner compare this trial's intermediate value with others.
    trial.report(epoch_acc, epoch)
    if trial.should_prune():
      raise optuna.TrialPruned()

  time_elapsed = time.time() - start_time
  print('Training complete in {:.0f}m {:.0f}s'.format(
      time_elapsed // 60, time_elapsed % 60))
  print('Best val Acc: {:.4f}'.format(best_acc))

  model.load_state_dict(best_model_wts)
  return model, best_acc

We need to create the **Objective Function**. It takes a configuration of hyperparameters and returns its evaluation score (Objective value). By maximizing or minimizing the **Objective Function**, Optuna solves the problem of hyperparameter optimization.




In [None]:
def objective(trial):
  """Optuna objective: train one sampled configuration and score it.

  The search space consists of the model variant (dropout layout of the
  classifier head), the learning rate (sampled on a log scale between 1e-6
  and 1e-2) and the optimizer class (SGD or Adam).

  Args:
    trial: Optuna trial used to sample the hyperparameters; it is also
      passed to ``train_model`` for reporting and pruning.

  Returns:
    Best validation accuracy (in percent) reached during training, as a
    float.
  """
  params = {
      "model_name": trial.suggest_categorical('model_name',["resnet152_droput5", "resnet152_droputDownScaled", "resnet152_droput2","resnet152_droputUpScaled"]),
      # suggest_float(..., log=True) replaces the deprecated
      # suggest_loguniform and samples from the same distribution.
      "lr": trial.suggest_float('lr', 1e-6, 1e-2, log=True),
      "optimizer_name": trial.suggest_categorical('optimizer_name',["SGD", "Adam"])}

  # Get pretrained model
  model = get_model(params["model_name"])
  model = model.to(device)

  # Define criterion
  criterion = nn.CrossEntropyLoss()

  # Configure optimizer: look the class up on torch.optim by its name.
  optimizer_cls = getattr(torch.optim, params["optimizer_name"])
  optimizer = optimizer_cls(model.parameters(), lr=params["lr"])
  # Divide the learning rate by 10 every 5 epochs.
  scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

  # Train model
  _, best_acc = train_model(trial, model, criterion, optimizer,scheduler)

  # Optuna expects a plain number as the objective value.
  return float(best_acc)

To start optimizing our **Objective Function**, we create a new **study**:


In [None]:
# sampler: We want to use a TPE sampler
# pruner: We use a MedianPruner in order to interrupt unpromising trials
# direction: The direction of study is “maximize” because we want to maximize the accuracy
# n_trials: Number of trials

sampler = optuna.samplers.TPESampler()
# Pruner settings: n_startup_trials=3, n_warmup_steps=5, interval_steps=3.
pruner = optuna.pruners.MedianPruner(
    n_startup_trials=3,
    n_warmup_steps=5,
    interval_steps=3,
)
study = optuna.create_study(direction='maximize', sampler=sampler, pruner=pruner)
study.optimize(objective, n_trials=20)

In [None]:
# Show the best trial of the finished study.
print("Best trial: ")
best_trial = study.best_trial
print(best_trial)

In [None]:
# The checkpoint was written with torch.save(model, ...), i.e. it is a whole
# pickled module rather than a state_dict. Recent PyTorch versions default to
# weights_only=True, which refuses to unpickle such a file, so full unpickling
# is requested explicitly.
# NOTE: unpickling executes code stored in the file; only load checkpoints
# that this notebook produced itself.
model = torch.load(path_model + save_name + '_loss.pth', map_location=device, weights_only=False)

In [None]:
# Run the loaded model over the test set and collect, per image, the true
# label and the predicted class index.
true_labels = []
preds       = []
model.eval()
with torch.no_grad():
  for batch_images, batch_labels in test_loader:
    batch_images = batch_images.to(device)
    batch_labels = batch_labels.to(device)

    # Class probabilities of the batch; the prediction is the most likely class.
    class_probs = F.softmax(model(batch_images), dim=-1)
    batch_preds = torch.argmax(class_probs, dim=-1)

    true_labels += batch_labels.view(-1).tolist()
    preds += batch_preds.view(-1).tolist()

# Text report for the notebook output.
report_text = classification_report(true_labels, preds)
print(f'{report_text}classification report')

# The same report as a table (one row per class/average), saved to Drive.
report = classification_report(true_labels, preds, output_dict=True)
df = pd.DataFrame(report).T
df.to_csv(path_pd + save_name + '_report')

In [None]:
def imshow(inp, title=None):
    """Plot a normalized image tensor in a new 3x3 inch figure.

    The tensor is converted to a channels-last array and the normalization
    with the mean/std below is reverted before plotting.

    Args:
        inp: CPU tensor with the channel axis first.
        title: Optional title for the figure.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # Move the channel axis last, which is the layout matplotlib expects.
    channels_last = inp.numpy().transpose((1, 2, 0))
    # Undo the normalization and clamp to the displayable [0, 1] range.
    restored = np.clip(channel_std * channels_last + channel_mean, 0, 1)

    plt.figure(figsize=(3,3))
    plt.imshow(restored)
    if title is not None:
        plt.title(title)
    # Short pause so the figure gets drawn.
    plt.pause(0.001)

In [None]:
def visualize_model(model, num_images=4):
    """Display misclassified images from ``test_loader``.

    Each misclassified image is drawn with ``imshow`` in its own figure,
    titled with the predicted and the true class name. The model is put into
    eval mode for the duration of the call and its previous train/eval mode
    is restored afterwards, also when an error occurs.

    Args:
        model: Model to evaluate; expected to be on ``device``.
        num_images: Maximum number of misclassified images to display.
    """
    if num_images <= 0:
        return

    was_training = model.training
    model.eval()
    images_so_far = 0

    try:
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                for j in range(inputs.size(0)):
                    predicted_name = classes[int(preds[j])]
                    label_name = classes[int(labels[j])]
                    # Only wrong predictions are of interest here.
                    if predicted_name == label_name:
                        continue

                    images_so_far += 1
                    # imshow opens its own figure, so the title has to be
                    # passed to it; setting it on a separately created
                    # subplot would leave the image untitled.
                    # NOTE(review): permute(0, 2, 1) swaps the two spatial
                    # axes, i.e. the image is shown transposed -- confirm
                    # this is intended.
                    imshow(inputs.cpu().data[j].permute(0,2,1),
                           title=f'predicted: {predicted_name}\nlabel: {label_name}')

                    if images_so_far == num_images:
                        return
    finally:
        model.train(mode=was_training)

In [None]:
# Display misclassified test images for the loaded checkpoint.
visualize_model(model)