In [None]:
# Install the third-party packages used by this notebook (`lightning`, `wandb`)
# through pip shell escapes.
# NOTE(review): no versions are pinned, so a later run may resolve different releases.
print('Installing Requirements... ',end='')
!pip install lightning
!pip install wandb
print('Done')

In [None]:
# Mount Google Drive at /content/drive so the dataset archive stored there can be read.
# This import only exists inside Google Colab, so the cell is Colab-specific.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Importing Libraries
print('Importing Libraries... ',end='')
import os
import math
from pathlib import Path
import pandas as pd
import torchaudio
import zipfile
from torchaudio.transforms import Resample
import IPython.display as ipd
from matplotlib import pyplot as plt
from tqdm import tqdm
import pytorch_lightning as pl
from torch.utils.data import Dataset, DataLoader
import torch
from torch.optim import lr_scheduler
import torch.optim as optim
import torch.nn.init as init
import seaborn as sns
import wandb
import numpy as np
from sklearn.metrics import f1_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
import seaborn as sns
import torch.nn.functional as F
from torchmetrics import Accuracy, ConfusionMatrix, AUROC
from torchmetrics.classification import MulticlassF1Score, MulticlassAUROC
print('Done')

In [None]:
# Extract data: unpack the archive from the mounted Drive into /content/.
# NOTE(review): later cells read /content/meta/esc50.csv and /content/audio/*, so the
# archive presumably contains `meta/` and `audio/` at its top level -- confirm.
with zipfile.ZipFile("/content/drive/MyDrive/Archive.zip", 'r') as zip_ref:
    zip_ref.extractall("/content/")

In [None]:
# Loading dataset
# `path` is the dataset root; the dataset class joins it with "audio/<filename>".
path = Path('/content')
# `df` is the metadata table; later code reads its 'filename', 'category', 'fold'
# and 'esc10' columns.
df = pd.read_csv('/content/meta/esc50.csv')

In [None]:
# Getting list of raw audio files.
# Path.glob yields entries in an arbitrary, filesystem-dependent order, so the list
# is sorted to make `wavs[0]` (the clip previewed below) the same on every run.
wavs = sorted(path.glob('audio/*'))

# Visualizing data: load the first clip; the call returns the waveform tensor and
# the sample rate it was stored at.
waveform, sample_rate = torchaudio.load(wavs[0])

print("Shape of waveform: {}".format(waveform.size()))  # Print the shape of the waveform tensor
print("Sample rate of waveform: {}".format(sample_rate))  # Print the sample rate of the audio file

# Plot the waveform using matplotlib. The tensor is transposed before plotting
# (presumably from (channels, frames) to (frames, channels), one line per channel -- confirm).
plt.figure()
plt.plot(waveform.t().numpy())

# Display the audio using IPython.display.Audio; as the last expression of the cell
# it is rendered as an interactive player.
ipd.Audio(waveform, rate=sample_rate)


In [None]:
class CustomDataset(Dataset):
    """Audio dataset that returns every clip as a stack of overlapping windows.

    The metadata frame is optionally restricted to rows flagged ``esc10`` and then
    to the rows of the requested split, selected through the ``fold`` column.
    """

    def __init__(self, dataset, **kwargs):
        """Select the rows of one split and precompute file paths and labels.

        Args:
            dataset: "train" (all folds except validation and testing), "val"
                (validation fold only) or "test" (testing fold only). Any other
                value leaves the frame unfiltered by fold.
            **kwargs: must contain ``data_directory`` (Path), ``data_frame``,
                ``validation_fold``, ``testing_fold``, ``esc_10_flag``,
                ``file_column``, ``label_column``, ``sampling_rate``,
                ``new_sampling_rate`` and ``sample_length_seconds``. Extra keys
                are ignored.

        Raises:
            KeyError: if a required keyword is missing.
        """
        # Extract parameters from kwargs
        self.data_directory = kwargs["data_directory"]
        self.validation_fold = kwargs["validation_fold"]
        self.testing_fold = kwargs["testing_fold"]
        self.esc_10_flag = kwargs["esc_10_flag"]
        self.file_column = kwargs["file_column"]
        self.label_column = kwargs["label_column"]
        self.sampling_rate = kwargs["sampling_rate"]
        self.new_sampling_rate = kwargs["new_sampling_rate"]
        self.sample_length_seconds = kwargs["sample_length_seconds"]

        frame = kwargs["data_frame"]
        if self.esc_10_flag:
            frame = frame.loc[frame['esc10'] == True]  # noqa: E712 (element-wise comparison)

        # Build the label mapping BEFORE the fold filter so that train, val and test
        # share the same category -> index assignment even when a fold happens to
        # miss a category.
        self.categories = sorted(frame[self.label_column].unique())
        self.category_to_index = {category: i for i, category in enumerate(self.categories)}
        self.index_to_category = {i: category for i, category in enumerate(self.categories)}

        if dataset == "train":
            frame = frame.loc[
                (frame['fold'] != self.validation_fold) & (frame['fold'] != self.testing_fold)]
        elif dataset == "val":
            frame = frame.loc[frame['fold'] == self.validation_fold]
        elif dataset == "test":
            frame = frame.loc[frame['fold'] == self.testing_fold]
        self.data_frame = frame

        # Resolve every row to an audio path and an integer label.
        self.file_names = []
        self.labels = []
        rows = zip(frame[self.file_column], frame[self.label_column])
        for file_name, category in tqdm(rows, total=len(frame)):
            self.file_names.append(self.data_directory / "audio" / file_name)
            self.labels.append(self.category_to_index[category])

        # NOTE(review): the resampler is configured with the declared `sampling_rate`;
        # the rate reported by torchaudio.load is not checked against it.
        self.resampler = torchaudio.transforms.Resample(self.sampling_rate, self.new_sampling_rate)

        # Window size for rolling window sample splits (unfold method).
        # Only the value 2 selects 2-second windows; anything else means 1 second.
        if self.sample_length_seconds == 2:
            self.window_size = self.new_sampling_rate * 2
            self.step_size = int(self.new_sampling_rate * 0.75)
        else:
            self.window_size = self.new_sampling_rate
            self.step_size = int(self.new_sampling_rate * 0.5)

    def __getitem__(self, index):
        """Load one clip, resample it and cut it into overlapping windows.

        Returns:
            (samples, label, path). `samples` is the resampled tensor unfolded along
            dimension 1 and permuted to (n_windows, dim0, window_size); dim0 is
            presumably the channel axis -- confirm. `label` is the integer class
            index and `path` the file that was read.
        """
        path = self.file_names[index]
        waveform, _ = torchaudio.load(path, format=None, normalize=True)
        audio_tensor = self.resampler(waveform)
        splits = audio_tensor.unfold(1, self.window_size, self.step_size)
        samples = splits.permute(1, 0, 2)
        return samples, self.labels[index], path

    def __len__(self):
        """Number of clips in this split."""
        return len(self.file_names)


