# ESC 50
Data preparation (spectrogram creation) is loosely adapted from https://www.kaggle.com/code/doofensmirtz/85-validation-accuracy-tensorflow

## Importing Libraries

In [None]:
import pandas as pd
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import librosa
import librosa.display
from tqdm import tqdm
import numpy as np
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F
import os
from torchinfo import summary
import gc
from sklearn.metrics import f1_score
from tensorboard.plugins.hparams import api as hp
import optuna

from IPython.display import display
%matplotlib inline
%load_ext tensorboard

In [None]:
# Notebook run mode.
# NOTE(review): the code that reads MODE (and its other accepted values) is not
# visible in this part of the notebook — confirm before changing it.
MODE = 'RUN'
# When True, every clip is expanded with four augmented copies (noise, time
# stretch, random shift, volume scaling) before mel spectrograms are computed.
AUGUMENTATION = True

## Loading and Preprocessing

In [None]:
CSV_FILE_PATH = "../input/environmental-sound-classification-50/esc50.csv"  # path of the ESC-50 metadata csv file
DATA_PATH = "../input/environmental-sound-classification-50/audio/audio/44100/" # path to folder containing the ESC-50 audio files
PRIVATE_DATA_PATH = "../input/esc50-private/" # path to folder containing the private test audio files

In [None]:
# Read the metadata csv, dropping the columns this notebook does not use
df = pd.read_csv(CSV_FILE_PATH).drop(['esc10','src_file','take'], axis=1)
# Full path of every clip = audio folder + file name
df['filepath'] = DATA_PATH + df['filename']
display(df)

# Category names in order of first appearance in the csv
classes = df['category'].unique()
print(classes)

# Map category name -> integer index (the position of the name in `classes`)
class_dict = {i:x for x,i in enumerate(classes)}
print(class_dict)

# Overwrite the csv's 'target' column with the indices from class_dict, so the
# labels used from here on follow this notebook's own numbering.
df['target'] = df['category'].map(class_dict)
display(df)


## Visualization

