[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/CU-Robotics/swarm/blob/main/cnn/hyper_parameter_optimization.ipynb)

In [None]:
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import random_split, DataLoader
from torch.utils.data import Dataset

from PIL import Image
import os
import json

In [None]:
from dotenv import load_dotenv
import gdown
import zipfile
import gdown

if 'COLAB_GPU' in os.environ or 'TENSORFLOW_USE_SYNC_ON_FINISH' in os.environ:
    print("Running in Google Colab")
    !pip install optuna
    
    zip_path = os.getcwd() + "/data.zip"
    extract_path = os.getcwd() + "/data/"
    
    from google.colab import userdata
    data_link = userdata.get("DATA_FILE_LINK")

else:
    print("Not running in Google Colab (likely VS Code or local environment)")
    
    zip_path = os.getcwd() + "/../collections/data.zip"
    extract_path = os.getcwd() + "/../collections/data/"

    load_dotenv()
    data_link = os.getenv("DATA_FILE_LINK")

# Construct download URL
url = f"https://drive.google.com/uc?id={data_link}"

# Download the zip file
if not os.path.exists(zip_path):
    gdown.download(url, zip_path, quiet=False)

# Unzip it
if not os.path.exists(extract_path):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

print(f"Data extracted to {extract_path}")
os.remove(zip_path)

In [None]:
import optuna
from optuna.trial import TrialState

# Run on the current accelerator's device type when one is available,
# otherwise fall back to the CPU.
if torch.accelerator.is_available():
    DEVICE = torch.accelerator.current_accelerator().type
else:
    DEVICE = "cpu"

# Root directory of the extracted dataset.
DIR = extract_path

# Mini-batch size, and the per-epoch sample caps used during the search
# (30 training batches and 10 validation batches).
BATCHSIZE = 128
N_TRAIN_EXAMPLES = BATCHSIZE * 30
N_VALID_EXAMPLES = BATCHSIZE * 10

# Width of the classifier head and number of epochs per training run.
# NOTE(review): CustomImageDataset emits labels 1..7; with CLASSES = 7 a
# label of 7 would be out of range for F.nll_loss - confirm the intended range.
CLASSES = 7
EPOCHS = 10

print(f'Using device: {DEVICE}')

