<a href="https://colab.research.google.com/github/elijkon/DeepLearning_MiniHackathon/blob/main/DL_miniHackathon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
'''
Elijah Konkle, Kritan, and Mayur
Deep learning minihackathon

Each of the 1440 files has a unique filename. The filename consists of a 7-part numerical identifier (e.g., 03-01-06-01-02-01-12.wav). These identifiers define the stimulus characteristics:

Filename identifiers

1) Modality (01 = full-AV, 02 = video-only, 03 = audio-only).

2) Vocal channel (01 = speech, 02 = song).

3) Emotion (01 = neutral, 02 = calm, 03 = happy, 04 = sad, 05 = angry, 06 = fearful, 07 = disgust, 08 = surprised).

4) Emotional intensity (01 = normal, 02 = strong). NOTE: There is no strong intensity for the 'neutral' emotion.

5) Statement (01 = "Kids are talking by the door", 02 = "Dogs are sitting by the door").

6) Repetition (01 = 1st repetition, 02 = 2nd repetition).

7) Actor (01 to 24. Odd numbered actors are male, even numbered actors are female).

Filename example: 03-01-06-01-02-01-12.wav

Audio-only (03)
Speech (01)
Fearful (06)
Normal intensity (01)
Statement "dogs" (02)
1st Repetition (01)
12th Actor (12)
Female, as the actor ID number is even.
'''

In [None]:
# 1. Install deps (if not already installed)
!pip install datasets torch torchvision

# 2. Import libraries
from datasets import load_dataset
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import wandb
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import time

In [None]:

# Authenticate against the Hugging Face Hub (when a token is configured in
# Colab Secrets) and download the spectrogram dataset.
from datasets import load_dataset
from google.colab import userdata
from huggingface_hub import login

# userdata.get() raises when the secret is missing or the notebook has not been
# granted access to it, so the lookup is guarded; otherwise the "not found"
# message below could never be reached.
try:
  HF_Token = userdata.get('HF_TOKEN')
except (userdata.SecretNotFoundError, userdata.NotebookAccessError):
  HF_Token = None

if HF_Token:
  login(token=HF_Token)
  print("Successfully logged in to Hugging Face!")
else:
  print("HF_TOKEN secret not found. Please add it to Colab Secrets.")

# The download is attempted whether or not the login succeeded.
dataset = load_dataset("elijkon/DL_Spectrograms")

In [None]:
# Pick the compute device: CUDA first, then Apple MPS, then CPU as a last resort.
if torch.cuda.is_available():
    device = torch.device('cuda')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')

print(f"Using device: {device}")
if device.type == 'cpu':
    print("You should probably restart this with a GPU. It will be slow otherwise.")

# Seed the torch and numpy global RNGs so runs are repeatable
torch.manual_seed(42)
np.random.seed(42)

In [None]:
# Show the object returned by load_dataset() as this cell's output.
dataset

In [None]:
# Pull the three named splits out of the loaded dataset in one step.
train_dataset, test_dataset, validation_dataset = (
    dataset[split_name] for split_name in ('train', 'test', 'validation')
)

In [None]:
# Quick look at the training split: its summary, its column names, and the first sample.
print(
    train_dataset,
    train_dataset.column_names,
    train_dataset[0],
    sep="\n",
)

In [None]:
# Fetch one training example and pull out its image and label.
sample = train_dataset[30]
img, label = sample['image'], sample['label']   # image is expected to be a PIL Image if stored as such

# Render the image on an explicit axes with the label as the title.
fig, ax = plt.subplots()
ax.imshow(img)
ax.set_title(f"Label: {label}")
ax.axis('off')
plt.show()


In [None]:
# Collect the dataset facts that the model constructor needs.

# Channel count of the raw sample image (1 for mode "L", otherwise 3). Informational only.
num_channels = 1 if img.mode == "L" else 3
img_height, img_width = img.size[::-1]  # PIL gives (width, height)

# Read the label column once; sorting gives a stable class order on every run.
_train_labels = dataset["train"]["label"]
_class_names = sorted(set(_train_labels))

dataset_info = {
    # Fixed at 1: the transform pipeline defined later converts every image to
    # a single grayscale channel before it reaches the model.
    "num_channels": 1,
    "img_height": img_height,
    "img_width": img_width,
    "num_classes": len(_class_names),  # unique labels
    "class_names": _class_names
}

print(dataset_info)

In [None]:

# 4. Preprocessing pipeline applied to every spectrogram before it reaches the CNN
transform = transforms.Compose([
    transforms.Resize(size=(128, 128)),           # fixed spatial size
    transforms.Grayscale(num_output_channels=1),  # collapse to a single channel
    transforms.ToTensor(),                        # PIL image -> float tensor in [0, 1]
    transforms.Normalize(mean=(0.5,), std=(0.5,)),  # shift/scale to [-1, 1]
])