In [None]:
class CustomDataModule(pl.LightningDataModule):
    """Lightning data module that builds the train/val/test `CustomDataset` splits."""

    def __init__(self, **kwargs):
        """Store loader settings and keep all keywords for the datasets.

        Args:
            **kwargs: must contain ``batch_size`` and ``num_workers``; the complete
                dict is forwarded unchanged to every `CustomDataset`.
        """
        super().__init__()
        self.batch_size = kwargs["batch_size"]
        self.num_workers = kwargs["num_workers"]
        self.data_module_kwargs = kwargs

    def setup(self, stage=None):
        """Create the datasets needed for `stage` ('fit', 'test', or None for all)."""
        # If in 'fit' or None stage, create training and validation datasets
        if stage == 'fit' or stage is None:
            self.training_dataset = CustomDataset(dataset="train", **self.data_module_kwargs)
            self.validation_dataset = CustomDataset(dataset="val", **self.data_module_kwargs)

        # If in 'test' or None stage, create testing dataset
        if stage == 'test' or stage is None:
            self.testing_dataset = CustomDataset(dataset="test", **self.data_module_kwargs)

    def train_dataloader(self):
        """Return a shuffled DataLoader over the training dataset."""
        return DataLoader(self.training_dataset,
                          batch_size=self.batch_size,
                          shuffle=True,
                          collate_fn=self.collate_function,
                          num_workers=self.num_workers)

    def val_dataloader(self):
        """Return an unshuffled DataLoader over the validation dataset."""
        return DataLoader(self.validation_dataset,
                          batch_size=self.batch_size,
                          shuffle=False,
                          collate_fn=self.collate_function,
                          num_workers=self.num_workers)

    def test_dataloader(self):
        """Return an unshuffled DataLoader over the testing dataset."""
        return DataLoader(self.testing_dataset,
                          batch_size=self.batch_size,
                          shuffle=False,
                          collate_fn=self.collate_function,
                          num_workers=self.num_workers)

    def collate_function(self, data):
        """
        Flatten a batch of clips into a batch of windows with one label per window.

        Args:
            data: sequence of (example, label, path) items as produced by the
                dataset, where `example` stacks the windows of one clip along
                dimension 0 and `label` is that clip's class index.

        Returns:
            A tuple (examples_batch, labels_batch): all windows concatenated along
            dimension 0, and a 1-D tensor holding each clip's label repeated once
            per window of that clip. Paths are dropped.
        """
        examples, labels, _paths = zip(*data)

        # Concatenate examples along the batch dimension
        examples_batch = torch.cat(examples)

        # Repeat every label by the window count of ITS OWN clip, so labels stay
        # aligned with the windows even if clips yield different numbers of windows.
        labels_batch = torch.tensor(
            [label for example, label in zip(examples, labels) for _ in range(len(example))]
        )

        return examples_batch, labels_batch



In [None]:
def kfold(valid_samp):
  """Build the data module for one cross-validation split and run its setup.

  Args:
    valid_samp: fold used for validation (any value from 2 to 5 for k-fold
      validation). Fold 1 is always the test fold.

  Returns:
    The `CustomDataModule` after `setup()` has been called on it.
  """
  module_settings = dict(
      batch_size=32,  # Free to change
      num_workers=4,  # Free to change
      data_directory=path,
      data_frame=df,
      validation_fold=valid_samp,
      testing_fold=1,  # Do not change this!! (set to 0 for no test set)
      esc_10_flag=True,
      file_column='filename',
      label_column='category',
      sampling_rate=44100,
      new_sampling_rate=16000,  # new sample rate for input
      sample_length_seconds=1,  # new length of input in seconds
  )
  custom_data_module = CustomDataModule(**module_settings)
  custom_data_module.setup()
  return custom_data_module

In [None]:
# The device count is evaluated but neither stored nor shown (it is not the last
# expression of the cell).
torch.cuda.device_count()
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# for batch_data, batch_labels in custom_data_module.train_dataloader():
#     print(f"Input shape: {batch_data.shape}, Labels shape: {batch_labels.shape}")

In [None]:
import torch
import torch.nn as nn

