<a href="https://colab.research.google.com/github/jbsdoki/Squishy_Robots_Quant_Models/blob/main/Squish_Robot_Quant_Model_2_VideoGasNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Sources:
###Hyperparameter Tuning with Optuna:
https://medium.com/@taeefnajib/hyperparameter-tuning-using-optuna-c46d7b29a3e

https://optuna.org/#code_examples
###Multi-Modal ML Models
https://www.nature.com/articles/s41598-025-14901-4

###Next Models to test:
VideoGasNet:
https://www.sciencedirect.com/science/article/pii/S0360544221017643

GasVit: https://www.sciencedirect.com/science/article/pii/S1568494623011560?via%3Dihub#sec3

In [1]:
pip install optuna #Hyperparameter Optimizer

Collecting optuna
  Downloading optuna-4.5.0-py3-none-any.whl.metadata (17 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.10.1-py3-none-any.whl.metadata (11 kB)
Downloading optuna-4.5.0-py3-none-any.whl (400 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m400.9/400.9 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.10.1-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, optuna
Successfully installed colorlog-6.10.1 optuna-4.5.0


In [2]:
import os
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader
from collections import defaultdict

import optuna


In [3]:
!unzip -q Final_Dataset.zip

## Print out the structure of the data

In [None]:
# Example of loading the first file
file_path = './Final_Dataset/data/class_0/1237_frame_1004_class_0.npy'
sample_data = np.load(file_path)
print(f"Shape of preprocessed sample data: {sample_data.shape}")
print(f"Data type of preprocessed sample data: {sample_data.dtype}")

# GasVid synthetic processed dataset should be 2 channels, 240x320 in dimension

## Load and preprocess the data

In [5]:
# Assuming the data is in 'Final_Dataset/data' and class folders are named 'class_0' ... 'class_7'
data_dir = 'Final_Dataset/data'
classes = sorted(os.listdir(data_dir))
file_paths = []
labels = []

# Group by video ID only (not by video+class)
video_to_files = defaultdict(list)
for i, class_name in enumerate(classes):
    class_dir = os.path.join(data_dir, class_name)
    for file_name in os.listdir(class_dir):
        if file_name.endswith('.npy'):
            video_id = file_name.split('_')[0]
            video_to_files[video_id].append((os.path.join(class_dir, file_name), i))

# Function to load and preprocess a single .npy file
def load_and_preprocess_npy(filepath):
    data = np.load(filepath)
    if data.dtype != np.float32:
        data = data.astype(np.float32)

    # Mormalize the data
    mean = data.mean()
    std = data.std()
    if std > 0:
        data = (data - mean) / std

    return data

# Add train-test split
from sklearn.model_selection import train_test_split

# Split by video IDs
video_ids = list(video_to_files.keys())
train_vids, test_vids = train_test_split(video_ids, test_size=0.2, random_state=42)

train_files, train_labels = [], []
test_files, test_labels = [], []

for vid in train_vids:
    for filepath, label in video_to_files[vid]:
        train_files.append(filepath)
        train_labels.append(label)

for vid in test_vids:
    for filepath, label in video_to_files[vid]:
        test_files.append(filepath)
        test_labels.append(label)

# Check class distributions
from collections import Counter
print("Training class distribution:", Counter(train_labels))
print("Testing class distribution:", Counter(test_labels))

Training class distribution: Counter({2: 1094, 3: 1092, 0: 1091, 7: 1089, 1: 1088, 6: 1087, 4: 1083, 5: 1083})
Testing class distribution: Counter({2: 299, 5: 299, 7: 299, 1: 298, 0: 297, 6: 297, 3: 296, 4: 296})


## Create a dataset and dataloader

In [6]:


class NpyDataset(Dataset):
    def __init__(self, file_paths, labels, transform=None):
        self.file_paths = file_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        filepath = self.file_paths[idx]
        label = self.labels[idx]
        data = load_and_preprocess_npy(filepath) # Reuse the function from the previous step

        if self.transform:
            data = self.transform(data)

        return torch.from_numpy(data), torch.tensor(label)


# # Create instances of datasets
train_dataset = NpyDataset(train_files, train_labels)
test_dataset = NpyDataset(test_files, test_labels)


# # Create dataloaders
# # batch_size = 32
# train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) # No need to shuffle test data

# print(f"Number of training samples in the dataset: {len(train_dataset)}")
# print(f"Number of testing samples in the dataset: {len(test_dataset)}")
# print(f"Number of batches in the training dataloader: {len(train_dataloader)}")
# print(f"Number of batches in the testing dataloader: {len(test_dataloader)} \n\n")


# # Example of iterating through the dataloader (optional)
# for data, labels in train_dataloader:
#     print(f"Train batch data shape: {data.shape}")
#     print(f"Train batch labels shape: {labels.shape}")
#     break

# # Example of iterating through the dataloader (optional)
# for data, labels in test_dataloader:
#     print(f"Test batch data shape: {data.shape}")
#     print(f"Test batch labels shape: {labels.shape}")
#     break

## Define the CNN model

## Define the Optuna Objective Function

This function will be called by Optuna for each trial. It will:
1. Suggest hyperparameters using the trial object.
2. Build and train the CNN model with the suggested hyperparameters.
3. Evaluate the model on a validation set
4. Return the metric to minimize (loss) or maximize (accuracy).

In [11]:
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

def objective(trial):

    #############################
    # All Hyperparameters Tested
    #############################
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD', 'Adadelta', 'AdamW'])
    momentum = trial.suggest_float('momentum', 0.0, 0.99) if optimizer_name in ['SGD', 'RMSprop'] else 0.0
    weight_decay = trial.suggest_float('weight_decay', 0.0, 0.01)
    hidden_size = trial.suggest_int('hidden_size', 64, 256)
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])

    #####################
    # Define the Model
    #####################
    class VideoGasNet(nn.Module):
        def __init__(self):
            super(VideoGasNet, self).__init__()

            self.conv1 = nn.Conv2d(2, 32, kernel_size=3, padding=1)
            self.relu1 = nn.ReLU()
            self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

            self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
            self.relu2 = nn.ReLU()
            self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

            self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
            self.relu3 = nn.ReLU()
            self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

            # self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
            # self.relu4 = nn.ReLU()
            # self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

            # Calculate flatten size from conv layers from input(240x320)
            # flatten_size = 128 * (240 // 16) * (320 // 16) # 2^4 = 8 use for every conv + relu + pool section

            # Calculate flatten size from conv layers from input(240x320)
            flatten_size = 128 * (240 // 8) * (320 // 8) # 2^3 = 8 use for every conv + relu + pool section
            self.fc1 = nn.Linear(flatten_size, hidden_size)
            self.relu4 = nn.ReLU()
            self.fc2 = nn.Linear(hidden_size, 8)

        def forward(self, x):
            # Convolutional Blocks
            x = self.pool1(self.relu1(self.conv1(x)))
            x = self.pool2(self.relu2(self.conv2(x)))
            x = self.pool3(self.relu3(self.conv3(x)))
            # Fully Connected Blocks (Neural Network)
            x = x.view(x.size(0), -1)  # flatten
            x = self.relu4(self.fc1(x))
            x = self.fc2(x)
            return x

    model = VideoGasNet()

    ###############################
    # Define optimizer
    ###############################
    if optimizer_name == 'SGD':
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)
    elif optimizer_name == 'RMSprop':
        optimizer = optim.RMSprop(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)
    elif optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    elif optimizer_name == 'AdamW':
        optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    elif optimizer_name == 'Adadelta':
        optimizer = optim.Adadelta(model.parameters(), lr=lr, weight_decay=weight_decay)

    criterion = nn.CrossEntropyLoss()

    ##########################################
    # Create DataLoaders with trial batch_size
    ##########################################
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    ###############################
    # Train the model
    ###############################
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)


    num_epochs = 15

    model.train()
    for epoch in range(num_epochs):
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

    #####################
    # Evaluate the model
    #####################
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total
    return accuracy