# 5. Adapter exposing a Hugging Face split through the PyTorch Dataset interface
class SpectrogramDataset(Dataset):
    """Map-style dataset yielding ``(image, label)`` pairs from an indexable source.

    Each item of ``hf_dataset`` must be a mapping with ``"image"`` and
    ``"label"`` keys. An image given as a string is treated as a file path and
    opened with PIL. If ``transform`` is truthy it is applied to the image
    before the pair is returned.
    """

    def __init__(self, hf_dataset, transform=None):
        self.transform = transform
        self.dataset = hf_dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        record = self.dataset[idx]
        image, target = record["image"], record["label"]

        # A string image is a path on disk rather than decoded pixels.
        if isinstance(image, str):
            image = Image.open(image)

        if not self.transform:
            return image, target
        return self.transform(image), target

# 6. Wrap every split with the shared transform pipeline
train_dataset1, test_dataset1, validation_dataset1 = (
    SpectrogramDataset(split, transform=transform)
    for split in (train_dataset, test_dataset, validation_dataset)
)

# 7. Dataloaders: only the training split is shuffled
batch_size = 64
train_loader = DataLoader(train_dataset1, batch_size=batch_size, shuffle=True)
test_loader, validation_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=False)
    for split in (test_dataset1, validation_dataset1)
)

# 8. Grab the first test batch and report its shape and leading labels
imgs, labels = next(iter(test_loader))
print(f"Batch shape: {imgs.shape}")   # [B, C, H, W]
print(f"Labels: {labels[:10]}")

# Un-normalize for display
def show_images(imgs, labels, n=6):
    """Plot up to ``n`` images from a batch in a single row, titled with their labels.

    Args:
        imgs: batch tensor indexed as ``imgs[i][0]`` for the first channel of
            image ``i``; values are assumed to be normalized with mean 0.5 and
            std 0.5, as done by the transform pipeline in this notebook.
        labels: tensor of labels aligned with ``imgs`` (``.item()`` is called
            on each element).
        n: maximum number of images to show; clamped to the batch size.
    """
    # Never ask for more panels than the batch holds (avoids an IndexError).
    n = min(n, len(imgs), len(labels))
    if n <= 0:
        return

    imgs = imgs[:n].clone()
    labels = labels[:n]
    imgs = imgs * 0.5 + 0.5  # unnormalize from [-1,1] to [0,1]

    # squeeze=False always returns a 2-D array of axes, so n == 1 works too
    # (otherwise a single Axes object is returned, which is not iterable).
    fig, axes = plt.subplots(1, n, figsize=(15, 3), squeeze=False)
    for i, ax in enumerate(axes[0]):
        ax.imshow(imgs[i][0], cmap="gray")
        ax.set_title(f"Label: {labels[i].item()}")
        ax.axis("off")
    plt.show()

show_images(imgs, labels)