class OneD_CNN(nn.Module):
    """1-D CNN classifier for single-channel inputs.

    Five Conv1d -> ReLU -> MaxPool1d -> BatchNorm1d stages are followed by a
    fully connected head with 10 outputs. The head expects 120 flattened features,
    which is 8 channels x 15 steps for a 16000-sample input.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels) of each convolutional stage, in order.
        channel_plan = [(1, 8), (8, 16), (16, 32), (32, 16), (16, 8)]
        conv_stack = []
        for in_ch, out_ch in channel_plan:
            conv_stack += [
                nn.Conv1d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, stride=2, padding=1),
                nn.ReLU(),
                nn.MaxPool1d(kernel_size=2, stride=2),
                nn.BatchNorm1d(out_ch),
            ]
        self.hiddenLayers = nn.Sequential(*conv_stack)

        self.full_layer = nn.Sequential(
            nn.Linear(120, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
        )
        self.apply(self.init_weights)

    def init_weights(self, m):
        """Kaiming-uniform initialise the weight of Conv1d and Linear modules only."""
        if not isinstance(m, (nn.Conv1d, nn.Linear)):
            return
        init.kaiming_uniform_(m.weight, mode='fan_in', nonlinearity='relu')

    def forward(self, x):
        """Run the conv stack, flatten everything but dimension 0, apply the head."""
        features = self.hiddenLayers(x)
        flattened = features.view(features.size(0), -1)
        return self.full_layer(flattened)

# Create an instance of the model
# model = SimpleAudioCNN()

# # Print the model architecture
# print(model)


In [None]:
# Architecture 1: create the CNN on the selected device together with its loss and
# optimizer. These three names are globals read by the evaluation helper and the
# training loop.
neural_network = OneD_CNN().to(device)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(neural_network.parameters(), lr=0.001)
# Disabled alternative: SGD with momentum and a StepLR schedule.
# learning_rate = 0.001
# momentum = 0.9  # Optional: You can adjust the momentum term
# optimizer = optim.SGD(neural_network.parameters(), lr=learning_rate, momentum=momentum)
# for p in optimizer.param_groups:
#   p['clip_grad_norm'] = 0.2
# scheduler = lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [None]:
# Accumulators filled by evaluation(..., isTest=True). They are appended to and
# never cleared by the function, so reset them before collecting a fresh set.
all_labels = []
all_predicted = []
all_predicted_prob = []
def evaluation(data_loader, isTest=False):
  """Return the accuracy (in percent) of the global `neural_network` on a loader.

  Args:
    data_loader: iterable of (inputs, labels) batches.
    isTest: when True, also append labels, predictions and the class-1 softmax
      probability of every sample to the module-level accumulators.

  Returns:
    correct * 100 / total, or 0.0 when the loader yields no samples (for example
    an empty test split), instead of raising ZeroDivisionError.
  """
  neural_network.eval()
  total, correct = 0, 0

  with torch.no_grad():
    for inputs, labels in data_loader:
      inputs, labels = inputs.to(device), labels.to(device)
      outputs = neural_network(inputs)
      prediction = outputs.argmax(dim=1)
      total += labels.size(0)
      correct += (prediction == labels).sum().item()
      if isTest:
        all_labels.extend(labels.cpu().numpy())
        all_predicted.extend(prediction.cpu().numpy())
        # NOTE(review): only column 1 of the softmax is kept, so this list cannot
        # score any class other than class 1.
        all_predicted_prob.extend(torch.nn.functional.softmax(outputs, dim=1)[:, 1].cpu().numpy())

  if total == 0:
    return 0.0
  return correct * 100 / total

In [None]:
# Start the Weights & Biases run for Architecture 1.
# NOTE(review): the wandb.log calls in the training cell are commented out, so no
# metrics are logged explicitly to this run.
wandb.init(project='DL_assignment2', name='Architecture 1')

In [None]:
# Per-epoch history, appended to across all folds.
loss_epoch_arr = []
training_accuracy_per_epoch = []
validation_accuracy_per_epoch = []
testing_accuracy_per_epoch = []
epoch_counter = 1

validation_folds = range(2, 6)
epochs_per_fold = 5
# Denominator of the progress message; it was previously hard-coded as 100
# although this loop runs len(validation_folds) * epochs_per_fold epochs.
total_epochs = len(validation_folds) * epochs_per_fold

# NOTE(review): the same network and optimizer keep training across folds; nothing
# is re-initialised when the validation fold changes.
for i in validation_folds:
  custom_data_module = kfold(i)
  for epoch in range(epochs_per_fold):
    neural_network.train()  # Set the model to training mode
    running_loss = 0.0  # sum (not mean) of the batch losses of this epoch
    for inputs, labels in custom_data_module.train_dataloader():
      inputs, labels = inputs.to(device), labels.to(device)
      outputs = neural_network(inputs)
      loss = loss_function(outputs, labels)

      # lambda_reg = 0.001
      # l2_regularization = 0.0
      # for param in neural_network.parameters():
      #     l2_regularization += torch.norm(param, p=2)
      # loss += lambda_reg * l2_regularization

      optimizer.zero_grad()
      loss.backward()
      # Clip the global gradient norm to 1 before the optimizer step.
      torch.nn.utils.clip_grad_norm_(neural_network.parameters(), max_norm=1)
      optimizer.step()
      running_loss += loss.item()
    loss_epoch_arr.append(running_loss)

    # evaluation() switches the network to eval mode; train() is restored at the
    # top of the next epoch.
    accuracy_training = evaluation(custom_data_module.train_dataloader())
    accuracy_validation = evaluation(custom_data_module.val_dataloader())
    accuracy_testing = evaluation(custom_data_module.test_dataloader())

    # Histories are stored as fractions, the printed values are percentages.
    training_accuracy_per_epoch.append(accuracy_training / 100)
    validation_accuracy_per_epoch.append(accuracy_validation / 100)
    testing_accuracy_per_epoch.append(accuracy_testing / 100)
    # wandb.log({"train_loss": running_loss, "train_accuracy": accuracy_training}, step=epoch_counter)
    # wandb.log({"validation_accuracy": accuracy_validation}, step=epoch_counter)
    # wandb.log({"test_accuracy": accuracy_testing}, step=epoch_counter)
    print(f"Epoch: {epoch_counter}/{total_epochs}, Loss: {running_loss}, Test_Accuracy: {accuracy_testing}, Validation_Accuracy: {accuracy_validation}, Trainig_Accuracy: {accuracy_training}")
    epoch_counter += 1
wandb.finish()

In [None]:
# Plot the values collected in loss_epoch_arr, one point per epoch.
loss_ax = plt.gca()
loss_ax.plot(loss_epoch_arr, label="Loss", color="purple", linestyle='-', marker='o', markersize=8)
loss_ax.set_title("Loss per Epoch", fontsize=16)
loss_ax.set_xlabel("Epochs", fontsize=12)
loss_ax.set_ylabel("Loss", fontsize=12)
loss_ax.grid(True, linestyle='--', alpha=1)
loss_ax.legend()
plt.show()

In [None]:
# Plot the training accuracy history, one point per epoch.
train_acc_ax = plt.gca()
train_acc_ax.plot(training_accuracy_per_epoch, label="Accuracy", color="orange", linestyle='-', marker='o', markersize=8)
train_acc_ax.set_title("Training Accuracy per Epoch", fontsize=16)
train_acc_ax.set_xlabel("Epochs", fontsize=12)
train_acc_ax.set_ylabel("Accuracy", fontsize=12)
train_acc_ax.grid(True, linestyle='--', alpha=1)
train_acc_ax.legend()
plt.show()

In [None]:
# Plot the validation accuracy history, one point per epoch.
# The title previously read "Training Accuracy per Epoch" although the validation
# series is what is plotted.
plt.plot(validation_accuracy_per_epoch, label="Accuracy", color="orange", linestyle='-', marker='o', markersize=8)
plt.title("Validation Accuracy per Epoch", fontsize=16)
plt.xlabel("Epochs", fontsize=12)
plt.ylabel("Accuracy", fontsize=12)
plt.grid(True, linestyle='--', alpha=1)
plt.legend()
plt.show()

In [None]:
# Plot the testing accuracy history, one point per epoch.
test_acc_ax = plt.gca()
test_acc_ax.plot(testing_accuracy_per_epoch, label="Accuracy", color="orange", linestyle='-', marker='o', markersize=8)
test_acc_ax.set_title("Testing Accuracy per Epoch", fontsize=16)
test_acc_ax.set_xlabel("Epochs", fontsize=12)
test_acc_ax.set_ylabel("Accuracy", fontsize=12)
test_acc_ax.grid(True, linestyle='--', alpha=1)
test_acc_ax.legend()
plt.show()

In [None]:
# evaluation(..., isTest=True) appends to the global accumulators, so empty them
# first; otherwise re-running this cell would count every test segment twice.
all_labels.clear()
all_predicted.clear()
all_predicted_prob.clear()
evaluation(custom_data_module.test_dataloader(), isTest=True)

# Confusion matrix of true vs. predicted class over the collected test segments.
conf_mat = confusion_matrix(all_labels, all_predicted)
plt.figure(figsize=(8, 6))
# Tick labels are fixed to 0..9, matching the 10 outputs of the network.
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='summer', xticklabels=range(10), yticklabels=range(10))
plt.title('Confusion Matrix')

plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

In [None]:
# Convert the accumulators filled by the test evaluation into arrays.
y_true = np.array(all_labels)
y_pred = np.array(all_predicted)
# NOTE(review): the next two arrays are not used anywhere else in this cell.
y_pred_prob = np.array(all_predicted_prob)
y_true_one_hot = label_binarize(y_true, classes=np.unique(y_true))
# F1 averaged over classes, weighted by each class's support.
f1 = f1_score(y_true, y_pred, average='weighted')
print(f'Weighted F1 Score: {f1}')

In [None]:
# One-vs-rest ROC curves need, for every class i, the score the model assigned to
# class i. `all_predicted_prob` only holds the class-1 probability, so the full
# softmax matrix is recomputed here from the test loader.
neural_network.eval()
label_chunks, prob_chunks = [], []
with torch.no_grad():
    for inputs, labels in custom_data_module.test_dataloader():
        outputs = neural_network(inputs.to(device))
        prob_chunks.append(F.softmax(outputs, dim=1).cpu().numpy())
        label_chunks.append(labels.cpu().numpy())
all_labels1 = np.concatenate(label_chunks)
all_predicted_prob1 = np.concatenate(prob_chunks)  # one row per segment, one column per class

# Compute ROC curve and ROC area for each class present in the test labels
class_ids = [int(c) for c in np.unique(all_labels1)]
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in class_ids:
    # Binary target "is class i" scored with the probability of class i.
    fpr[i], tpr[i], _ = roc_curve((all_labels1 == i).astype(int), all_predicted_prob1[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Plot ROC curve for each class
plt.figure(figsize=(8, 6))
for i in class_ids:
    plt.plot(fpr[i], tpr[i], label=f'Class {i} (AUC = {roc_auc[i]:.2f})')

plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random')
plt.xlabel('False Positive Rate (FPR)')
plt.ylabel('True Positive Rate (TPR)')
plt.title('AUC-ROC Curve for Test Set (Multi-Class)')
plt.legend(loc='lower right')
plt.show()

In [None]:
class PositionalEncoding(nn.Module):
  """Add fixed sinusoidal position encodings to a (batch, seq, d_model) input.

  The table is registered as a non-persistent buffer: it follows the module through
  `.to(device)` / `.cuda()` (a plain tensor attribute would stay on the CPU and
  break the addition on GPU) while staying out of `state_dict()`.
  """

  def __init__(self, d_model, seq_length, dropout = 0.1):
    """
    Args:
      d_model: size of the last input dimension (odd values are supported).
      seq_length: number of positions to precompute; inputs may be shorter.
      dropout: dropout probability applied after adding the encoding.
    """
    super().__init__()
    self.d_model = d_model
    self.seq_length = seq_length
    self.dropout = nn.Dropout(dropout)

    position = torch.arange(0, seq_length).unsqueeze(1)  # (seq_length, 1)
    # One frequency per even column index: exp(-2i * ln(10000) / d_model).
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
    angles = position * div_term  # (seq_length, ceil(d_model / 2))

    encoding = torch.zeros(seq_length, d_model)
    encoding[:, 0::2] = torch.sin(angles)  # even columns
    # There are only d_model // 2 odd columns, one fewer than even ones when
    # d_model is odd, so trim the angles to fit.
    encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])

    # Leading dimension of size 1 broadcasts over the batch.
    self.register_buffer("positional_encoding_vector", encoding.unsqueeze(0), persistent=False)

  def forward(self, x):
    """Add the first x.shape[1] encodings to x, then apply dropout."""
    x = x + self.positional_encoding_vector[:, :x.shape[1], :]
    return self.dropout(x)

In [None]:
class LayerNormalization(nn.Module):
  """Normalise the last dimension, with one learnable scalar scale and shift."""

  def __init__(self, epsilon = 10**-6):
    super().__init__()
    self.epsilon = epsilon  # added to the standard deviation to avoid division by zero
    self.alpha = nn.Parameter(torch.ones(1))   # multiplicative scale
    self.bias = nn.Parameter(torch.zeros(1))   # additive shift

  def forward(self, x):
    """Return alpha * (x - mean) / (std + epsilon) + bias over the last dimension."""
    centered = x - x.mean(dim=-1, keepdim=True)
    spread = x.std(dim=-1, keepdim=True) + self.epsilon
    return self.alpha * centered / spread + self.bias

In [None]:
class MLPBlock(nn.Module):
  """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear."""

  def __init__(self, d_model, d_ff, dropout=0.1):
    super().__init__()
    self.linear1 = nn.Linear(d_model, d_ff)   # expand to the hidden width
    self.relu1 = nn.ReLU()
    self.dropout = nn.Dropout(dropout)
    self.linear2 = nn.Linear(d_ff, d_model)   # project back to d_model

  def forward(self, x):
    """Apply the block to the last dimension of x; the output has x's shape."""
    hidden = self.dropout(self.relu1(self.linear1(x)))
    return self.linear2(hidden)