In [None]:
# Custom dataset class for loading images and labels
class CustomImageDataset(Dataset):
    """Dataset of cropped images described by a JSON annotations file.

    The annotations file must contain a list of records; each record is read
    for its ``"name"``, ``"folder"`` and ``"labels"["icon"]`` entries. Images
    are loaded from ``<img_dir>/<folder>/cropped/<name>``.

    Args:
        annotations_file: Path to the JSON metadata file.
        img_dir: Root folder that contains the per-folder image directories.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, annotations_file, img_dir, transform=None):
        # Use a context manager so the file handle is closed right away, and
        # pin the encoding so parsing does not depend on the platform locale.
        with open(annotations_file, encoding="utf-8") as annotations:
            self.img_labels = json.load(annotations)  # parsed annotation records
        self.img_dir = img_dir                         # folder where the cleaned images are
        self.transform = transform
        # Maps the "icon" annotation string to an integer class id.
        # NOTE(review): ids run 1..7, while F.nll_loss on a 7-output head only
        # accepts targets 0..6 - confirm against CLASSES whether this mapping
        # should be zero-based (or whether 'tower' never occurs in the data).
        self.classes = {'1':1, '2':2, '3':3, '4':4, 'sentry':5, 'base':6, 'tower':7}

    def __len__(self):
        """Return the number of annotation records."""
        return len(self.img_labels)

    # gets image and label at index idx based on position in json
    def __getitem__(self, idx):
        """Return ``(image, label)`` for the record at position ``idx``.

        Raises:
            KeyError: If the record's icon label is not in ``self.classes``.
        """
        image_data = self.img_labels[idx]
        img_name = image_data["name"]
        img_folder = image_data["folder"]

        img_path = os.path.join(self.img_dir, img_folder, "cropped", img_name)

        # convert() returns an independent in-memory copy, so the underlying
        # file can be closed as soon as the conversion is done.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")

        label = self.classes[image_data["labels"]["icon"]]

        if self.transform:
            image = self.transform(image)

        return image, label



In [None]:
def define_model(trial):
    """Build a small CNN whose architecture is sampled from ``trial``.

    The trial chooses the number of conv blocks (1-3) and, per block, the
    number of output channels (16-64 in steps of 16) and the kernel size
    (3, 5 or 7). Each block is Conv2d -> ReLU -> MaxPool2d(2). The classifier
    head is sized by probing the conv stack with a single-channel 100x100
    input and ends in a log-softmax over ``CLASSES`` outputs.

    Args:
        trial: Object exposing Optuna's ``suggest_int`` interface.

    Returns:
        The assembled ``nn.Sequential`` model.
    """
    depth = trial.suggest_int('num_layers', 1, 3)

    conv_stack = []
    channels_in = 1  # the network consumes single-channel images
    for layer_idx in range(depth):
        channels_out = trial.suggest_int(f'n_units_l{layer_idx}', 16, 64, step=16)
        k = trial.suggest_int(f'kernel_size_l{layer_idx}', 3, 7, step=2)
        # padding = k // 2 keeps the spatial size unchanged for odd kernels,
        # so only the pooling layer shrinks the feature map.
        conv_stack.extend([
            nn.Conv2d(channels_in, channels_out, kernel_size=k, padding=k // 2),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ])
        channels_in = channels_out

    net = nn.Sequential(*conv_stack)
    net.add_module("flatten", nn.Flatten())

    # Push a dummy image through the conv stack to learn the flattened width.
    with torch.no_grad():
        probe = torch.zeros(1, 1, 100, 100)
        flat_dim = net(probe).shape[1]

    # Classification head.
    net.add_module("fc", nn.Linear(flat_dim, CLASSES))
    net.add_module("logsoftmax", nn.LogSoftmax(dim=1))

    return net

In [None]:
def get_data_loaders(seed=0):
    """Create the training and validation loaders for the search.

    The dataset is split 80/20. Training samples go through random
    augmentation (flip, rotation, brightness/contrast jitter); validation
    samples only get the deterministic preprocessing, so validation accuracy
    is not perturbed by random transforms. The split is seeded so that every
    trial is scored on the same validation samples.

    Args:
        seed: Seed for the train/validation split. Pass ``None`` to draw the
            split from torch's global RNG instead.

    Returns:
        Tuple ``(train_loader, val_loader)``.
    """
    annotations_file = os.path.join(DIR, 'cleaned_metadata.json')

    train_transform = transforms.Compose([
        transforms.Grayscale(num_output_channels=1),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Lambda(lambda t: t.sqrt()),
    ])

    # Same preprocessing without the random augmentation steps.
    eval_transform = transforms.Compose([
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Lambda(lambda t: t.sqrt()),
    ])

    # Two views over the same annotations, differing only in their transform.
    train_source = CustomImageDataset(annotations_file, img_dir=DIR, transform=train_transform)
    eval_source = CustomImageDataset(annotations_file, img_dir=DIR, transform=eval_transform)

    # Split sizes
    train_size = int(0.8 * len(train_source))  # 80%
    val_size = len(train_source) - train_size  # remaining 20%

    # Random split; the validation indices are re-applied to the
    # non-augmented view so both subsets stay disjoint.
    if seed is None:
        split_rng = torch.default_generator
    else:
        split_rng = torch.Generator().manual_seed(seed)
    train_dataset, val_split = random_split(
        train_source, [train_size, val_size], generator=split_rng
    )
    val_dataset = torch.utils.data.Subset(eval_source, val_split.indices)

    train_loader = DataLoader(train_dataset, batch_size=BATCHSIZE, shuffle=True, num_workers=2)

    # No shuffling for validation: the capped evaluation then always sees the
    # same samples (the split itself is already a random permutation).
    val_loader = DataLoader(val_dataset, batch_size=BATCHSIZE, shuffle=False, num_workers=2)

    return train_loader, val_loader

In [None]:
def objective(trial):
    """Optuna objective: train a sampled model and return validation accuracy.

    Samples an architecture via ``define_model`` plus a learning rate and an
    optimizer type, then trains for ``EPOCHS`` epochs on at most
    ``N_TRAIN_EXAMPLES`` samples per epoch. After each epoch the accuracy on
    at most ``N_VALID_EXAMPLES`` validation samples is reported to the trial.

    Args:
        trial: The Optuna trial to sample hyperparameters from and report to.

    Returns:
        Validation accuracy measured after the final epoch.

    Raises:
        optuna.exceptions.TrialPruned: If the trial asks to be pruned after an
            intermediate report.
    """
    net = define_model(trial).to(DEVICE)

    # Sample the optimiser configuration for this trial.
    learning_rate = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])
    optimizer_cls = getattr(optim, optimizer_name)
    optimizer = optimizer_cls(net.parameters(), lr=learning_rate)

    train_loader, valid_loader = get_data_loaders()

    for epoch in range(EPOCHS):
        # ---- training pass, capped for faster epochs ----
        net.train()
        for step, (images, labels) in enumerate(train_loader):
            if step * BATCHSIZE >= N_TRAIN_EXAMPLES:
                break

            images = images.view(images.size(0), 1, 100, 100).to(DEVICE)
            labels = labels.to(DEVICE)

            optimizer.zero_grad()
            loss = F.nll_loss(net(images), labels)
            loss.backward()
            optimizer.step()

        # ---- validation pass, also capped ----
        net.eval()
        n_correct = 0
        with torch.no_grad():
            for step, (images, labels) in enumerate(valid_loader):
                if step * BATCHSIZE >= N_VALID_EXAMPLES:
                    break

                images = images.view(images.size(0), 1, 100, 100).to(DEVICE)
                labels = labels.to(DEVICE)

                # The predicted class is the index of the largest log-probability.
                predicted = net(images).argmax(dim=1, keepdim=True)
                n_correct += predicted.eq(labels.view_as(predicted)).sum().item()

        n_evaluated = min(len(valid_loader.dataset), N_VALID_EXAMPLES)
        accuracy = n_correct / n_evaluated

        # Report the intermediate score so the pruner can stop weak trials early.
        trial.report(accuracy, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return accuracy

In [None]:
# Run the hyperparameter search, maximising the value returned by `objective`.
# The search stops after 100 trials or 1000 seconds, whichever comes first.
study = optuna.create_study(direction="maximize")
study.optimize(objective, timeout=1000, n_trials=100)

# Group the recorded trials by final state for the summary below.
complete_trials = study.get_trials(states=[TrialState.COMPLETE], deepcopy=False)
pruned_trials = study.get_trials(states=[TrialState.PRUNED], deepcopy=False)

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

# Report the best-scoring trial and the hyperparameters it used.
print("Best trial:")
trial = study.best_trial

print("  Value: ", trial.value)

print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

In [None]:
import pandas as pd

# Convert the study's trials to a DataFrame (one call to Optuna's
# trials_dataframe) so the search history can be inspected offline.
df = study.trials_dataframe()
# print(df)

# Save to CSV in the current working directory; index=False leaves out the
# DataFrame's row index column.
df.to_csv("optuna_trials_rand.csv", index=False)

In [None]:
# Rebuild the best architecture found by the study, train it on the whole
# dataset (no augmentation), save the weights, then benchmark inference latency.
# NOTE: every sample is used for training here, so nothing is held out and no
# test-set evaluation is performed in this cell.
best_model = define_model(study.best_trial).to(DEVICE)
print(study.best_params)

dataset = CustomImageDataset(os.path.join(DIR, 'cleaned_metadata.json'), img_dir=DIR, transform=transforms.Compose([
      transforms.Grayscale(num_output_channels=1),
      transforms.ToTensor(),
      transforms.Lambda(lambda t: t.sqrt()),
    ]))

train_loader = DataLoader(dataset, batch_size=BATCHSIZE, shuffle=True, num_workers=2)

# Recreate the optimizer selected by the study; fall back to Adam when the
# optimizer type was not part of the search space.
if 'optimizer' in study.best_params:
    optimizer_name = study.best_params['optimizer']
    optimizer = getattr(optim, optimizer_name)(best_model.parameters(), lr=study.best_params['lr'])
else:
    optimizer = optim.Adam(best_model.parameters(), lr=study.best_params['lr'])

for epoch in range(EPOCHS):
    best_model.train()
    for data, target in train_loader:
        data, target = data.view(data.size(0), 1, 100, 100).to(DEVICE), target.to(DEVICE)

        optimizer.zero_grad()
        output = best_model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
    # Print training progress
    print(f"Epoch {epoch+1}/{EPOCHS} completed.")

# Save the trained model
torch.save(best_model.state_dict(), "best_model.pth")

# test inference speed
import time

# Benchmark in inference mode: eval() plus no_grad() keeps autograd
# bookkeeping out of the measured time.
best_model.eval()
fake_image = torch.randn(1, 1, 100, 100).to(DEVICE)

# Accelerator kernels are launched asynchronously; without synchronising before
# reading the clock, only the launch overhead would be measured.
uses_accelerator = DEVICE != "cpu"

n_inferences = 1000
with torch.no_grad():
    # warm up
    for _ in range(10):
        _ = best_model(fake_image)
    if uses_accelerator:
        torch.accelerator.synchronize()

    # perf_counter is monotonic and has higher resolution than time.time().
    start_time = time.perf_counter()
    for _ in range(n_inferences):
        _ = best_model(fake_image)
    if uses_accelerator:
        torch.accelerator.synchronize()
    end_time = time.perf_counter()

print(f"Average inference time: {(end_time - start_time) / n_inferences * 1000:.4f} ms")

In [None]:
# display confidence of a test image using best model