In [None]:
def train_model(model, train_loader, test_loader, epochs=50, lr=0.008):
    """Train ``model`` with Adam and cross-entropy loss, evaluating after every epoch.

    Batches are moved to the notebook-level ``device``. Per-epoch metrics are
    logged to Weights & Biases and printed.

    Args:
        model: ``nn.Module`` producing raw class logits.
        train_loader: iterable of ``(data, target)`` training batches.
        test_loader: iterable of ``(data, target)`` batches used for the
            per-epoch evaluation (any held-out split).
        epochs: number of passes over ``train_loader``.
        lr: Adam learning rate.

    Returns:
        dict with per-epoch lists ``train_losses``, ``test_losses``,
        ``train_accuracies`` and ``test_accuracies`` (accuracies in percent),
        plus ``final_test_acc`` (the last evaluation accuracy, or ``None``
        when no epoch was run).
    """
    # Initialize wandb
    model_name = type(model).__name__
    wandb.init(project="convnet_spectrogram", name=f"{model_name},relu", reinit=True)
    wandb.config.update({
        "epochs": epochs,
        # Taken from the loader actually used rather than from a notebook global.
        "batch_size": train_loader.batch_size,
        "learning_rate": lr,
        "model": model_name,
        "optimizer": "Adam"
    })

    # Loss function:
    # Targets are integer class indices, not one-hot vectors, and the model
    # output is expected to be raw logits; nn.CrossEntropyLoss handles both.
    criterion = nn.CrossEntropyLoss()

    optimizer = optim.Adam(model.parameters(), lr=lr)

    train_losses = []
    test_losses = []
    train_accuracies = []
    test_accuracies = []

    # try/finally so the wandb run is closed even if training raises or is interrupted.
    try:
        for epoch in range(epochs):
            # Training phase
            model.train()
            running_loss = 0.0
            correct_train = 0
            total_train = 0

            # Use tqdm for progress bar
            pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}')

            for data, target in pbar:
                data, target = data.to(device), target.to(device)

                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                correct_train += (output.argmax(dim=1) == target).sum().item()
                total_train += target.size(0)

                # Update progress bar
                pbar.set_postfix({
                    'Loss': f'{loss.item():.4f}',
                    'Acc': f'{100.*correct_train/total_train:.2f}%'
                })

            # Calculate epoch metrics (max(..., 1) guards against an empty loader)
            epoch_loss = running_loss / max(len(train_loader), 1)
            train_acc = 100. * correct_train / max(total_train, 1)

            # Evaluation phase
            model.eval()
            test_loss_sum = 0.0
            correct_test = 0
            total_test = 0

            with torch.no_grad():
                for data, target in test_loader:
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    test_loss_sum += criterion(output, target).item()
                    correct_test += (output.argmax(dim=1) == target).sum().item()
                    total_test += target.size(0)

            test_loss = test_loss_sum / max(len(test_loader), 1)
            test_acc = 100.0 * correct_test / max(total_test, 1)

            # Store metrics
            train_losses.append(epoch_loss)
            test_losses.append(test_loss)
            train_accuracies.append(train_acc)
            test_accuracies.append(test_acc)

            # Log to wandb
            wandb.log({
                "epoch": epoch + 1,
                "train_loss": epoch_loss,
                "test_loss": test_loss,
                "train_accuracy": train_acc,
                "test_accuracy": test_acc
            })

            print(f'Epoch {epoch+1}: Train Loss: {epoch_loss:.4f}, Test Loss: {test_loss:.4f} Train Acc: {train_acc:.2f}%, Test Acc: {test_acc:.2f}%')
    finally:
        wandb.finish()

    return {
        'train_losses': train_losses,
        'test_losses': test_losses,
        'train_accuracies': train_accuracies,
        'test_accuracies': test_accuracies,
        # No epoch run (epochs == 0) means there is no accuracy to report.
        'final_test_acc': test_accuracies[-1] if test_accuracies else None
    }

In [None]:
class CNN(nn.Module):
    """Stack of stride-2 3x3 convolutions followed by global average pooling and a linear classifier.

    Args:
        dataset_info: mapping providing ``'num_channels'`` (input channels)
            and ``'num_classes'`` (number of output logits).
        base_channels: output channels of the first conv layer.
        channel_mult: factor by which the channel count grows per layer.
        n_conv_layers: number of conv layers; with 0 the input is pooled and
            classified directly.

    ``forward`` returns raw logits of shape ``(batch, num_classes)``. Because
    of the adaptive pooling, the spatial input size does not need to be fixed.
    """

    def __init__(self, dataset_info, base_channels=32, channel_mult=2, n_conv_layers=4):
        super().__init__()

        # Build conv layers dynamically
        self.conv_layers = nn.ModuleList()
        in_channels = dataset_info['num_channels']
        for i in range(n_conv_layers):
            out_channels = base_channels * (channel_mult ** i)
            self.conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1))
            in_channels = out_channels

        self.activation = nn.ReLU()
        self.dropout1 = nn.Dropout(0.25)
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1) # put near end: yields one value per channel

        # After the loop `in_channels` is the width of the last conv output.
        # Recomputing it by formula gives a float (and a crash) for
        # n_conv_layers == 0, so the tracked value is used instead.
        final_channels = in_channels
        self.fc = nn.Linear(final_channels, dataset_info['num_classes'])

    def forward(self, x):
        # conv -> ReLU -> dropout for every conv layer
        for conv_layer in self.conv_layers:
            x = self.activation(conv_layer(x))
            x = self.dropout1(x)
        x = self.global_avg_pool(x)  # one value per channel
        return self.fc(x.flatten(start_dim=1))  # flatten and run through linear layer



cnn_model = CNN(dataset_info).to(device)
print(f"CNN Model Parameters: {sum(p.numel() for p in cnn_model.parameters()):,}")

# quick test to make sure the forward() runs w/o errors:
rnd_batch = torch.randn([128,dataset_info['num_channels'], dataset_info['img_height'], dataset_info['img_width']]).to(device)
# no_grad: this is only a shape check, so skip building (and keeping alive via
# `result`) an autograd graph for a 128-image batch.
with torch.no_grad():
    result = cnn_model(rnd_batch)
assert result.shape == (128, dataset_info['num_classes']), f"unexpected output shape {tuple(result.shape)}"
del rnd_batch

In [None]:
import math