In [None]:
class ResidualConnection(nn.Module):
  """Pre-norm residual wrapper: x + sublayer(norm(x))."""

  def __init__(self):
    super().__init__()
    self.norm = LayerNormalization()

  def forward(self, x, sublayer):
    """Normalise x, pass it through the callable `sublayer`, and add x back."""
    return x + sublayer(self.norm(x))

In [None]:
class MultiHeadSelfAttentionBlock(nn.Module):
  """Multi-head scaled dot-product attention with learned q/k/v/output projections.

  NOTE(review): `self.dropout` is created but never applied in `forward`.
  """

  def __init__(self, d_model, h, dropout=0.1):
    super().__init__()
    self.d_model = d_model
    self.h = h
    assert d_model % h == 0, "d_model is not divisible by h"

    self.d_k = self.d_model // self.h  # width of one head
    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)

    self.w_o = nn.Linear(d_model, d_model)
    self.dropout = nn.Dropout(dropout)

  @staticmethod
  def attention(query, key, value):
    """Return (softmax(q k^T / sqrt(d_k)) v, attention weights)."""
    scale = math.sqrt(query.shape[-1])
    weights = torch.softmax(query @ key.transpose(-2, -1) / scale, dim=-1)
    return weights @ value, weights

  def _split_heads(self, projected):
    """Reshape (batch, seq, d_model) into (batch, h, seq, d_k)."""
    batch, seq = projected.shape[0], projected.shape[1]
    return projected.view(batch, seq, self.h, self.d_k).transpose(1, 2)

  def forward(self, q, k, v):
    """Project q, k, v, attend per head, merge the heads and apply `w_o`.

    The attention weights of the last call are kept in `self.attention_scores`.
    """
    query = self._split_heads(self.w_q(q))
    key = self._split_heads(self.w_k(k))
    value = self._split_heads(self.w_v(v))

    context, self.attention_scores = MultiHeadSelfAttentionBlock.attention(query, key, value)

    # transpose() leaves the tensor non-contiguous; view() needs contiguous memory.
    merged = context.transpose(1, 2).contiguous().view(context.shape[0], -1, self.h * self.d_k)
    return self.w_o(merged)