## Run the Optuna Study

Now we will create an Optuna study and run the optimization process.

In [12]:
# Create a study object and specify the direction of optimization (maximize accuracy)
study = optuna.create_study(direction='maximize',
                             pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=5))

# Run the optimization
study.optimize(objective, n_trials = 30)

# Print the best hyperparameters found
print("Best hyperparameters: ", study.best_params)

# Print the best accuracy found
print("Best accuracy: ", study.best_value)

# Plot the visualization
optuna.visualization.plot_param_importances(study).show()

# Run more trials
# study.optimize(objective, n_trials=20)

[I 2025-10-28 22:06:59,760] A new study created in memory with name: no-name-74c60988-a594-45d2-b94b-6b605701035f
[I 2025-10-28 22:12:50,340] Trial 0 finished with value: 0.19529609407811843 and parameters: {'lr': 4.238962886275655e-05, 'optimizer': 'RMSprop', 'momentum': 0.949880549257204, 'weight_decay': 0.004803265830591319, 'hidden_size': 142, 'batch_size': 128}. Best is trial 0 with value: 0.19529609407811843.
[I 2025-10-28 22:18:37,054] Trial 1 finished with value: 0.1495170096598068 and parameters: {'lr': 0.0010457486034836782, 'optimizer': 'AdamW', 'weight_decay': 0.00973123196913183, 'hidden_size': 143, 'batch_size': 64}. Best is trial 0 with value: 0.19529609407811843.
[I 2025-10-28 22:24:07,246] Trial 2 finished with value: 0.124317513649727 and parameters: {'lr': 0.020566244979827345, 'optimizer': 'AdamW', 'weight_decay': 0.007864086045894165, 'hidden_size': 78, 'batch_size': 32}. Best is trial 0 with value: 0.19529609407811843.
[I 2025-10-28 22:29:47,521] Trial 3 finished 

KeyboardInterrupt: 