# Hyperparameter Optimization using Optuna

## Imports

In [19]:
# PyTorch modules
import torch
from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader, Subset, ConcatDataset
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torchvision.transforms import ToTensor
from torchvision.utils import save_image

# sklearn modules
from sklearn.model_selection import train_test_split

# File and directory handling modules
import os
from pathlib import Path
import requests
import zipfile
import shutil

# Other modules
import random

# Image processing modules
from PIL import Image

# Optuna
!pip install optuna
import optuna


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Downloading data from GitHub

In [7]:
# Setup path to a data folder
data_path = Path('data/')
image_path = data_path / 'chest_xray'

# If the image folder doesn't exist, download it and prepare it...
if image_path.is_dir():
  print(f'{image_path} directory already exists... skipping download')
else: 
  print(f'{image_path} does not exist, creating one...')
  image_path.mkdir(parents=True, exist_ok=True)

# Download x-ray scans data
with open(data_path / 'chest_xray.zip', 'wb') as f:
  request = requests.get('https://github.com/eliaszpiotr/PneumoniaDetection/raw/main/data/chest_xray.zip')
  print('Downloading data...')
  f.write(request.content)

# Unzip data
with zipfile.ZipFile(data_path / 'chest_xray.zip', 'r') as zip_ref:
  print('Unzipping data...')
  zip_ref.extractall(image_path)

data/chest_xray directory already exists... skipping download
Downloading data...
Unzipping data...


## Setting up paths

In [8]:
# Define paths to dataset folders
train_dir = image_path / 'chest_xray/train'
test_dir = image_path / 'chest_xray/test'
val_dir = image_path / 'chest_xray/val'

train_dir, test_dir, val_dir

(PosixPath('data/chest_xray/chest_xray/train'),
 PosixPath('data/chest_xray/chest_xray/test'),
 PosixPath('data/chest_xray/chest_xray/val'))

## Setup transformers

In [9]:
transformer = {
    'dataset1': transforms.Compose([
        transforms.Resize(255),
        transforms.CenterCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.Grayscale(num_output_channels=1),
        transforms.RandomAffine(translate=(0.05, 0.05), degrees=0),
        transforms.ToTensor(),
    ]),
    'dataset2': transforms.Compose([
        transforms.Resize(255),
        transforms.CenterCrop(224),
        transforms.RandomHorizontalFlip(p=1),
        transforms.Grayscale(num_output_channels=1),
        transforms.RandomAffine(translate=(0.1, 0.05), degrees=10),
        transforms.ToTensor(),
    ]),
    'dataset3': transforms.Compose([
        transforms.Resize(255),
        transforms.CenterCrop(224),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(15),
        transforms.Grayscale(num_output_channels=1),
        transforms.RandomAffine(translate=(0.08, 0.1), degrees=15),
        transforms.ToTensor(),
    ]),
}

In [15]:
# Setup paths
pneumonia_train_dir = train_dir / 'PNEUMONIA'
normal_train_dir = train_dir/ 'NORMAL'
pneumonia_test_dir = test_dir/ 'PNEUMONIA'
normal_test_dir = test_dir/ 'NORMAL'
pneumonia_val_dir = val_dir / 'PNEUMONIA'
normal_val_dir = val_dir/ 'NORMAL'

## Creating helping functions

In [10]:
def apply_transforms(input_dir, transform, postfix):
    """
    Apply specified transformations to images in the input directory and save the transformed images to the same directory.

    Args:
        input_dir (str): Path to the directory containing the input images.
        transform (function): Transformation function to be applied to the images.
        postfix (str): String to append to the filename to indicate the transformation applied.

    Returns:
        None
    """
    # Only consider original images, not those that have been previously transformed
    img_list = [img for img in os.listdir(input_dir) if img.lower().endswith(('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')) and 'aug_' not in img]

    for i, img_name in enumerate(img_list):
        img_path = input_dir / img_name
        img = Image.open(img_path)
        transformed_img = transform(img)
        # Add the 'aug_' prefix to the filename to distinguish the augmented images from the original ones
        save_path = input_dir / f"aug_{postfix}_{img_name}"
        save_image(transformed_img, save_path)

    print(f"Saved {len(img_list)} transformed images to {input_dir}")

In [11]:
def select_and_save(input_dir, output_dir, num_images):
    """
    Randomly select the specified number of images from the given directory and save them to the output directory.

    Args:
        input_dir (str): Path to the directory containing the input images.
        output_dir (str): Path to the directory where the selected images will be saved.
        num_images (int): Number of images to select.

    Returns:
        None
    """
    img_list = [img for img in os.listdir(input_dir) if img.lower().endswith(('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif'))]
    combined_list = [input_dir / img for img in img_list]

    selected_images = random.sample(combined_list, num_images)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for img_path in selected_images:
        shutil.copy(img_path, output_dir / img_path.name)

    print(f"Selected {len(os.listdir(output_dir))} images to {output_dir}")