In [None]:
class EncoderBlock(nn.Module):
  """Transformer encoder block: positional encoding, self-attention, feed-forward.

  Each sub-layer is wrapped in its own `ResidualConnection`. The positional
  encoding is created with a fixed table of 512 positions and its default dropout;
  it is applied on every call of this block.
  """

  def __init__(self, d_model, h, dropout=0.1, d_ff=2048):
    super().__init__()
    self.positional_encoding = PositionalEncoding(d_model, seq_length=512)
    self.multi_head_self_attention_block = MultiHeadSelfAttentionBlock(d_model, h, dropout)
    self.feed_forward_block = MLPBlock(d_model, d_ff, dropout)
    # Index 0 wraps the attention sub-layer, index 1 the feed-forward sub-layer.
    self.residual_connection = nn.ModuleList([ResidualConnection(), ResidualConnection()])

  def forward(self, x):
    """Return the encoded sequence; the output has the same shape as x."""
    attention_residual, feed_forward_residual = self.residual_connection

    def self_attention(normalized):
      # Self-attention: the same tensor serves as query, key and value.
      return self.multi_head_self_attention_block(normalized, normalized, normalized)

    encoded = self.positional_encoding(x)
    attended = attention_residual(encoded, self_attention)
    return feed_forward_residual(attended, self.feed_forward_block)