[](http://)

In [None]:
# Keep the first row of every distinct target, i.e. one example clip per class
sample_df = df.drop_duplicates(subset=['target'])
print("Sample df:")
display(sample_df)

def _plot_grid(items, suptitle, figsize, draw):
    """Draw one subplot per (title, data) pair of `items` on a two-column grid.

    `draw(ax, data)` renders a single entry; axis ticks are hidden on every
    subplot and unused cells (odd number of entries) are hidden entirely.
    """
    n_cols = 2
    n_rows = max(1, -(-len(items) // n_cols))  # ceiling division
    fig, axes = plt.subplots(nrows=n_rows, ncols=n_cols, sharex=False, sharey=True,
                             figsize=figsize, squeeze=False)
    fig.suptitle(suptitle, size=15)

    cells = list(axes.flat)  # row-major order: left-to-right, then top-to-bottom
    for ax, (title, data) in zip(cells, items.items()):
        ax.set_title(title)
        draw(ax, data)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    for ax in cells[len(items):]:
        ax.set_visible(False)

    plt.show()
    plt.close(fig)


def visualize_data():
    """Plot the waveform and the mel spectrogram of one example clip per class.

    Reads the module-level `sample_df` (one row per class), loads each clip
    with librosa and shows two figures: the raw time series and the
    dB-scaled mel spectrograms. Subplots are titled with the category name.
    """
    signals = {}
    mel_spectrograms = {}

    # Columns are accessed by name: positional indexing into a row Series is
    # deprecated in pandas and made the titles show the target id instead of
    # the category name.
    for _, row in tqdm(sample_df.iterrows(), total=len(sample_df)):
        category = row['category']
        signal, rate = librosa.load(row['filepath'])
        signals[category] = signal    # e.g. signals["dog"] = waveform of a dog sound

        mel_spec = librosa.feature.melspectrogram(y=signal, sr=rate, n_fft=2048, hop_length=512)
        # A raw power spectrogram renders almost black, so convert it to dB first
        mel_spec = librosa.power_to_db(mel_spec, ref=np.max)
        mel_spectrograms[category] = mel_spec

    # plot signals
    _plot_grid(signals, 'Time series', (40, 20),
               lambda ax, wave: ax.plot(wave))

    # plot mel spectograms
    _plot_grid(mel_spectrograms, 'Mel spectograms', (100, 100),
               lambda ax, spec: ax.imshow(spec, cmap=None, interpolation='nearest'))

visualize_data()

## Data augmentation

In [None]:
def add_noise(data):
    """
    Return `data` plus zero-mean Gaussian noise (standard deviation 0.007),
    one independent noise value per sample. The input is not modified.
    """
    gaussian = np.random.normal(loc=0, scale=0.007, size=len(data))
    return data + gaussian


def pitch_shifting(data, sr=16000):
    """
    Change the pitch of the audio signal by shifting it up or down.

    The shift is drawn uniformly from [-2, 2) semitones (12 bins per octave).

    Parameters:
        data: 1-D audio signal (converted to float64 before shifting).
        sr: sampling rate of `data` in Hz. The default keeps the value that was
            previously hard-coded; clips in this notebook are loaded at
            22050 Hz, so callers should normally pass the real rate.

    Returns:
        The pitch-shifted signal as returned by librosa.effects.pitch_shift.
    """
    bins_per_octave = 12
    pitch_pm = 2  # maximum shift in either direction, in semitones
    # Centre the uniform draw on zero so that the shift can go down as well as
    # up; without the -0.5 the shift was always in [0, 4), i.e. only upwards.
    pitch_change = pitch_pm * 2 * (np.random.uniform() - 0.5)
    data = librosa.effects.pitch_shift(
        y=data.astype("float64"),
        sr=sr,
        n_steps=pitch_change,
        bins_per_octave=bins_per_octave,
    )
    return data


def random_shift(data):
    """
    Change the position of the audio signal in time by shifting it forwards or backwards.

    The shift is drawn uniformly from [-20%, +20%) of the signal length. Samples
    pushed past either end are dropped and the vacated samples are filled with
    zeros, so the returned array has the same length as `data` (1-D input).
    """
    length = data.shape[0]
    timeshift_fac = 0.2 * 2 * (np.random.uniform() - 0.5)  # up to 20% of length
    start = int(length * timeshift_fac)
    if start > 0:
        # Delay: zeros in front, tail cut off.
        return np.pad(data, (start, 0), mode="constant")[:length]
    # Advance: zeros at the end, first -start samples cut off. Slicing from
    # -start (not from 0) is what actually moves the signal; slicing from 0
    # would just return the unshifted input. start == 0 yields a plain copy.
    return np.pad(data, (0, -start), mode="constant")[-start:]


def volume_scaling(data):
    """
    Scale the volume of the audio signal by a random gain drawn uniformly
    from [1.5, 2.5) and return the scaled signal.
    """
    gain = np.random.uniform(low=1.5, high=2.5)
    return data * gain


def time_stretching(data, rate=1.5):
    """
    Speed the audio signal up or slow it down by `rate` while keeping the
    original number of samples: a longer result is truncated, a shorter one is
    zero-padded at the end.
    """
    target_len = len(data)
    stretched = librosa.effects.time_stretch(y=data.copy(), rate=rate)

    missing = target_len - len(stretched)
    if missing < 0:
        return stretched[:target_len]
    return np.pad(stretched, (0, missing), "constant")

## Prepare datasets

In [None]:
# load private data
private_labels = []
private_mel_spectrograms = []
signals = []  # Store signals for plotting

for file in tqdm(os.listdir(PRIVATE_DATA_PATH)):
    # cut X.wav from name: the file name without its last 5 characters is the category
    class_name = file[:-5]
    if class_name not in class_dict:
        raise ValueError(
            f"Cannot derive a known category from private file name {file!r} (got {class_name!r})"
        )
    signal, rate = librosa.load(os.path.join(PRIVATE_DATA_PATH, file), sr=22050, duration=5.0)
    mel_spec = librosa.feature.melspectrogram(y=signal, sr=rate, n_fft=2048, hop_length=512)
    mel_spec = librosa.power_to_db(mel_spec, ref=np.max)
    private_mel_spectrograms.append(mel_spec)
    private_labels.append(class_dict[class_name])
    signals.append((signal, rate))

# Find max length in time dimension of spectrograms
max_time_steps = max(mel_spec.shape[1] for mel_spec in private_mel_spectrograms)

# Pad each spectrogram to have the same time dimension.
# The spectrograms are in dB relative to their own maximum, so 0 means "as loud
# as the loudest bin". Padding must therefore use the quietest value of the
# spectrogram (its silence floor); padding with 0 would append full-volume frames.
# NOTE(review): clips are padded to the longest private clip, not to the frame
# count of the training spectrograms — confirm both widths match.
padded_private_mel_spectrograms = []
for mel_spec in private_mel_spectrograms:
    pad_width = max_time_steps - mel_spec.shape[1]
    if pad_width > 0:
        mel_spec = np.pad(
            mel_spec,
            ((0, 0), (0, pad_width)),
            mode='constant',
            constant_values=mel_spec.min(),
        )
    padded_private_mel_spectrograms.append(mel_spec)

private_mel_spectrograms = padded_private_mel_spectrograms

# Plot one signal and spectrogram per class
unique_classes = list(set(private_labels))
plotted_classes = set()

for idx, (label, signal_data) in enumerate(zip(private_labels, signals)):
    if label not in plotted_classes:
        plotted_classes.add(label)

        signal, rate = signal_data
        mel_spec = private_mel_spectrograms[idx]

        # Open a new figure for every class: plt.show() finishes the current
        # figure, so a single figure created before the loop would give the
        # requested size to the first class only.
        plt.figure(figsize=(12, 8))

        # Plot waveform
        plt.subplot(2, 1, 1)
        plt.title(f"Waveform of class {label}")
        librosa.display.waveshow(signal, sr=rate, alpha=0.75)
        plt.xlabel("Time (s)")
        plt.ylabel("Amplitude")

        # Plot spectrogram
        plt.subplot(2, 1, 2)
        plt.title(f"Spectrogram of class {label}")
        librosa.display.specshow(mel_spec, sr=rate, hop_length=512, x_axis='time', y_axis='mel', cmap='viridis')
        plt.colorbar(format='%+2.0f dB')
        plt.xlabel("Time (s)")
        plt.ylabel("Mel Frequency")

        plt.tight_layout()
        plt.show()

        # Break if all unique classes are plotted
        if len(plotted_classes) == len(unique_classes):
            break

In [None]:
# One list of spectrograms / labels per fold; index i holds fold i + 1
mel_spectrograms = [[], [], [], [], []]
labels = [[], [], [], [], []]

# Compute mel spectrograms for all audio files
for index, row in tqdm(df.iterrows(), total=df.shape[0]):
    # Load audio file
    signal, rate = librosa.load(row['filepath'], sr=22050, duration=5.0)
    if AUGUMENTATION:
        # Original clip plus four augmented variants, all sharing the clip's label.
        # NOTE(review): this is applied to every fold, so the validation and
        # test folds below also contain augmented copies — confirm that is intended.
        noise_data = add_noise(signal)
        time_stretching_data = time_stretching(data=signal, rate=1.5)
        random_shift_data = random_shift(signal)
        volume_scale_data = volume_scaling(signal)
        samples = [signal, noise_data, time_stretching_data, random_shift_data, volume_scale_data]
    else:
        samples = [signal]
    
    for sample in samples:
        # Compute mel spectrogram
        mel_spec = librosa.feature.melspectrogram(y=sample, sr=rate, n_fft=2048, hop_length=512)
        # Convert power to dB relative to the spectrogram's own maximum
        mel_spec = librosa.power_to_db(mel_spec, ref=np.max)
        mel_spectrograms[row['fold'] - 1].append(mel_spec)
        labels[row['fold'] - 1].append(row['target'])

# Fold 1 -> test set, fold 2 -> validation set, folds 3-5 -> training set
test_x = np.array(mel_spectrograms[0])
test_y = np.array(labels[0])
private_test_x = np.array(private_mel_spectrograms)
private_test_y = np.array(private_labels)
val_x = np.array(mel_spectrograms[1])
val_y = np.array(labels[1])
train_x = np.array(mel_spectrograms[2] + mel_spectrograms[3] + mel_spectrograms[4])
train_y = np.array(labels[2] + labels[3] + labels[4])
# Final training set: everything except the validation fold (folds 1, 3, 4, 5)
final_train_x = np.array(mel_spectrograms[0] + mel_spectrograms[2] + mel_spectrograms[3] + mel_spectrograms[4])
final_train_y = np.array(labels[0] + labels[2] + labels[3] + labels[4])

print(private_test_x.shape, private_test_y.shape)
print(test_x.shape, test_y.shape)
print(val_x.shape, val_y.shape)
print(train_x.shape, train_y.shape)
print(final_train_x.shape, final_train_y.shape)

In [None]:
# Hyperparameter declarations (TensorBoard HParams API): each HParam pairs a
# name with the interval of values declared for it. The HParam objects are
# also used as the keys of the hparams dictionaries read by the models.

# LSTM Hyperparameters
HP_LSTM_HIDDEN_SIZE = hp.HParam('lstm_hidden_size', hp.IntInterval(1, 4096))
HP_LSTM_NUM_LAYERS = hp.HParam('lstm_num_layers', hp.IntInterval(1, 6))
HP_LSTM_DROPOUT = hp.HParam('lstm_dropout', hp.RealInterval(0.0, 0.7))

# CNN Hyperparameters
HP_CNN_CONV1_KERNEL_SIZE_X = hp.HParam('cnn_conv1_kernel_size_x', hp.IntInterval(2, 32))
HP_CNN_CONV1_KERNEL_SIZE_Y = hp.HParam('cnn_conv1_kernel_size_y', hp.IntInterval(2, 32))
HP_CNN_POOL1_KERNEL_SIZE_X = hp.HParam('cnn_pool1_kernel_size_x', hp.IntInterval(2, 8))
HP_CNN_POOL1_KERNEL_SIZE_Y = hp.HParam('cnn_pool1_kernel_size_y', hp.IntInterval(1, 8))
HP_CNN_CONV2_KERNEL_SIZE_X = hp.HParam('cnn_conv2_kernel_size_x', hp.IntInterval(2, 8))
HP_CNN_CONV2_KERNEL_SIZE_Y = hp.HParam('cnn_conv2_kernel_size_y', hp.IntInterval(1, 8))
HP_CNN_POOL2_KERNEL_SIZE_X = hp.HParam('cnn_pool2_kernel_size_x', hp.IntInterval(1, 4))
HP_CNN_POOL2_KERNEL_SIZE_Y = hp.HParam('cnn_pool2_kernel_size_y', hp.IntInterval(1, 4))
HP_CNN_HIDDEN_SIZE = hp.HParam('cnn_hidden_size', hp.IntInterval(1, 8192))
HP_CNN_DROPOUT_CONV1_RATE = hp.HParam('cnn_dropout_conv1_rate', hp.RealInterval(0.0, 0.7))
HP_CNN_DROPOUT_CONV2_RATE = hp.HParam('cnn_dropout_conv2_rate', hp.RealInterval(0.0, 0.7))
HP_CNN_DROPOUT_FC_RATE = hp.HParam('cnn_dropout_fc_rate', hp.RealInterval(0.0, 0.7))
HP_CNN_NUM_CHANNELS = hp.HParam('cnn_num_channels', hp.IntInterval(2, 128))

# LSTMAttentionCNN Hyperparameters
# NOTE(review): the hidden-size interval tops out at 1024, but the default
# hparams in this notebook set this parameter to 3000 — confirm which is intended.
HP_LSTMCNN_HIDDEN_SIZE = hp.HParam('lstmcnn_hidden_size', hp.IntInterval(1, 1024))
HP_LSTMCNN_NUM_LAYERS = hp.HParam('lstmcnn_num_layers', hp.IntInterval(1, 2))
HP_LSTMCNN_NUM_HEADS = hp.HParam('lstmcnn_num_heads', hp.IntInterval(1, 16))
HP_LSTMCNN_CONV1_KERNEL_SIZE = hp.HParam('lstmcnn_conv1_kernel_size', hp.IntInterval(2, 32))
HP_LSTMCNN_CONV2_KERNEL_SIZE = hp.HParam('lstmcnn_conv2_kernel_size', hp.IntInterval(2, 16))
HP_LSTMCNN_POOL1_KERNEL_SIZE = hp.HParam('lstmcnn_pool1_kernel_size', hp.IntInterval(2, 8))
HP_LSTMCNN_POOL2_KERNEL_SIZE = hp.HParam('lstmcnn_pool2_kernel_size', hp.IntInterval(2, 8))
HP_LSTMCNN_DROPOUT_LSTM_RATE = hp.HParam('lstmcnn_dropout_lstm_rate', hp.RealInterval(0.0, 0.7))
HP_LSTMCNN_DROPOUT_ATTN_RATE = hp.HParam('lstmcnn_dropout_attn_rate', hp.RealInterval(0.0, 0.7))
HP_LSTMCNN_DROPOUT_CONV1_RATE = hp.HParam('lstmcnn_dropout_conv1_rate', hp.RealInterval(0.0, 0.7))
HP_LSTMCNN_DROPOUT_CONV2_RATE = hp.HParam('lstmcnn_dropout_conv2_rate', hp.RealInterval(0.0, 0.7))
HP_LSTMCNN_DROPOUT_FC_RATE = hp.HParam('lstmcnn_dropout_fc_rate', hp.RealInterval(0.0, 0.7))
HP_LSTMCNN_CNN_OUT_CHANNELS = hp.HParam('lstmcnn_cnn_out_channels', hp.IntInterval(2, 128))
HP_LSTMCNN_FC_HIDDEN_SIZE = hp.HParam('lstmcnn_fc_hidden_size', hp.IntInterval(16, 8192))



## Initialize hyperparameters

In [None]:


# The string blocks below record the best hyperparameter sets found so far,
# together with the validation scores they reached.
'''
Best hyperparameters for LSTMCNN:
{
'hidden_size': 2654,
'num_layers': 1,
'num_heads': 6,
'conv1_kernel_size': 15,
'conv2_kernel_size': 6,
'pool1_kernel_size': 2,
'pool2_kernel_size': 4,
'dropout_lstm_rate': 0.6085469197951502,
'dropout_attn_rate': 0.18886736927751804,
'dropout_conv1_rate': 0.3129152390777908,
'dropout_conv2_rate': 0.30551219428129983,
'dropout_fc_rate': 0.5657045077939624,
'cnn_out_channels': 38,
'fc_hidden_size': 571,
'learning_rate': 0.0001124180393666018,
'weight_decay': 0.0014236035034121022
}
Val Acc: 0.4775, Val F1: 0.4626
'''

'''
Best hyperparameters for CNN:
{
'conv1_kernel_size_x': 27,
'conv1_kernel_size_y': 15,
'conv2_kernel_size_x': 2,
'conv2_kernel_size_y': 3,
'pool1_kernel_size_x': 5,
'pool1_kernel_size_y': 7,
'pool2_kernel_size_x': 3,
'pool2_kernel_size_y': 2,
'hidden_size': 6299,
'dropout_conv1_rate': 0.011614705225163842,
'dropout_conv2_rate': 0.02490196806259639,
'dropout_fc_rate': 0.5574060256228773,
'num_channels': 100,
'learning_rate': 0.0001182121358018495,
'weight_decay': 0.000978912231799175
}
Val Acc: 0.5175, Val F1: 0.5078
'''

# Second recorded CNN set; its values are the ones used for the CNN defaults below
'''
{
'conv1_kernel_size_x': 14,
'conv1_kernel_size_y': 6,
'conv2_kernel_size_x': 2,
'conv2_kernel_size_y': 5,
'pool1_kernel_size_x': 5,
'pool1_kernel_size_y': 8,
'pool2_kernel_size_x': 3,
'pool2_kernel_size_y': 4,
'hidden_size': 6090,
'dropout_conv1_rate': 0.007385545905756266,
'dropout_conv2_rate': 0.3713678969520621,
'dropout_fc_rate': 0.5753897102755045,
'num_channels': 90,
'learning_rate': 0.00015169134548009995,
'weight_decay': 0.009258909446463159
}
Val Acc: 0.6030, Val F1: 0.5895
'''

# Default value for every hyperparameter, keyed by its HParam object.
default_hparams = {
    # CNN defaults: taken from the recorded set with Val Acc 0.6030.
    # The keys here are ordered conv1, pool1, conv2, pool2 while the recorded
    # set lists conv1, conv2, pool1, pool2, so each value is matched to its
    # key by name (pool1 = 5x8, conv2 = 2x5), not by position.
    HP_CNN_CONV1_KERNEL_SIZE_X: 14,
    HP_CNN_CONV1_KERNEL_SIZE_Y: 6,
    HP_CNN_POOL1_KERNEL_SIZE_X: 5,
    HP_CNN_POOL1_KERNEL_SIZE_Y: 8,
    HP_CNN_CONV2_KERNEL_SIZE_X: 2,
    HP_CNN_CONV2_KERNEL_SIZE_Y: 5,
    HP_CNN_POOL2_KERNEL_SIZE_X: 3,
    HP_CNN_POOL2_KERNEL_SIZE_Y: 4,
    HP_CNN_HIDDEN_SIZE: 6090,
    HP_CNN_DROPOUT_CONV1_RATE: 0.007385545905756266,
    HP_CNN_DROPOUT_CONV2_RATE: 0.3713678969520621,
    HP_CNN_DROPOUT_FC_RATE: 0.5753897102755045,
    HP_CNN_NUM_CHANNELS: 90,

    # LSTMAttentionCNN defaults: rounded values close to the recorded LSTMCNN set
    HP_LSTMCNN_HIDDEN_SIZE: 3000,
    HP_LSTMCNN_NUM_LAYERS: 1,
    HP_LSTMCNN_NUM_HEADS: 6,
    HP_LSTMCNN_CONV1_KERNEL_SIZE: 15,
    HP_LSTMCNN_CONV2_KERNEL_SIZE: 6,
    HP_LSTMCNN_POOL1_KERNEL_SIZE: 2,
    HP_LSTMCNN_POOL2_KERNEL_SIZE: 4,
    HP_LSTMCNN_DROPOUT_LSTM_RATE: 0.6,
    HP_LSTMCNN_DROPOUT_ATTN_RATE: 0.2,
    HP_LSTMCNN_DROPOUT_CONV1_RATE: 0.3,
    HP_LSTMCNN_DROPOUT_CONV2_RATE: 0.3,
    HP_LSTMCNN_DROPOUT_FC_RATE: 0.55,
    HP_LSTMCNN_CNN_OUT_CHANNELS: 40,
    HP_LSTMCNN_FC_HIDDEN_SIZE: 600,

    # Plain LSTM defaults
    HP_LSTM_HIDDEN_SIZE: 2048,
    HP_LSTM_NUM_LAYERS: 2,
    HP_LSTM_DROPOUT: 0.1,
}

In [None]:
def create_loaders(model_type, batch_size, train_x, train_y, val_x, val_y, test_x, test_y):
    """Build train/val/test DataLoaders laid out for the given model type.

    'LSTM' and 'LSTMCNN' get the last two axes of each feature array swapped;
    'CNN' gets a singleton channel axis inserted at position 1. Features become
    float32 tensors, labels become long tensors. Only the training loader shuffles.

    Returns:
        (shape of the first training batch, train_loader, val_loader, test_loader)

    Raises:
        ValueError: if `model_type` is not 'LSTM', 'LSTMCNN' or 'CNN'.
    """
    raw_features = (train_x, val_x, test_x)
    if model_type in ('LSTM', 'LSTMCNN'):
        # (batch_size, seq_length, input_size) layout expected by the recurrent models
        train_feats, val_feats, test_feats = (arr.transpose(0, 2, 1) for arr in raw_features)
    elif model_type == 'CNN':
        # (n_samples, channels, height, width) layout expected by the CNN
        train_feats, val_feats, test_feats = (arr[:, np.newaxis, :, :] for arr in raw_features)
    else:
        raise ValueError(f"Unknown model_type: {model_type}")

    print("Numpy Train dataset shapes:", train_feats.shape, train_y.shape)
    print("Numpy Val dataset shapes:", val_feats.shape, val_y.shape)
    print("Numpy Test dataset shapes:", test_feats.shape, test_y.shape)

    def _as_dataset(features, targets):
        # float32 features and int64 targets wrapped in a TensorDataset
        return TensorDataset(
            torch.tensor(features, dtype=torch.float32),
            torch.tensor(targets, dtype=torch.long),
        )

    train_dataset = _as_dataset(train_feats, train_y)
    val_dataset = _as_dataset(val_feats, val_y)
    test_dataset = _as_dataset(test_feats, test_y)

    # Report tensor shapes
    train_X, train_Y = train_dataset.tensors
    val_X, val_Y = val_dataset.tensors
    test_X, test_Y = test_dataset.tensors
    print("X_train_tensor shape:", train_X.shape)
    print("y_train_tensor shape:", train_Y.shape)
    print("X_val_tensor shape:", val_X.shape)
    print("y_val_tensor shape:", val_Y.shape)
    print("X_test_tensor shape:", test_X.shape)
    print("y_test_tensor shape:", test_Y.shape)

    # Report sample counts
    print("Number of samples in train_dataset:", len(train_dataset))
    print("Number of samples in val_dataset:", len(val_dataset))
    print("Number of samples in test_dataset:", len(test_dataset))

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Peek at the first training batch only, to report and return its shape
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        print(f"Batch {batch_idx + 1} - inputs shape: {inputs.shape}, labels shape: {labels.shape}")
        single_batch_size = inputs.shape
        break

    return single_batch_size, train_loader, val_loader, test_loader

In [None]:
def create_final_loaders(model_type, batch_size, train_x, train_y, val_x, val_y):
    """Build train/val DataLoaders (no test split) laid out for the given model type.

    'LSTM' and 'LSTMCNN' get the last two axes of each feature array swapped;
    'CNN' gets a singleton channel axis inserted at position 1. Features become
    float32 tensors, labels become long tensors. Only the training loader shuffles.

    Returns:
        (shape of the first training batch, train_loader, val_loader)

    Raises:
        ValueError: if `model_type` is not 'LSTM', 'LSTMCNN' or 'CNN'.
    """
    if model_type in ('LSTM', 'LSTMCNN'):
        # (batch_size, seq_length, input_size) layout expected by the recurrent models
        train_feats, val_feats = (arr.transpose(0, 2, 1) for arr in (train_x, val_x))
    elif model_type == 'CNN':
        # (n_samples, channels, height, width) layout expected by the CNN
        train_feats, val_feats = (arr[:, np.newaxis, :, :] for arr in (train_x, val_x))
    else:
        raise ValueError(f"Unknown model_type: {model_type}")

    print("Numpy Train dataset shapes:", train_feats.shape, train_y.shape)
    print("Numpy Val dataset shapes:", val_feats.shape, val_y.shape)

    def _as_dataset(features, targets):
        # float32 features and int64 targets wrapped in a TensorDataset
        return TensorDataset(
            torch.tensor(features, dtype=torch.float32),
            torch.tensor(targets, dtype=torch.long),
        )

    train_dataset = _as_dataset(train_feats, train_y)
    val_dataset = _as_dataset(val_feats, val_y)

    # Report tensor shapes
    train_X, train_Y = train_dataset.tensors
    val_X, val_Y = val_dataset.tensors
    print("X_train_tensor shape:", train_X.shape)
    print("y_train_tensor shape:", train_Y.shape)
    print("X_val_tensor shape:", val_X.shape)
    print("y_val_tensor shape:", val_Y.shape)

    # Report sample counts
    print("Number of samples in train_dataset:", len(train_dataset))
    print("Number of samples in val_dataset:", len(val_dataset))

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Peek at the first training batch only, to report and return its shape
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        print(f"Batch {batch_idx + 1} - inputs shape: {inputs.shape}, labels shape: {labels.shape}")
        single_batch_size = inputs.shape
        break

    return single_batch_size, train_loader, val_loader

In [None]:
def create_private_loader(model_type, batch_size, test_x, test_y):
    """Build a non-shuffling DataLoader over the private test set.

    'LSTM' and 'LSTMCNN' get the last two axes of the feature array swapped;
    'CNN' gets a singleton channel axis inserted at position 1. Features become
    float32 tensors, labels become long tensors.

    Raises:
        ValueError: if `model_type` is not 'LSTM', 'LSTMCNN' or 'CNN'.
    """
    if model_type == 'CNN':
        # (n_samples, channels, height, width) layout expected by the CNN
        features = test_x[:, np.newaxis, :, :]
    elif model_type in ('LSTM', 'LSTMCNN'):
        # (batch_size, seq_length, input_size) layout expected by the recurrent models
        features = test_x.transpose(0, 2, 1)
    else:
        raise ValueError(f"Unknown model_type: {model_type}")

    print("Numpy Private test dataset shapes:", features.shape, test_y.shape)

    private_test_dataset = TensorDataset(
        torch.tensor(features, dtype=torch.float32),
        torch.tensor(test_y, dtype=torch.long),
    )

    # Report tensor shapes and sample count
    feature_tensor, label_tensor = private_test_dataset.tensors
    print("Private X_test_tensor shape:", feature_tensor.shape)
    print("Private y_test_tensor shape:", label_tensor.shape)
    print("Number of samples in private_test_dataset:", len(private_test_dataset))

    return DataLoader(private_test_dataset, batch_size=batch_size)

## Define models

In [None]:
def conv_output_size(input_size, kernel_size, stride, padding=0):
    """Return the output length of a convolution along one axis.

    Computes floor((input_size + 2 * padding - kernel_size) / stride) + 1;
    the result can be zero or negative when the kernel does not fit.
    """
    padded_span = input_size + 2 * padding - kernel_size
    return padded_span // stride + 1

def pool_output_size(input_size, kernel_size, stride, padding=0):
    """Return the output length of a pooling layer along one axis.

    Computes floor((input_size + 2 * padding - kernel_size) / stride) + 1;
    the result can be zero or negative when the window does not fit.
    """
    full_steps, _ = divmod(input_size - kernel_size + 2 * padding, stride)
    return full_steps + 1

In [None]:
class SimpleLSTM(nn.Module):
    """LSTM classifier: a (possibly stacked) LSTM followed by one linear layer.

    The class scores are computed from the LSTM output at the last time step.
    Hidden size, number of layers and dropout are read from `hparams` via the
    HP_LSTM_* keys.
    """

    def __init__(self, input_size, num_classes, hparams):
        super().__init__()

        self.hparams = hparams

        # Pull the LSTM settings out of the hparams mapping
        hidden_size, num_layers, dropout = (
            hparams[key]
            for key in (HP_LSTM_HIDDEN_SIZE, HP_LSTM_NUM_LAYERS, HP_LSTM_DROPOUT)
        )

        # Human-readable summary of the configuration
        self.description = (
        f"LSTM(input_size={input_size}, num_classes={num_classes}, hidden_size={hidden_size}, num_layers={num_layers}, dropout={dropout})"
        )

        lstm_config = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
        )
        self.lstm = nn.LSTM(**lstm_config)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map x of shape (batch_size, seq_length, input_size) to (batch_size, num_classes) scores."""
        sequence_out, _ = self.lstm(x)       # (batch_size, seq_length, hidden_size)
        final_step = sequence_out[:, -1, :]  # output at the last time step
        return self.fc(final_step)


In [None]:
class LSTMAttentionCNN(nn.Module):
    """LSTM -> multi-head self-attention -> two Conv1d/MaxPool1d stages -> two linear layers.

    Parameters:
        input_size: number of features per time step.
        seq_length: number of time steps per sample; used to size the first
            fully connected layer.
        num_classes: number of output classes.
        hparams: mapping keyed by the HP_LSTMCNN_* hyperparameters.

    Raises:
        ValueError: if the hidden size is smaller than the number of attention
            heads, or if the sequence shrinks to zero length (or less) after
            the convolution/pooling stages.

    `forward` returns raw class scores (logits) of shape (batch_size, num_classes).
    """

    def __init__(
        self,
        input_size,
        seq_length,
        num_classes,
        hparams
    ):
        super().__init__()

        self.hparams = hparams

        num_layers = hparams[HP_LSTMCNN_NUM_LAYERS]
        num_heads = hparams[HP_LSTMCNN_NUM_HEADS]
        # attention embed_dim needs to be divisible by num_heads, so round the
        # requested hidden size down to the nearest multiple of num_heads
        requested_hidden_size = hparams[HP_LSTMCNN_HIDDEN_SIZE]
        hidden_size = requested_hidden_size - requested_hidden_size % num_heads
        if hidden_size <= 0:
            # Rounding down reaches 0 whenever the requested size is below num_heads
            raise ValueError(
                f"hidden_size ({requested_hidden_size}) must be at least num_heads ({num_heads})."
            )
        conv1_kernel_size = hparams[HP_LSTMCNN_CONV1_KERNEL_SIZE]
        conv2_kernel_size = hparams[HP_LSTMCNN_CONV2_KERNEL_SIZE]
        pool1_kernel_size = hparams[HP_LSTMCNN_POOL1_KERNEL_SIZE]
        pool2_kernel_size = hparams[HP_LSTMCNN_POOL2_KERNEL_SIZE]
        dropout_lstm_rate = hparams[HP_LSTMCNN_DROPOUT_LSTM_RATE]
        dropout_attn_rate = hparams[HP_LSTMCNN_DROPOUT_ATTN_RATE]
        dropout_conv1_rate = hparams[HP_LSTMCNN_DROPOUT_CONV1_RATE]
        dropout_conv2_rate = hparams[HP_LSTMCNN_DROPOUT_CONV2_RATE]
        dropout_fc_rate = hparams[HP_LSTMCNN_DROPOUT_FC_RATE]
        cnn_out_channels = hparams[HP_LSTMCNN_CNN_OUT_CHANNELS]
        fc_hidden_size = hparams[HP_LSTMCNN_FC_HIDDEN_SIZE]

        # Human-readable summary of the configuration
        self.description = (
        f"LSTMAttentionCNN(input_size={input_size}, seq_length={seq_length}, "
        f"hidden_size={hidden_size}, num_layers={num_layers}, num_heads={num_heads}, "
        f"conv1_kernel_size={conv1_kernel_size}, conv2_kernel_size={conv2_kernel_size}, "
        f"pool1_kernel_size={pool1_kernel_size}, pool2_kernel_size={pool2_kernel_size}, "
        f"dropout_lstm_rate={dropout_lstm_rate}, dropout_attn_rate={dropout_attn_rate}, "
        f"dropout_conv1_rate={dropout_conv1_rate}, dropout_conv2_rate={dropout_conv2_rate}, "
        f"dropout_fc_rate={dropout_fc_rate}, cnn_out_channels={cnn_out_channels}, "
        f"fc_hidden_size={fc_hidden_size})"
        )

        # LSTM layer. nn.LSTM applies its `dropout` only between stacked layers,
        # so with a single layer it has no effect and PyTorch emits a warning;
        # pass 0 in that case. Dropout on the LSTM output is handled separately below.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout_lstm_rate if num_layers > 1 else 0.0
        )

        # Dropout after LSTM
        self.dropout_lstm = nn.Dropout(dropout_lstm_rate)

        # MultiHeadAttention layer
        self.attention = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=num_heads,
            batch_first=True
        )

        # Dropout after attention
        self.dropout_attention = nn.Dropout(dropout_attn_rate)

        # CNN layers using Conv1d (convolution runs along the time axis)
        self.conv1 = nn.Conv1d(
            in_channels=hidden_size,
            out_channels=cnn_out_channels,
            kernel_size=conv1_kernel_size
        )
        self.dropout_conv1 = nn.Dropout(dropout_conv1_rate)

        self.pool1 = nn.MaxPool1d(kernel_size=pool1_kernel_size, stride=pool1_kernel_size)

        self.conv2 = nn.Conv1d(
            in_channels=cnn_out_channels,
            out_channels=cnn_out_channels,
            kernel_size=conv2_kernel_size
        )

        self.dropout_conv2 = nn.Dropout(dropout_conv2_rate)

        self.pool2 = nn.MaxPool1d(kernel_size=pool2_kernel_size, stride=pool2_kernel_size)

        # Track the sequence length through conv1 -> pool1 -> conv2 -> pool2 to
        # know how many features reach the first fully connected layer
        length = seq_length
        length = conv_output_size(length, conv1_kernel_size, stride=1)
        length = pool_output_size(length, pool1_kernel_size, stride=pool1_kernel_size)
        length = conv_output_size(length, conv2_kernel_size, stride=1)
        length = pool_output_size(length, pool2_kernel_size, stride=pool2_kernel_size)

        if length <= 0:
            print(f'length:{length}')
            raise ValueError("Negative or zero dimension size after convolution/pooling layers. Adjust your hyperparameters.")

        num_features = cnn_out_channels * length

        # Fully connected layers
        self.fc1 = nn.Linear(num_features, fc_hidden_size)
        self.dropout_fc = nn.Dropout(dropout_fc_rate)
        self.fc2 = nn.Linear(fc_hidden_size, num_classes)

    def forward(self, x):
        """Map x of shape (batch_size, seq_length, input_size) to (batch_size, num_classes) logits."""
        # LSTM layer
        x, _ = self.lstm(x)
        x = self.dropout_lstm(x)

        # MultiHeadAttention layer (self-attention: query = key = value)
        x, _ = self.attention(x, x, x)
        x = self.dropout_attention(x)

        # Prepare data for Conv1d
        # Transpose to (batch_size, hidden_size, seq_length) for Conv1d
        x = x.transpose(1, 2)

        # CNN layers (activation is applied before each convolution)
        x = F.relu(x)
        x = self.conv1(x)
        x = self.dropout_conv1(x)
        x = self.pool1(x)

        x = F.relu(x)
        x = self.conv2(x)
        x = self.dropout_conv2(x)
        x = self.pool2(x)

        # Flatten to (batch_size, channels * length)
        x = x.flatten(start_dim=1)

        # Fully connected layers
        x = F.relu(x)
        x = self.fc1(x)
        x = self.dropout_fc(x)
        x = self.fc2(x)

        # Return raw logits, like the other models in this notebook. A softmax
        # here would be applied a second time by a cross-entropy loss, which
        # flattens the gradients; argmax predictions are the same either way.
        # NOTE(review): assumes the training loss expects logits — confirm
        # against the training loop.
        return x

In [None]:
class CustomCNN(nn.Module):
    """Two-stage 2-D CNN classifier: (conv -> ReLU -> dropout -> max-pool) x 2, then two linear layers.

    Kernel sizes, dropout rates, channel count and hidden size are read from
    `hparams` via the HP_CNN_* keys. `input_height` and `input_width` are used
    to size the first fully connected layer.

    Raises:
        ValueError: if either spatial dimension shrinks to zero (or less) after
            the convolution/pooling stages.
    """

    def __init__(self, num_classes, input_height, input_width, hparams):
        super().__init__()

        self.hparams = hparams

        # Kernel sizes ("x" applies to the height axis, "y" to the width axis)
        conv1_kernel_size_x = hparams[HP_CNN_CONV1_KERNEL_SIZE_X]
        conv1_kernel_size_y = hparams[HP_CNN_CONV1_KERNEL_SIZE_Y]
        conv2_kernel_size_x = hparams[HP_CNN_CONV2_KERNEL_SIZE_X]
        conv2_kernel_size_y = hparams[HP_CNN_CONV2_KERNEL_SIZE_Y]
        pool1_kernel_size_x = hparams[HP_CNN_POOL1_KERNEL_SIZE_X]
        pool1_kernel_size_y = hparams[HP_CNN_POOL1_KERNEL_SIZE_Y]
        pool2_kernel_size_x = hparams[HP_CNN_POOL2_KERNEL_SIZE_X]
        pool2_kernel_size_y = hparams[HP_CNN_POOL2_KERNEL_SIZE_Y]
        # Remaining settings
        hidden_size = hparams[HP_CNN_HIDDEN_SIZE]
        dropout_conv1_rate = hparams[HP_CNN_DROPOUT_CONV1_RATE]
        dropout_conv2_rate = hparams[HP_CNN_DROPOUT_CONV2_RATE]
        dropout_fc_rate = hparams[HP_CNN_DROPOUT_FC_RATE]
        num_channels = hparams[HP_CNN_NUM_CHANNELS]

        # Human-readable summary of the configuration
        self.description = (
        f"CNN(num_classes={num_classes}, input_height={input_height}, input_width={input_width}, "
        f"hidden_size={hidden_size}, dropout_conv1_rate={dropout_conv1_rate}, "
        f"conv1_kernel_size=({conv1_kernel_size_x}, {conv1_kernel_size_y}), conv2_kernel_size=({conv2_kernel_size_x}, {conv2_kernel_size_y}), "
        f"pool1_kernel_size=({pool1_kernel_size_x}, {pool1_kernel_size_y}), pool2_kernel_size=({pool2_kernel_size_x}, {pool2_kernel_size_y}), "
        f"dropout_conv2_rate={dropout_conv2_rate}, dropout_fc_rate={dropout_fc_rate}, "
        f"num_channels={num_channels})"
        )

        conv1_kernel = (conv1_kernel_size_x, conv1_kernel_size_y)
        pool1_kernel = (pool1_kernel_size_x, pool1_kernel_size_y)
        conv2_kernel = (conv2_kernel_size_x, conv2_kernel_size_y)
        pool2_kernel = (pool2_kernel_size_x, pool2_kernel_size_y)

        # Stage 1: single input channel -> num_channels feature maps
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=num_channels, kernel_size=conv1_kernel)
        self.dropout_conv1 = nn.Dropout2d(dropout_conv1_rate)
        # Pooling windows do not overlap (stride equals kernel size)
        self.pool1 = nn.MaxPool2d(kernel_size=pool1_kernel, stride=pool1_kernel)

        # Stage 2: channel count stays the same
        self.conv2 = nn.Conv2d(in_channels=num_channels, out_channels=num_channels, kernel_size=conv2_kernel)
        self.dropout_conv2 = nn.Dropout2d(dropout_conv2_rate)
        self.pool2 = nn.MaxPool2d(kernel_size=pool2_kernel, stride=pool2_kernel)

        # Follow the spatial size through conv1 -> pool1 -> conv2 -> pool2
        H, W = input_height, input_width
        H, W = (conv_output_size(dim, k, stride=1) for dim, k in zip((H, W), conv1_kernel))
        H, W = (pool_output_size(dim, k, stride=k) for dim, k in zip((H, W), pool1_kernel))
        H, W = (conv_output_size(dim, k, stride=1) for dim, k in zip((H, W), conv2_kernel))
        H, W = (pool_output_size(dim, k, stride=k) for dim, k in zip((H, W), pool2_kernel))

        if H <= 0 or W <= 0:
            print(f'h:{H}, w:{W}')
            raise ValueError("Negative or zero dimension size after convolution/pooling layers. "
                             "Adjust your hyperparameters.")

        # Flattened feature count feeding the classifier head
        num_features = num_channels * H * W

        # Classifier head
        self.fc1 = nn.Linear(num_features, hidden_size)
        self.dropout_fc = nn.Dropout(dropout_fc_rate)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map x of shape (batch_size, 1, height, width) to (batch_size, num_classes) scores."""
        # Stage 1: conv -> ReLU -> dropout -> pool
        x = self.pool1(self.dropout_conv1(F.relu(self.conv1(x))))

        # Stage 2: conv -> ReLU -> dropout -> pool
        x = self.pool2(self.dropout_conv2(F.relu(self.conv2(x))))

        # Flatten everything but the batch axis
        batch = x.size(0)
        x = x.view(batch, -1)

        # Classifier head: linear -> ReLU -> dropout -> linear
        x = self.dropout_fc(F.relu(self.fc1(x)))
        return self.fc2(x)

## Define model creation and training

In [None]:
def create_lstm(input_size, num_classes, hparams, device):
    """Build a ``SimpleLSTM`` and move it to ``device``.

    ``hparams`` is merged over ``default_hparams``, so any key supplied by
    the caller overrides the default value for that key.
    """
    merged_hparams = default_hparams | hparams
    lstm = SimpleLSTM(
        input_size=input_size,
        num_classes=num_classes,
        hparams=merged_hparams,
    )
    return lstm.to(device)

def create_cnn(
    num_classes,
    input_height,
    input_width,
    hparams,
    device
):
    """Build a ``CustomCNN`` and move it to ``device``.

    ``hparams`` is merged over ``default_hparams``, so any key supplied by
    the caller overrides the default value for that key.
    """
    merged_hparams = default_hparams | hparams
    cnn = CustomCNN(
        num_classes=num_classes,
        input_height=input_height,
        input_width=input_width,
        hparams=merged_hparams,
    )
    return cnn.to(device)

def create_lstmcnn(
    input_size,
    seq_length,
    num_classes,
    hparams,
    device
):
    """Build an ``LSTMAttentionCNN`` and move it to ``device``.

    ``hparams`` is merged over ``default_hparams``, so any key supplied by
    the caller overrides the default value for that key.
    """
    merged_hparams = default_hparams | hparams
    network = LSTMAttentionCNN(
        input_size=input_size,
        seq_length=seq_length,
        num_classes=num_classes,
        hparams=merged_hparams,
    )
    return network.to(device)

def create_sample_model(model_type, single_batch_size, num_classes, device):
    """Build a model of ``model_type`` with ``default_hparams`` and show its summary.

    ``single_batch_size`` is indexed to derive the model's input dimensions:

    * ``'LSTM'``    -- ``input_size = single_batch_size[2]``
    * ``'CNN'``     -- ``input_height = single_batch_size[2]``,
      ``input_width = single_batch_size[3]``
    * ``'LSTMCNN'`` -- ``input_size = single_batch_size[2]``,
      ``seq_length = single_batch_size[1]``

    The same ``single_batch_size`` is passed to ``summary`` as its
    ``input_size`` argument.

    Raises:
        ValueError: if ``model_type`` is none of the three names above.
    """
    # Reject unsupported names up front so the branches below are exhaustive.
    if model_type not in ('LSTM', 'CNN', 'LSTMCNN'):
        raise ValueError(f"Unknown model_type: {model_type}")

    if model_type == 'LSTM':
        model = create_lstm(
            input_size=single_batch_size[2],
            num_classes=num_classes,
            hparams=default_hparams,
            device=device,
        )
    elif model_type == 'CNN':
        model = create_cnn(
            num_classes=num_classes,
            input_height=single_batch_size[2],
            input_width=single_batch_size[3],
            hparams=default_hparams,
            device=device,
        )
    else:
        model = create_lstmcnn(
            input_size=single_batch_size[2],
            seq_length=single_batch_size[1],
            num_classes=num_classes,
            hparams=default_hparams,
            device=device,
        )

    model_summary = summary(model, input_size=single_batch_size)
    display(model_summary)
    return model


In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one full pass over ``loader`` and return ``(loss, accuracy, macro_f1)``.

    When ``optimizer`` is given the model is put in train mode and updated
    after every batch; otherwise it is put in eval mode and gradient tracking
    is disabled. Loss and accuracy are averaged over ``len(loader.dataset)``.
    """
    training = optimizer is not None
    model.train(training)  # model.train(False) is what model.eval() does

    loss_sum = 0.0
    n_correct = 0
    seen_labels = []
    seen_predictions = []

    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            # The criterion yields a per-batch mean, so weight it by the
            # batch size before summing across batches.
            loss_sum += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            n_correct += (predicted == labels).sum().item()
            seen_labels.extend(labels.cpu().numpy())
            seen_predictions.extend(predicted.cpu().numpy())

    n_samples = len(loader.dataset)
    macro_f1 = f1_score(seen_labels, seen_predictions, average='macro')
    return loss_sum / n_samples, n_correct / n_samples, macro_f1


def train_model(
    model,
    train_loader,
    val_loader,
    num_epochs=200,
    learning_rate=0.00002,
    save_best_model=False,
    best_model_path='best_model.pth',
    weight_decay=0.0,
    log_dir=None,
    patience=100,
    device=None
):
    """Train ``model`` with Adam and cross-entropy, tracking validation macro-F1.

    Args:
        model: network to train; must expose ``description`` (printed once)
            and, when ``log_dir`` is set, ``hparams`` (a mapping whose keys
            have a ``name`` attribute).
        train_loader: loader yielding ``(inputs, labels)`` training batches.
        val_loader: loader yielding ``(inputs, labels)`` validation batches.
        num_epochs: maximum number of epochs.
        learning_rate: Adam learning rate.
        save_best_model: if true, write ``model.state_dict()`` to
            ``best_model_path`` whenever validation F1 improves.
        best_model_path: destination of the best checkpoint.
        weight_decay: Adam weight decay.
        log_dir: TensorBoard log directory, or ``None`` to disable logging.
        patience: stop after this many consecutive epochs without a new best
            validation F1.
        device: device that every batch is moved to.

    Returns:
        The best validation macro-F1 seen. The baseline is 0.0 and the
        comparison is strict, so a run whose validation F1 never exceeds 0.0
        returns 0.0 and never writes a checkpoint.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    writer = SummaryWriter(log_dir=log_dir) if log_dir is not None else None

    best_val_f1 = 0.0   # drives checkpointing and early stopping
    best_val_acc = 0.0  # accuracy of the epoch that achieved best_val_f1
    epochs_without_improvement = 0

    print("Starting training...")
    print(model.description)
    # try/finally so the writer is closed even when training raises, e.g. the
    # CUDA out-of-memory error that the search wrapper catches and retries.
    try:
        for epoch in range(num_epochs):
            epoch_loss, epoch_acc, epoch_f1 = _run_epoch(
                model, train_loader, criterion, device, optimizer
            )
            val_loss, val_acc, val_f1 = _run_epoch(
                model, val_loader, criterion, device
            )

            print(
                f"Epoch [{epoch + 1}/{num_epochs}] "
                f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, Train F1: {epoch_f1:.4f} "
                f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}, Val F1: {val_f1:.4f}"
            )

            if val_f1 > best_val_f1:
                best_val_f1 = val_f1
                best_val_acc = val_acc
                epochs_without_improvement = 0
                if save_best_model:
                    torch.save(model.state_dict(), best_model_path)
                    print(f"Best model saved with val_f1: {val_f1:.4f}")
            else:
                epochs_without_improvement += 1

            if writer is not None:
                writer.add_scalar('Loss/train', epoch_loss, epoch)
                writer.add_scalar('Accuracy/train', epoch_acc, epoch)
                writer.add_scalar('F1/train', epoch_f1, epoch)
                writer.add_scalar('Loss/val', val_loss, epoch)
                writer.add_scalar('Accuracy/val', val_acc, epoch)
                writer.add_scalar('F1/val', val_f1, epoch)
                current_lr = optimizer.param_groups[0]['lr']
                writer.add_scalar('Learning Rate', current_lr, epoch)

            if epochs_without_improvement >= patience:
                print(f"Early stopping triggered after {patience} epochs without improvement.")
                break

        if writer is not None:
            # Record the model's hyperparameters next to its best metrics.
            hparam_metrics = {
                'hparam/val_acc': best_val_acc,
                'hparam/val_f1': best_val_f1
            }
            writer.add_hparams({str(k.name): v for k, v in model.hparams.items()}, hparam_metrics)
    finally:
        if writer is not None:
            writer.close()

    print("Training completed.")
    return best_val_f1

## Run hyperparameter search

In [None]:
# Settings shared by the hyperparameter-search objectives in this cell.
BATCH_SIZE = 256                   # batch size the first attempt of a trial uses
BATCH_SIZE_REDUCTION_ATTEMPTS = 8  # attempts per trial, halving the batch size after each OOM

# Sampling bounds for the optimizer settings.
LR_MIN, LR_MAX = 1e-5, 1e-3
WEIGHT_DECAY_MIN, WEIGHT_DECAY_MAX = 0, 1e-2

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")


def objective(trial, function):
    """Call ``function(batch_size, trial)``, halving the batch size on CUDA OOM.

    Starts at ``BATCH_SIZE`` and makes at most
    ``BATCH_SIZE_REDUCTION_ATTEMPTS`` attempts. A ``RuntimeError`` whose
    message contains "out of memory" triggers a retry with half the batch
    size; any other ``RuntimeError`` is re-raised unchanged.

    Returns:
        Whatever ``function`` returns on the first successful attempt, or
        ``0`` if every attempt ran out of memory.
    """
    batch_size = BATCH_SIZE

    for _ in range(BATCH_SIZE_REDUCTION_ATTEMPTS):
        if batch_size < 1:
            # Halving reached zero; a loader cannot be built with that.
            break

        try:
            return function(batch_size, trial)
        except RuntimeError as e:
            if 'out of memory' not in str(e).lower():
                # Not a memory problem: propagate with the original traceback.
                raise

        # Recovery is done *after* the except clause on purpose: while the
        # handler is active the exception's traceback still references the
        # frames (and therefore the tensors) of the failed attempt, so the
        # memory could not actually be released from inside it.
        print(f"Out of memory error encountered. Reducing batch size from {batch_size} to {batch_size // 2}.")
        gc.collect()
        torch.cuda.empty_cache()
        batch_size = batch_size // 2

    print("Not enough memory!")
    return 0
    

def lstm_iteration(batch_size, trial):
    """Sample LSTM hyperparameters from ``trial``, train once and return val F1.

    Args:
        batch_size: batch size used to build the data loaders.
        trial: Optuna trial that supplies the hyperparameter suggestions.

    Returns:
        The value returned by ``train_model`` (best validation macro-F1).
    """
    model_type = 'LSTM'
    gc.collect()
    torch.cuda.empty_cache()
    print(f'\n\nStarting iteration with batch_size={batch_size}\n\n')
    # The test loader is not needed during the search.
    single_batch_size, train_loader, val_loader, _ = create_loaders(
        model_type, batch_size, train_x, train_y, val_x, val_y, test_x, test_y
    )

    # (Optuna parameter name, hparam key, suggest method), in sampling order.
    # Each range comes from the key's own domain.
    search_space = (
        ("lstm_hidden_size", HP_LSTM_HIDDEN_SIZE, trial.suggest_int),
        ("lstm_num_layers", HP_LSTM_NUM_LAYERS, trial.suggest_int),
        ("lstm_dropout", HP_LSTM_DROPOUT, trial.suggest_float),
    )
    hparams = {
        key: suggest(name, key.domain.min_value, key.domain.max_value)
        for name, key, suggest in search_space
    }

    learning_rate = trial.suggest_float("learning_rate", LR_MIN, LR_MAX)
    weight_decay = trial.suggest_float("weight_decay", WEIGHT_DECAY_MIN, WEIGHT_DECAY_MAX)

    input_size = single_batch_size[2]
    model = create_lstm(input_size, len(classes), hparams, device)

    # One directory per trial (and batch size, for OOM retries): writing all
    # trials into the same directory makes TensorBoard merge their curves
    # into a single run.
    log_dir = os.path.join('logs_lstm', f'trial_{trial.number}_bs{batch_size}')

    return train_model(
        model,
        train_loader,
        val_loader,
        num_epochs=200,
        learning_rate=learning_rate,
        save_best_model=False,
        weight_decay=weight_decay,
        log_dir=log_dir,
        device=device
    )

def objective_lstm(trial):
    """Optuna objective for the LSTM search: ``lstm_iteration`` run through ``objective``."""
    return objective(trial=trial, function=lstm_iteration)
    
    
def cnn_iteration(batch_size, trial):
    """Sample CNN hyperparameters from ``trial``, train once and return val F1.

    Args:
        batch_size: batch size used to build the data loaders.
        trial: Optuna trial that supplies the hyperparameter suggestions.

    Returns:
        The value returned by ``train_model`` (best validation macro-F1).

    Raises:
        optuna.TrialPruned: if the model cannot be built from the sampled
            values (``create_cnn`` raised ``ValueError``).
    """
    model_type = 'CNN'
    gc.collect()
    torch.cuda.empty_cache()
    print(f'\n\nStarting iteration with batch_size={batch_size}\n\n')
    # The test loader is not needed during the search.
    single_batch_size, train_loader, val_loader, _ = create_loaders(
        model_type, batch_size, train_x, train_y, val_x, val_y, test_x, test_y
    )

    # (Optuna parameter name, hparam key, suggest method), in sampling order.
    # Each range comes from the key's own domain.
    search_space = (
        ("conv1_kernel_size_x", HP_CNN_CONV1_KERNEL_SIZE_X, trial.suggest_int),
        ("conv1_kernel_size_y", HP_CNN_CONV1_KERNEL_SIZE_Y, trial.suggest_int),
        ("conv2_kernel_size_x", HP_CNN_CONV2_KERNEL_SIZE_X, trial.suggest_int),
        ("conv2_kernel_size_y", HP_CNN_CONV2_KERNEL_SIZE_Y, trial.suggest_int),
        ("pool1_kernel_size_x", HP_CNN_POOL1_KERNEL_SIZE_X, trial.suggest_int),
        ("pool1_kernel_size_y", HP_CNN_POOL1_KERNEL_SIZE_Y, trial.suggest_int),
        ("pool2_kernel_size_x", HP_CNN_POOL2_KERNEL_SIZE_X, trial.suggest_int),
        ("pool2_kernel_size_y", HP_CNN_POOL2_KERNEL_SIZE_Y, trial.suggest_int),
        ("hidden_size", HP_CNN_HIDDEN_SIZE, trial.suggest_int),
        ("dropout_conv1_rate", HP_CNN_DROPOUT_CONV1_RATE, trial.suggest_float),
        ("dropout_conv2_rate", HP_CNN_DROPOUT_CONV2_RATE, trial.suggest_float),
        ("dropout_fc_rate", HP_CNN_DROPOUT_FC_RATE, trial.suggest_float),
        ("num_channels", HP_CNN_NUM_CHANNELS, trial.suggest_int),
    )
    hparams = {
        key: suggest(name, key.domain.min_value, key.domain.max_value)
        for name, key, suggest in search_space
    }

    learning_rate = trial.suggest_float("learning_rate", LR_MIN, LR_MAX)
    weight_decay = trial.suggest_float("weight_decay", WEIGHT_DECAY_MIN, WEIGHT_DECAY_MAX)

    input_height = single_batch_size[2]
    input_width = single_batch_size[3]

    # Kernel and pool sizes are sampled independently, so some combinations
    # shrink the feature map to nothing; the CNN constructor in this file
    # reports that with ValueError. Prune such a trial instead of letting the
    # exception abort the whole study.
    try:
        model = create_cnn(len(classes), input_height, input_width, hparams, device)
    except ValueError as err:
        print(f"Pruning trial {trial.number}: {err}")
        raise optuna.TrialPruned(str(err)) from err

    # One directory per trial (and batch size, for OOM retries): writing all
    # trials into the same directory makes TensorBoard merge their curves
    # into a single run.
    log_dir = os.path.join('logs_cnn', f'trial_{trial.number}_bs{batch_size}')

    return train_model(
        model,
        train_loader,
        val_loader,
        num_epochs=200,
        learning_rate=learning_rate,
        save_best_model=False,
        weight_decay=weight_decay,
        log_dir=log_dir,
        device=device
    )

def objective_cnn(trial):
    """Optuna objective for the CNN search: ``cnn_iteration`` run through ``objective``."""
    return objective(trial=trial, function=cnn_iteration)
    
    
def lstmcnn_iteration(batch_size, trial):
    """Sample LSTMCNN hyperparameters from ``trial``, train once and return val F1.

    Args:
        batch_size: batch size used to build the data loaders.
        trial: Optuna trial that supplies the hyperparameter suggestions.

    Returns:
        The value returned by ``train_model`` (best validation macro-F1).
    """
    model_type = 'LSTMCNN'
    gc.collect()
    torch.cuda.empty_cache()
    print(f'\n\nStarting iteration with batch_size={batch_size}\n\n')
    # The test loader is not needed during the search.
    single_batch_size, train_loader, val_loader, _ = create_loaders(
        model_type, batch_size, train_x, train_y, val_x, val_y, test_x, test_y
    )

    # (Optuna parameter name, hparam key, suggest method), in sampling order.
    # Each range comes from the key's own domain.
    search_space = (
        ("hidden_size", HP_LSTMCNN_HIDDEN_SIZE, trial.suggest_int),
        ("num_layers", HP_LSTMCNN_NUM_LAYERS, trial.suggest_int),
        ("num_heads", HP_LSTMCNN_NUM_HEADS, trial.suggest_int),
        ("conv1_kernel_size", HP_LSTMCNN_CONV1_KERNEL_SIZE, trial.suggest_int),
        ("conv2_kernel_size", HP_LSTMCNN_CONV2_KERNEL_SIZE, trial.suggest_int),
        ("pool1_kernel_size", HP_LSTMCNN_POOL1_KERNEL_SIZE, trial.suggest_int),
        ("pool2_kernel_size", HP_LSTMCNN_POOL2_KERNEL_SIZE, trial.suggest_int),
        ("dropout_lstm_rate", HP_LSTMCNN_DROPOUT_LSTM_RATE, trial.suggest_float),
        ("dropout_attn_rate", HP_LSTMCNN_DROPOUT_ATTN_RATE, trial.suggest_float),
        ("dropout_conv1_rate", HP_LSTMCNN_DROPOUT_CONV1_RATE, trial.suggest_float),
        ("dropout_conv2_rate", HP_LSTMCNN_DROPOUT_CONV2_RATE, trial.suggest_float),
        ("dropout_fc_rate", HP_LSTMCNN_DROPOUT_FC_RATE, trial.suggest_float),
        ("cnn_out_channels", HP_LSTMCNN_CNN_OUT_CHANNELS, trial.suggest_int),
        ("fc_hidden_size", HP_LSTMCNN_FC_HIDDEN_SIZE, trial.suggest_int),
    )
    hparams = {
        key: suggest(name, key.domain.min_value, key.domain.max_value)
        for name, key, suggest in search_space
    }

    learning_rate = trial.suggest_float("learning_rate", LR_MIN, LR_MAX)
    weight_decay = trial.suggest_float("weight_decay", WEIGHT_DECAY_MIN, WEIGHT_DECAY_MAX)

    input_size = single_batch_size[2]
    seq_length = single_batch_size[1]

    model = create_lstmcnn(input_size, seq_length, len(classes), hparams, device)

    # One directory per trial (and batch size, for OOM retries): writing all
    # trials into the same directory makes TensorBoard merge their curves
    # into a single run.
    log_dir = os.path.join('logs_lstmcnn', f'trial_{trial.number}_bs{batch_size}')

    return train_model(
        model,
        train_loader,
        val_loader,
        num_epochs=200,
        learning_rate=learning_rate,
        save_best_model=False,
        weight_decay=weight_decay,
        log_dir=log_dir,
        device=device
    )

def objective_lstmcnn(trial):
    """Optuna objective for the LSTMCNN search: ``lstmcnn_iteration`` run through ``objective``."""
    return objective(trial=trial, function=lstmcnn_iteration)

In [None]:
if MODE == 'SEARCH':
    # Hyperparameter search entry point. Only the LSTMCNN study is active;
    # the CNN and LSTM studies below are left commented out so they can be
    # re-enabled by hand. Every study maximizes the value returned by its
    # objective.
    # study_cnn = optuna.create_study(direction="maximize")
    # study_cnn.optimize(objective_cnn, n_trials=200, n_jobs=1)

    # print("Best hyperparameters for CNN:", study_cnn.best_params)
    
    study_lstmcnn = optuna.create_study(direction="maximize")
    study_lstmcnn.optimize(objective_lstmcnn, n_trials=200, n_jobs=1)
    
    print("Best hyperparameters for LSTMCNN:", study_lstmcnn.best_params)
    
    # study_lstm = optuna.create_study(direction="maximize")
    # study_lstm.optimize(objective_lstm, n_trials=50, n_jobs=1)

    # print("Best hyperparameters for LSTM:", study_lstm.best_params)
    
    # print(f"Hyperparameter search for all models done. Found values:\n CNN ({study_cnn.best_value}): {study_cnn.best_params}\n LSTMCNN ({study_lstmcnn.best_value}): {study_lstmcnn.best_params}\n LSTM ({study_lstm.best_value}): {study_lstm.best_params}")
    # print(f"Hyperparameter search for all models done. Found values:\n LSTMCNN ({study_lstmcnn.best_value}): {study_lstmcnn.best_params}\n LSTM ({study_lstm.best_value}): {study_lstm.best_params}")

    

## Run training for single model

In [None]:
def evaluate(model, device,  loader, description):
    """Score ``model`` on ``loader``, print the metrics and plot a confusion matrix.

    ``description`` labels the printed metrics and the plot title. Loss and
    accuracy are averaged over ``len(loader.dataset)``; F1 is macro-averaged.
    Class names and label ids for the confusion matrix come from the
    module-level ``class_dict``.

    Returns:
        ``(loss, acc, f1)``.
    """
    model.eval()
    criterion = nn.CrossEntropyLoss()

    total_loss = 0.0
    n_correct = 0
    y_true = []
    y_pred = []

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            batch_loss = criterion(outputs, labels)

            # Weight the per-batch mean by the batch size before summing.
            total_loss += batch_loss.item() * inputs.size(0)
            predicted = torch.max(outputs, 1)[1]
            n_correct += (predicted == labels).sum().item()
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    n_samples = len(loader.dataset)
    loss = total_loss / n_samples
    acc = n_correct / n_samples
    f1 = f1_score(y_true, y_pred, average='macro')

    print(f"{description} Loss: {loss:.4f}, {description} Acc: {acc:.4f}, {description} F1: {f1:.4f}")

    # Tick labels: class names ordered by their numeric label id.
    by_label_id = sorted(class_dict.items(), key=lambda item: item[1])
    class_names = [name for name, _ in by_label_id]
    conf_matrix = confusion_matrix(y_true, y_pred, labels=list(class_dict.values()))

    plt.figure(figsize=(10, 8))
    sns.heatmap(
        conf_matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title(f"{description} Confusion Matrix")
    plt.show()

    return loss, acc, f1

In [None]:
# Run the garbage collector and release cached CUDA memory before the
# single-model training defined in this cell.
gc.collect()
torch.cuda.empty_cache()
def test_model(model_type='CNN'):
    """Train one default-hyperparameter model and evaluate its best checkpoint.

    Builds the loaders from the module-level train/val/test and private-test
    arrays, trains with checkpointing to ``best_model_{model_type}.pth``,
    reloads that checkpoint and evaluates it on the test set and on the
    private test set. Nothing is returned; metrics are printed and plotted by
    ``evaluate``.
    """
    batch_size = 256
    checkpoint_path = f'best_model_{model_type}.pth'

    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    print("Using device:", device)

    private_test_loader = create_private_loader(model_type, batch_size, private_test_x, private_test_y)
    single_batch_size, train_loader, val_loader, test_loader = create_loaders(
        model_type, batch_size, train_x, train_y, val_x, val_y, test_x, test_y
    )

    model = create_sample_model(model_type, single_batch_size, len(classes), device)

    train_model(
        model,
        train_loader,
        val_loader,
        num_epochs=1000,
        learning_rate=0.00002,
        save_best_model=True,
        best_model_path=checkpoint_path,
        weight_decay=0.02,
        log_dir=f'{model_type}_logs',
        device=device,
    )

    # Evaluate the best checkpoint rather than the weights of the last epoch.
    best_state = torch.load(checkpoint_path)
    model.load_state_dict(best_state)
    print("Loaded best model for testing.")

    print("Evaluating on test set:")
    evaluate(model, device, test_loader, "Test")

    print("Evaluating on private test set:")
    evaluate(model, device, private_test_loader, "Private Test")
    
# Train and evaluate a single model only when the notebook is in RUN mode;
# switch the commented line to run the LSTMCNN variant instead.
if MODE == 'RUN':
    test_model('CNN')
    # test_model('LSTMCNN')

In [None]:
def train_final_model(model_type='CNN'):
    """Train a default-hyperparameter model on the final training data.

    Builds the loaders from the module-level ``final_train_*``, ``val_*`` and
    private-test arrays, trains with checkpointing to
    ``final_model_{model_type}.pth``, reloads that checkpoint and evaluates it
    on the private test set. Nothing is returned; metrics are printed and
    plotted by ``evaluate``.
    """
    batch_size = 256
    checkpoint_path = f'final_model_{model_type}.pth'

    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    print("Using device:", device)

    private_test_loader = create_private_loader(model_type, batch_size, private_test_x, private_test_y)
    single_batch_size, final_train_loader, final_val_loader = create_final_loaders(
        model_type, batch_size, final_train_x, final_train_y, val_x, val_y
    )

    model = create_sample_model(model_type, single_batch_size, len(classes), device)

    train_model(
        model,
        final_train_loader,
        final_val_loader,
        num_epochs=1000,
        learning_rate=0.00002,
        save_best_model=True,
        best_model_path=checkpoint_path,
        weight_decay=0.02,
        log_dir=f'{model_type}_final_logs',
        device=device,
    )

    # Evaluate the best checkpoint rather than the weights of the last epoch.
    best_state = torch.load(checkpoint_path)
    model.load_state_dict(best_state)
    print("Loaded best model for testing.")

    print("Evaluating on private test set:")
    evaluate(model, device, private_test_loader, "Private Test")

In [None]:
# Train the final model with the default model type ('CNN').
# NOTE(review): unlike the test_model call, this is not guarded by MODE, so it
# runs whenever the cell is executed -- confirm that this is intended.
train_final_model()

In [None]:
# !zip -r tensorboard_logs1.zip ./logs

In [None]:
# Run the garbage collector and release cached CUDA memory once training is done.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# !rm -rf ./logs*
# !rm tensorboard*