## Appling transformers and sampling images

In [18]:
# Set the desired number of images
DESIRED_IMAGES = 500

# Create new directories for the selected data
selected_normal_dir = Path("selected_data/NORMAL")
selected_pneumonia_dir = Path("selected_data/PNEUMONIA")

# Apply transformations to all images and save them to the same directory
for transformer_name in ['dataset1', 'dataset2', 'dataset3']:
    apply_transforms(normal_train_dir, transformer[transformer_name], transformer_name)

# Select images from the combined original and transformed images and save them to the new directory
select_and_save(normal_train_dir, selected_normal_dir, DESIRED_IMAGES)

# Repeat the process for the pneumonia images
for transformer_name in ['dataset1', 'dataset2', 'dataset3']:
    apply_transforms(pneumonia_train_dir, transformer[transformer_name], transformer_name)

select_and_save(pneumonia_train_dir, selected_pneumonia_dir, DESIRED_IMAGES)

Saved 1341 transformed images to data/chest_xray/chest_xray/train/NORMAL
Saved 1341 transformed images to data/chest_xray/chest_xray/train/NORMAL
Saved 1341 transformed images to data/chest_xray/chest_xray/train/NORMAL
Selected 500 images to selected_data/NORMAL
Saved 3875 transformed images to data/chest_xray/chest_xray/train/PNEUMONIA
Saved 3875 transformed images to data/chest_xray/chest_xray/train/PNEUMONIA
Saved 3875 transformed images to data/chest_xray/chest_xray/train/PNEUMONIA
Selected 500 images to selected_data/PNEUMONIA


## Data loaders

In [20]:
data_dir = '/content/selected_data'

In [21]:
# Setup datasets
train_dataset = ImageFolder(data_dir,transform=transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
]))

## Making splits

In [22]:
# Spliting data
all_indices = list(range(len(train_dataset)))
train_indices, temp_indices = train_test_split(
    all_indices,
    test_size=0.3, # 30% of the data will be used as temporary (validation + test)
    random_state=42,  # Set a random state to make the split deterministic
    stratify=train_dataset.targets
)

val_indices, test_indices = train_test_split(
    temp_indices,
    test_size=0.5,  # Half of the temporary data will be used as validation, and the other half as test
    random_state=42,
    stratify=[train_dataset.targets[i] for i in temp_indices]
)

full_dataset = ConcatDataset([train_dataset, Subset(train_dataset, temp_indices)])

train_dataset = Subset(full_dataset, train_indices)
val_dataset = Subset(full_dataset, val_indices)
test_dataset = Subset(full_dataset, test_indices)

In [23]:
# Set DataLoader parameters
batch_size = 128
num_workers = os.cpu_count()

# Create DataLoaders for training, validation, and testing
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

## Model

In [31]:

class Classifier(nn.Module):
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int, 
                 kernel_size: int, stride: int, padding: int, dropout_rate: float):
        super().__init__()
        
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(input_shape, hidden_units, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(dropout_rate)
        )
        
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units * 2, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.ReLU(),
            nn.Conv2d(hidden_units * 2, hidden_units * 2, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(dropout_rate)
        )
        
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(hidden_units * 2 * 7 * 7, hidden_units),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_units, output_shape)
        )
    
    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.avgpool(x)
        x = self.classifier(x)
        return x


## Objective function

In [33]:
def objective(trial):
    # Suggest values of the hyperparameters using a trial object.
    hidden_units = trial.suggest_int("hidden_units", 8, 32)
    lr = trial.suggest_float("lr", 1e-4, 0.01, log=True)
    kernel_size = trial.suggest_categorical("kernel_size", [3, 5])
    stride = trial.suggest_categorical("stride", [1, 2])
    padding = trial.suggest_categorical("padding", [0, 1])
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])  # Add batch_size as a hyperparameter

    model = Classifier(1, hidden_units, 2, kernel_size, stride, padding, dropout_rate)
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=lr)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  # Create train_loader with the suggested batch_size

    for epoch in range(10):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):  # use train_loader here
            inputs, labels = data
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        trial.report(running_loss, epoch)

        if trial.should_prune():
            raise optuna.TrialPruned()
    return running_loss


## Searching for best parameters

In [None]:
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=100)

best_trial = study.best_trial

print("Best trial:")
print("  Value: ", best_trial.value)

print("  Params: ")
for key, value in best_trial.params.items():
    print("    {}: {}".format(key, value))

[32m[I 2023-05-15 15:05:51,393][0m A new study created in memory with name: no-name-54810a2a-2741-482f-aeb6-87b046abaec0[0m