In [None]:
# class EncoderBlock(nn.Module):
#   def __init__(self, features, multi_head_self_attention_block, feed_forward_block):
#     super().__init__()
#     self.multi_head_self_attention_block = multi_head_self_attention_block
#     self.feed_forward_block = feed_forward_block
#     self.residual_connection = nn.ModuleList([ResidualConnection(features) for _ in range(2)])

#   def forward(self, x):
#     # x = self.resresidual_connection[0](x, lambda x: self.multi_head_self_attention_block(x, x, x))
#     # x = self.resresidual_connection[1](x, self.feed_forward_block)
#     attention_output = self.multi_head_self_attention_block(x, x, x)
#     x = self.resresidual_connection[0](x, attention_output)
#     x = self.resresidual_connection[1](x, self.feed_forward_block)
#     return x

In [None]:
# Logger object intended for a Lightning Trainer.
# NOTE(review): this is imported from `lightning.pytorch`, while the rest of the
# notebook uses `pytorch_lightning as pl` -- confirm the two are meant to be mixed.
# The only visible use of `wandb_logger` is in a commented-out cell.
from lightning.pytorch.loggers import WandbLogger
wandb_logger = WandbLogger()

In [None]:
class CNNTransformerClassifier(pl.LightningModule):
    """CNN feature extractor followed by three encoder blocks and an MLP head.

    The CNN embedding of every input becomes a single token; a zero-valued token
    is placed in front of it and, after the encoder blocks, the state at that
    first position is classified.
    """

    def __init__(self, num_classes, d_model, cnn_model, n_heads):
        """
        Args:
            num_classes: number of output logits.
            d_model: token width; must equal the width produced by
                `cnn_model.full_layer[:-2]`.
            cnn_model: module exposing `hiddenLayers` and `full_layer`.
            n_heads: number of attention heads in every encoder block.
        """
        super().__init__()

        # Base CNN model
        self.cnn_model = cnn_model
        self.maxpool = nn.MaxPool1d(kernel_size=2, stride=2)  # not used by forward()

        # Three encoder blocks sharing the same head count
        self.transformer_blocks = nn.ModuleList(
            [EncoderBlock(d_model, n_heads) for _ in range(3)]
        )

        # MLP head for classification
        self.mlp_head = nn.Sequential(
            nn.Linear(d_model, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes)
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes)."""
        # CNN features, flattened, then the dense head without its last two modules.
        conv_features = self.cnn_model.hiddenLayers(x)
        flat_features = conv_features.view(conv_features.size(0), -1)
        embedding = self.cnn_model.full_layer[:-2](flat_features)

        # Sequence of length 2: [zero token, CNN embedding].
        cls_token = torch.zeros(embedding.size(0), 1, embedding.size(1), device=embedding.device)
        sequence = torch.cat([cls_token, embedding.unsqueeze(1)], dim=1)
        for transformer_block in self.transformer_blocks:
            sequence = transformer_block(sequence)

        # Classify from the state at the first (prepended) position.
        return self.mlp_head(sequence[:, 0, :])

    # def training_step(self, batch, batch_idx):
    #   inputs, labels = batch
    #   outputs = self(inputs)
    #   loss = F.cross_entropy(outputs, labels)
    #   acc = Accuracy(task='multiclass', num_classes=num_classes)(outputs, labels)
    #   self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
    #   self.log('train_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
    #   return loss

    # def validation_step(self, batch, batch_idx):
    #   inputs, labels = batch
    #   outputs = self(inputs)
    #   loss = F.cross_entropy(outputs, labels)
    #   acc = Accuracy(task='multiclass', num_classes=num_classes)(outputs, labels)
    #   self.log('val_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
    #   self.log('val_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
    #   return loss

    # def test_step(self, batch, batch_idx):
    #   inputs, labels = batch
    #   outputs = self(inputs)
    #   loss = F.cross_entropy(outputs, labels)
    #   acc = Accuracy(task='multiclass', num_classes=num_classes)(outputs, labels)
    #   confusion_matrix = ConfusionMatrix(task='multiclass', num_classes=num_classes)(outputs, labels)
    #   f1 = MulticlassF1Score(num_classes=num_classes, average=None)(outputs, labels)
    #   # fig_, ax_ = f1.plot()
    #   auc_roc = MulticlassAUROC(num_classes=num_classes, average="macro", thresholds=None)(outputs, labels)
    #   # fig_, ax_ = auc_roc.plot()

    #   self.log('test_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
    #   self.log('test_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
    #   self.log('test_confusion_matrix', confusion_matrix, on_step=True, on_epoch=True, prog_bar=True)
    #   self.log('test_f1', f1, on_step=True, on_epoch=True, prog_bar=True)
    #   self.log('test_auc_roc', auc_roc, on_step=True, on_epoch=True, prog_bar=True)

    #   return outputs, labels

    # def configure_optimizers(self):
    #   return optim.Adam(self.parameters(), lr = 0.001)


In [None]:
# Suppress all Python warnings from this point on.
# NOTE(review): this also hides deprecation and numerical warnings raised by later cells.
import warnings
warnings.filterwarnings("ignore")

In [None]:
# num_classes = 10
# d_model = 128
# cnn_out_channels = 64
# # transformer_heads_list = [1,2,4]
# transformer_heads_list = [4]
# learning_rate = 0.001
# num_epochs = 1
# cnn_model = OneD_CNN()

# for num_heads in transformer_heads_list:
#   model = CNNTransformerClassifier(num_classes, d_model, cnn_model, n_heads=num_heads)

#   # for folds in range(2, 6):
#   for folds in range(2, 3):
#     custom_data_module = kfold(folds)
#     trainer = pl.Trainer(max_epochs=num_epochs, logger=wandb_logger)
#     trainer.fit(model, custom_data_module.train_dataloader(), custom_data_module.val_dataloader())
#     trainer.test(model, custom_data_module.test_dataloader())

In [None]:
# Hyper-parameters for Architecture 2 (CNN + transformer encoder).
num_classes = 10
# Token width; equals the 128 features produced by OneD_CNN.full_layer[:-2].
d_model = 128
# NOTE(review): not referenced by any code visible in this notebook section.
cnn_out_channels = 64
# One model is trained per head count in this list.
transformer_heads_list = [1,2,4]
# transformer_heads_list = [4]
learning_rate = 0.001
# Epochs per validation fold.
num_epochs = 100

In [None]:
# Fresh accumulators for Architecture 2; evaluation2(..., isTest=True) appends to
# them and never clears them.
all_labels = []
all_predicted = []
all_predicted_prob = []
def evaluation2(model, data_loader, isTest=False):
  """Return the accuracy (in percent) of `model` on a loader.

  Args:
    model: network to evaluate; it is switched to eval mode.
    data_loader: iterable of (inputs, labels) batches.
    isTest: when True, also append labels, predictions and the class-1 softmax
      probability of every sample to the module-level accumulators.

  Returns:
    correct * 100 / total, or 0.0 when the loader yields no samples (for example
    an empty test split), instead of raising ZeroDivisionError.
  """
  model.eval()
  total, correct = 0, 0

  with torch.no_grad():
    for inputs, labels in data_loader:
      inputs, labels = inputs.to(device), labels.to(device)
      outputs = model(inputs)
      prediction = outputs.argmax(dim=1)
      total += labels.size(0)
      correct += (prediction == labels).sum().item()
      if isTest:
        all_labels.extend(labels.cpu().numpy())
        all_predicted.extend(prediction.cpu().numpy())
        # NOTE(review): only column 1 of the softmax is kept, so this list cannot
        # score any class other than class 1.
        all_predicted_prob.extend(torch.nn.functional.softmax(outputs, dim=1)[:, 1].cpu().numpy())

  if total == 0:
    return 0.0
  return correct * 100 / total

In [None]:
# Open the Weights & Biases run that the training cell below logs into.
wandb.init(
    project='DL_assignment2',
    name='Architecture 2',
)

In [None]:
# Per-epoch history buffers appended to by the training loop: summed training
# loss and train / validation / test accuracy (stored as fractions).
(loss_epoch_arr,
 training_accuracy_per_epoch,
 validation_accuracy_per_epoch,
 testing_accuracy_per_epoch) = [], [], [], []

# Global step used as the x-axis when logging to wandb; it keeps increasing
# across folds and head configurations.
epoch_counter = 1

In [None]:
# Train one freshly initialised model for every (attention heads, fold) pair.
#
# A new CNN backbone, classifier and optimizer are created for each pair so
# that no weights or optimizer state carry over between head configurations
# or between folds. Reusing a single model across folds would let it train on
# samples that a later fold holds out, inflating the reported accuracies.
#
# After the loops finish, `model` and `custom_data_module` refer to the last
# configuration trained (last head count, last fold).
loss_function = nn.CrossEntropyLoss()

for num_heads in transformer_heads_list:
  for folds in range(2, 6):
    custom_data_module = kfold(folds)

    # Move the model to the target device before building the optimizer, as
    # recommended by the torch.optim documentation.
    cnn_model = OneD_CNN()
    model = CNNTransformerClassifier(num_classes, d_model, cnn_model, n_heads=num_heads).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
      # evaluation2 leaves the model in eval mode, so switch back every epoch.
      model.train()
      running_loss = 0.0
      for inputs, labels in custom_data_module.train_dataloader():
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = loss_function(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        # Clip the global gradient norm to 1 before stepping.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
        optimizer.step()
        running_loss += loss.item()
      # Summed (not averaged) mini-batch loss for the epoch.
      loss_epoch_arr.append(running_loss)

      accuracy_training = evaluation2(model, custom_data_module.train_dataloader())
      accuracy_validation = evaluation2(model, custom_data_module.val_dataloader())
      accuracy_testing = evaluation2(model, custom_data_module.test_dataloader())

      training_accuracy_per_epoch.append(accuracy_training / 100)
      validation_accuracy_per_epoch.append(accuracy_validation / 100)
      testing_accuracy_per_epoch.append(accuracy_testing / 100)

      # epoch_counter is the global wandb step across all heads and folds.
      wandb.log(
        {
          "train_loss": running_loss,
          "train_accuracy": accuracy_training,
          "validation_accuracy": accuracy_validation,
          "test_accuracy": accuracy_testing,
        },
        step=epoch_counter,
      )
      print(f"Heads: {num_heads}, Fold: {folds}, Epoch: {epoch + 1}/{num_epochs}, Loss: {running_loss}, Test_Accuracy: {accuracy_testing}, Validation_Accuracy: {accuracy_validation}, Trainig_Accuracy: {accuracy_training}")
      epoch_counter += 1
wandb.finish()

In [None]:
# Plot the confusion matrix of the most recently trained model on the test split.

# Start from empty buffers so re-running this cell does not count every test
# sample twice.
all_labels.clear()
all_predicted.clear()
all_predicted_prob.clear()
evaluation2(model, custom_data_module.test_dataloader(), isTest=True)

# Pass the full label set explicitly: without `labels=`, a class that never
# appears in the targets or predictions is dropped from the matrix and the
# tick labels no longer line up with the rows and columns.
class_ids = list(range(num_classes))
conf_mat = confusion_matrix(all_labels, all_predicted, labels=class_ids)

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='summer',
            xticklabels=class_ids, yticklabels=class_ids, ax=ax)
ax.set_title('Confusion Matrix')
ax.set_ylabel('True label')
ax.set_xlabel('Predicted label')
plt.show()

In [None]:
!pip install optuna
import optuna

In [None]:
# Epoch budget per Optuna trial and the data split used by every trial.
max_epochs = 20
custom_data_module = kfold(4)
def hyperparameters(trial):
  """Optuna objective: train a copy of `model` and return its final training loss.

  Sampled hyperparameters: Adam learning rate, weight decay and beta1, the
  StepLR `step_size` / `gamma`, and a dropout probability.

  Each trial works on its own deep copy of the notebook-level `model`, so all
  trials start from the same weights and the global model is left untouched.

  Args:
    trial: the `optuna.trial.Trial` that supplies the hyperparameter values.

  Returns:
    Mean mini-batch training loss (cross-entropy plus the L2 penalty) of the
    last epoch; the study minimises this value.
  """
  import copy  # local import: `copy` is not imported at the top of the notebook

  learning_rate = trial.suggest_float('learning_rate', 1e-4, 1e-1, log=True)
  weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-3, log=True)
  dropout_prob = trial.suggest_float('dropout_prob', 0.0, 0.5)
  step_size = trial.suggest_int('step_size', 1, 10)
  gamma = trial.suggest_float('gamma', 0.1, 0.9)
  beta1 = trial.suggest_float('beta1', 0.1, 0.9)

  # Independent copy per trial; otherwise every trial would keep training the
  # weights left behind by the previous one and the results are not comparable.
  trial_model = copy.deepcopy(model)

  # Apply the sampled dropout rate. Only nn.Dropout modules are affected;
  # dropout stored as a plain float inside a layer is not changed.
  for module in trial_model.modules():
    if isinstance(module, nn.Dropout):
      module.p = dropout_prob

  loss_function = nn.CrossEntropyLoss()
  optimizer = optim.Adam(trial_model.parameters(), lr=learning_rate, betas=(beta1, 0.999), weight_decay=weight_decay)
  scheduler = lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

  lambda_reg = 1e-5  # weight of the explicit L2-norm penalty added to the loss
  loss_epoch_arr = []
  accuracy_per_epoch = []
  for epoch in range(max_epochs):
    # evaluation2 switches the model to eval mode; re-enable training
    # behaviour (dropout, batch-norm updates) at the start of every epoch.
    trial_model.train()
    running_loss, num_batches = 0.0, 0
    for inputs, labels in custom_data_module.train_dataloader():
      inputs, labels = inputs.to(device), labels.to(device)
      optimizer.zero_grad()
      outputs = trial_model(inputs)
      loss = loss_function(outputs, labels)

      # Penalise the L2 norm taken over all parameters together.
      l2_regularization = 0.0
      for param in trial_model.parameters():
        l2_regularization += torch.sum(param**2)
      l2_regularization = torch.sqrt(l2_regularization)
      loss = loss + lambda_reg * l2_regularization

      loss.backward()
      optimizer.step()
      running_loss += loss.item()
      num_batches += 1
    scheduler.step()

    # Average over the epoch instead of keeping only the last mini-batch loss,
    # which is a very noisy objective for the study.
    epoch_loss = running_loss / max(num_batches, 1)
    loss_epoch_arr.append(epoch_loss)

    # evaluation2 returns a single float (accuracy in percent).
    accuracy = evaluation2(trial_model, custom_data_module.train_dataloader())
    accuracy_test = evaluation2(trial_model, custom_data_module.test_dataloader())
    accuracy_per_epoch.append(accuracy / 100)
    print(f"Epoch: {epoch+1}/{max_epochs}, Loss: {epoch_loss}, Test_Accuracy: {accuracy_test}, Trainig_Accuracy: {accuracy}")
  return loss_epoch_arr[-1]

# Run the Optuna search (10 trials, minimising the objective's return value)
# and expose the winning value of every tuned hyperparameter as a variable.
study = optuna.create_study(direction='minimize')
study.optimize(hyperparameters, n_trials=10)
best_params = study.best_params

(best_learning_rate,
 best_weight_decay,
 best_dropout_prob,
 best_step_size,
 best_gamma,
 best_beta1) = (
  best_params[key]
  for key in ('learning_rate', 'weight_decay', 'dropout_prob', 'step_size', 'gamma', 'beta1')
)

# Human-readable summary, one line per hyperparameter.
print("Best Hyperparameters:")
for _label, _value in (
  ("Learning Rate:", best_learning_rate),
  ("Weight Decay:", best_weight_decay),
  ("Dropout Probability:", best_dropout_prob),
  ("Step Size for LR Scheduler:", best_step_size),
  ("Gamma for LR Scheduler:", best_gamma),
  ("Beta1 (Momentum in Adam):", best_beta1),
):
  print(_label, _value)