def find_lr(model, train_loader, criterion, device):
    """Run a learning-rate range test over one pass of ``train_loader`` and plot loss vs. LR.

    The learning rate starts at 1e-7 and is multiplied by a constant factor
    after every batch so that it approaches 1 by the end of the pass. The
    sweep stops early when the loss becomes non-finite or exceeds four times
    the best loss seen so far. ``model`` is trained in place, so pass a
    throwaway copy.

    Args:
        model: ``nn.Module`` to probe (its weights are modified).
        train_loader: iterable of ``(data, target)`` batches with a ``len()``.
        criterion: loss callable taking ``(output, target)``.
        device: device the batches are moved to.
    """
    # Define the range of learning rates to test
    min_lr = 1e-7
    max_lr = 1

    # Use a temporary optimizer
    optimizer = optim.Adam(model.parameters(), lr=min_lr, weight_decay=1e-2)
    lr_finder_lrs = []
    lr_finder_losses = []

    # Calculate the rate of increase (max(..., 1) avoids dividing by zero for an empty loader)
    num_iter = len(train_loader)
    gamma = (max_lr / min_lr) ** (1 / max(num_iter, 1))

    model.train()
    for data, target in tqdm(train_loader, desc="Running LR Finder"):
        data, target = data.to(device), target.to(device)

        # Get current learning rate
        lr = optimizer.param_groups[0]['lr']

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss_value = loss.item()

        # Stop if the loss explodes. The relative check only applies once a
        # reference loss exists, so a large but healthy first-batch loss does
        # not abort the sweep immediately.
        if not math.isfinite(loss_value):
            break
        if lr_finder_losses and loss_value > 4 * min(lr_finder_losses):
            break

        # Record LR and loss together so both lists always have equal length
        # (plotting fails on mismatched lengths).
        lr_finder_lrs.append(lr)
        lr_finder_losses.append(loss_value)

        loss.backward()
        optimizer.step()

        # Update the learning rate for the next batch
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr * gamma

    # Plot the results
    plt.figure(figsize=(10, 6))
    plt.plot(lr_finder_lrs, lr_finder_losses)
    plt.xscale("log") # Use a log scale for the x-axis
    plt.xlabel("Learning Rate")
    plt.ylabel("Loss")
    plt.title("Learning Rate Finder")
    plt.grid(True)
    plt.show()

# --- Running the learning-rate finder ---

# 1. Loss function plus a freshly initialised CNN, so the sweep does not touch cnn_model
criterion = nn.CrossEntropyLoss()
cnn_model_for_lr_find = CNN(dataset_info).to(device)

# 2. Sweep the learning rate over one pass of the training data and plot the result
find_lr(
    model=cnn_model_for_lr_find,
    train_loader=train_loader,
    criterion=criterion,
    device=device,
)

In [None]:
# Train the CNN for 50 epochs, using the validation split for the per-epoch
# evaluation, and report the wall-clock duration.
print("\nTraining CNN Model...")
start_time = time.time()
cnn_results = train_model(
    model=cnn_model,
    train_loader=train_loader,
    test_loader=validation_loader,
    epochs=50,
)
cnn_training_time = time.time() - start_time
print(f"CNN Training completed in {cnn_training_time:.2f} seconds")

Epoch 25/50: 100%|██████████| 16/16 [00:10<00:00,  1.50it/s, Loss=2.0173, Acc=11.65%]


Epoch 25: Train Loss: 2.0608, Test Loss: 2.0563 Train Acc: 11.65%, Test Acc: 13.30%


Epoch 26/50: 100%|██████████| 16/16 [00:11<00:00,  1.45it/s, Loss=2.0839, Acc=12.05%]


Epoch 26: Train Loss: 2.0622, Test Loss: 2.0564 Train Acc: 12.05%, Test Acc: 13.30%


Epoch 27/50:  62%|██████▎   | 10/16 [00:06<00:04,  1.44it/s, Loss=2.0595, Acc=11.56%]

In [None]:
#
# Final evaluation of the trained CNN on the held-out TEST split (test_loader).
# The model is in eval mode (dropout disabled) and no gradients are computed.
#

# Place the model on the target device and switch off training-only behaviour
cnn_model.to(device)
cnn_model.eval()

# Same loss as used during training
criterion = nn.CrossEntropyLoss()

# Running totals over the whole test split
test_loss = 0.0
correct_test = 0
total_test = 0

with torch.no_grad():
    for data, target in test_loader:
        data = data.to(device)
        target = target.to(device)

        # Forward pass and batch loss
        output = cnn_model(data)
        loss = criterion(output, target)
        test_loss += loss.item()

        # Predicted class = index of the largest logit
        pred = output.argmax(dim=1, keepdim=True)
        correct_test += (pred == target.view_as(pred)).sum().item()
        total_test += target.size(0)

# Mean per-batch loss and overall accuracy in percent
avg_loss = test_loss / len(test_loader)
final_accuracy = 100. * correct_test / total_test

print("\n--- Final Test Results ---")
print(f"Average Loss: {avg_loss:.4f}")
print(f"Accuracy: {correct_test}/{total_test} ({final_accuracy:.2f}%)")