In [None]:
# 1. Install the specific library for the CNN backbone
!pip install timm

# 2. Connect to your Google Drive
from google.colab import drive
drive.mount('/content/drive')

# 3. Verify you can see your files (Change path if needed)
import os
# Check if your folder exists
if os.path.exists('/content/drive/MyDrive/Seneca/Capstone/spectrograms'):
    print("Success! Connected to Capstone Data.")
else:
    print("Warning: Folder not found. Check your path in Drive.")

Mounted at /content/drive
Success! Connected to Capstone Data.


In [None]:
# List the top-level entries of /content/ (the recorded output shows `drive` and `sample_data`).
!ls /content/

drive  sample_data


In [None]:
import shutil

# Archive stored on Drive, and the folder under /content/ that receives its contents
zip_path = '/content/drive/MyDrive/Seneca/Capstone/spectrograms/train_spectrograms.zip'
local_dir = '/content/train_spectrograms'

print("Copying data to local machine (this speeds up training)...")
# Extract the archive straight into the local folder (format is inferred from the extension)
shutil.unpack_archive(filename=zip_path, extract_dir=local_dir)

print("Done! Data is ready for the model.")

Copying data to local machine (this speeds up training)...
Done! Data is ready for the model.


In [None]:
# Validation spectrograms: archive on Drive -> folder under /content/
val_zip_path = '/content/drive/MyDrive/Seneca/Capstone/spectrograms/val_spectrograms.zip'
val_local_dir = '/content/val_spectrograms'

print("Unzipping validation data...")
# Relies on `shutil` having been imported by the training-archive cell above.
shutil.unpack_archive(filename=val_zip_path, extract_dir=val_local_dir)
print("Done! Validation data ready.")

Unzipping validation data...
Done! Validation data ready.


In [None]:
import pandas as pd
import torch
import numpy as np
import cv2
from torch.utils.data import Dataset

class HMSDataset(Dataset):
    """Dataset yielding (spectrogram, soft-label) pairs described by a metadata CSV.

    Each CSV row names a spectrogram through its ``spectrogram_id`` column. The
    file ``<img_dir>/<spectrogram_id>.parquet`` is loaded, log-scaled, cropped or
    padded to ``TARGET_TIME`` rows, resized, standardized and returned as a
    float32 tensor of shape ``(1, TARGET_FREQ, TARGET_TIME)``. The six vote
    columns are divided by their sum and returned as a float32 vector.

    Args:
        csv_file: Path of the CSV holding ``spectrogram_id`` and the vote columns.
        img_dir: Directory that contains the ``.parquet`` spectrogram files.
        transform: Optional callable applied to the final spectrogram tensor
            (after standardization). ``None`` leaves the tensor unchanged.
    """

    # Vote columns that form the 6-way target distribution.
    LABEL_COLS = ['seizure_vote', 'lpd_vote', 'gpd_vote', 'lrda_vote', 'grda_vote', 'other_vote']
    # Output size: number of time steps (tensor width) and frequency bins (tensor height).
    TARGET_TIME = 256
    TARGET_FREQ = 128

    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the metadata CSV."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(spectrogram, labels)`` for row ``idx``.

        If the parquet file cannot be read, a warning is emitted and an all-zero
        spectrogram with an all-zero label vector is returned, so one bad file
        does not abort a whole epoch.
        """
        row = self.data.iloc[idx]
        spec_id = row['spectrogram_id']
        img_path = f"{self.img_dir}/{spec_id}.parquet"

        try:
            # Rows are treated as time steps and columns as frequency bins.
            # NOTE(review): if the parquet files carry an explicit time column it
            # would be treated as one more frequency bin here -- confirm the schema.
            spectrogram = pd.read_parquet(img_path).values
        except Exception as exc:
            # Best effort: keep training, but make the failure visible. A wrong
            # img_dir would otherwise turn every sample into zeros without notice.
            import warnings
            warnings.warn(
                f"Could not load spectrogram '{img_path}' ({exc!r}); "
                "returning an all-zero sample.",
                RuntimeWarning,
            )
            return (
                torch.zeros((1, self.TARGET_FREQ, self.TARGET_TIME)),
                torch.zeros(len(self.LABEL_COLS)),
            )

        # 1. Replace NaNs, then compress the dynamic range with log(1 + x).
        spectrogram = np.nan_to_num(spectrogram)
        spectrogram = np.log1p(spectrogram)

        # 2. Fix the time dimension (axis 0) to TARGET_TIME steps:
        #    longer recordings keep their centre window, shorter ones are
        #    zero-padded at the end.
        current_time = spectrogram.shape[0]
        if current_time > self.TARGET_TIME:
            start = (current_time - self.TARGET_TIME) // 2
            spectrogram = spectrogram[start:start + self.TARGET_TIME, :]
        else:
            pad_needed = self.TARGET_TIME - current_time
            spectrogram = np.pad(spectrogram, ((0, pad_needed), (0, 0)), mode='constant')

        # 3. Resize the frequency axis only. cv2.resize takes (width, height),
        #    so the result has shape (TARGET_TIME, TARGET_FREQ).
        spectrogram = cv2.resize(spectrogram, (self.TARGET_FREQ, self.TARGET_TIME))

        # 4. Standardize per sample; the epsilon guards against a constant image.
        mean = spectrogram.mean()
        std = spectrogram.std() + 1e-6
        spectrogram = (spectrogram - mean) / std

        # 5. Transpose to (freq, time) so time becomes the width, then add the
        #    channel axis -> (1, TARGET_FREQ, TARGET_TIME).
        spectrogram = torch.tensor(spectrogram.T, dtype=torch.float32).unsqueeze(0)

        # 6. Apply the optional user transform (previously stored but never used).
        if self.transform is not None:
            spectrogram = self.transform(spectrogram)

        # Labels: vote counts scaled by their sum; epsilon avoids 0/0 when a row has no votes.
        labels = row[self.LABEL_COLS].values.astype('float32')
        labels = labels / (labels.sum() + 1e-6)

        return spectrogram, torch.tensor(labels, dtype=torch.float32)



---



# CNN-Transformer Hybrid

### EfficientNet-B0 backbone (320 output channels)

In [None]:
import torch
import torch.nn as nn
import timm

class HybridModel(nn.Module):
    """CNN feature extractor followed by a Transformer encoder and a linear head.

    The CNN turns a single-channel spectrogram into a feature map. The map is
    averaged over its height axis, giving a sequence along the width axis that is
    projected to ``d_model``, offset by a learned positional table, encoded by the
    Transformer, mean-pooled and classified.

    Args:
        num_classes: Size of the output logit vector.
        d_model: Transformer embedding width.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of Transformer encoder layers.
        backbone: timm model name used as the feature extractor. Feature index 4
            is requested, so the backbone must expose at least five feature levels.
        pretrained: Whether timm should load pretrained backbone weights.
        max_seq_len: Length of the learned positional table; longer sequences
            are truncated to this many steps.
    """

    def __init__(self, num_classes=6, d_model=256, nhead=4, num_layers=2,
                 backbone='efficientnet_b0', pretrained=True, max_seq_len=64):
        super().__init__()

        # 1. CNN backbone returning only its deepest feature map (index 4).
        self.cnn = timm.create_model(
            backbone,
            pretrained=pretrained,
            in_chans=1,
            features_only=True,
            out_indices=[4]
        )

        # Ask timm for the channel count of the selected feature map instead of
        # hard-coding it (320 for efficientnet_b0), so other backbones also work.
        cnn_out_channels = self.cnn.feature_info.channels()[-1]

        # 2. Projection from CNN channels to the Transformer width.
        self.projection = nn.Linear(cnn_out_channels, d_model)

        # 3. Learned positional table, one row per sequence position.
        self.pos_embedding = nn.Parameter(torch.randn(1, max_seq_len, d_model))

        # 4. Transformer encoder.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model*4,
            dropout=0.1,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # 5. Classification head.
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: Input batch; the training cells feed (batch, 1, 128, 256) tensors.
        """
        # 1. Deepest CNN feature map: (batch, channels, height, width).
        features = self.cnn(x)[0]

        # 2. Average out the height (frequency) axis and move channels last:
        #    (batch, channels, width) -> (batch, width, channels).
        features = features.mean(dim=2).permute(0, 2, 1)

        # 3. Project channels to d_model.
        x = self.projection(features)

        # 4. Truncate to the positional table length (a no-op for shorter
        #    sequences), then add the matching positional rows.
        max_len = self.pos_embedding.shape[1]
        x = x[:, :max_len, :]
        x = x + self.pos_embedding[:, :x.shape[1], :]

        # 5. Contextualize the sequence.
        x = self.transformer(x)

        # 6. Mean-pool over the sequence and classify.
        return self.classifier(x.mean(dim=1))

In [None]:
import os
from torch.utils.data import DataLoader

# --- 1. Locations ---
# Spectrogram folders live on the local /content/ disk, where the archives were unpacked.
TRAIN_IMG_DIR = '/content/train_spectrograms'
VAL_IMG_DIR   = '/content/val_spectrograms'

# The metadata CSVs are read straight from Drive.
TRAIN_CSV = '/content/drive/MyDrive/Seneca/Capstone/spectrograms/train.csv'
VAL_CSV   = '/content/drive/MyDrive/Seneca/Capstone/spectrograms/val.csv'

# --- 2. Datasets: each split is paired with its own CSV file ---
train_dataset = HMSDataset(TRAIN_CSV, TRAIN_IMG_DIR)
val_dataset   = HMSDataset(VAL_CSV, VAL_IMG_DIR)

# --- 3. Loaders ---
# Settings shared by both loaders:
#   num_workers=2     -> two background worker processes load batches
#   pin_memory=True   -> page-locked host memory for faster CPU-to-GPU copies
#   prefetch_factor=2 -> each worker keeps two batches ready in advance
loader_kwargs = dict(batch_size=32, num_workers=2, pin_memory=True, prefetch_factor=2)

# Only the training split is shuffled.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

print(f"Loaded {len(train_dataset)} training samples.")
print(f"Loaded {len(val_dataset)} validation samples.")

Loaded 85440 training samples.
Loaded 10680 validation samples.




---



# Training begins

In [None]:
import torch.optim as optim

# 1. Define the Device (GPU when available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Training on: {device}")

# 2. Instantiate the Model and move it to the selected device
model = HybridModel(num_classes=6)  # 6 classes for HMS dataset
model = model.to(device)

# 3. Define the Optimizer
# Learning rate lowered from 1e-3 to 1e-4: the recorded run at 1e-3 shows the
# *training* loss rising epoch over epoch (1.1352 -> 1.2988 -> 1.3551), which
# points to an unstable step size. 1e-4 also matches LEARNING_RATE used by the
# standalone train() script further down this notebook.
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-2)

# 4. Define the Loss Function
# The targets are soft labels (vote fractions). nn.CrossEntropyLoss accepts
# class-probability targets of the same shape as the logits.
loss_fn = nn.CrossEntropyLoss()

Training on: cuda




In [None]:
from tqdm import tqdm # This creates a nice progress bar

def train_one_epoch(dataloader, model, loss_fn, optimizer, device):
    """Run one optimisation pass over ``dataloader`` and return the mean loss.

    The returned value is the sample-weighted mean of the per-batch losses, so a
    smaller final batch does not get the same weight as a full one. This assumes
    ``loss_fn`` returns a per-batch mean.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` tensor batches.
        model: Module to train; switched to training mode.
        loss_fn: Callable ``(outputs, targets) -> scalar loss tensor``.
        optimizer: Optimizer that owns ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        float: Average loss per sample over the epoch.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.train()  # enable training-time behaviour such as dropout
    loss_sum = 0.0
    sample_count = 0

    # tqdm wraps the loader to show a progress bar
    progress_bar = tqdm(dataloader, desc="Training", leave=False)

    for inputs, targets in progress_bar:
        inputs, targets = inputs.to(device), targets.to(device)

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Forward pass, loss, backward pass, weight update.
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
        loss.backward()
        optimizer.step()

        # Read the scalar once; every .item() forces a device synchronisation.
        batch_loss = loss.item()
        batch_size = inputs.size(0)
        loss_sum += batch_loss * batch_size
        sample_count += batch_size

        progress_bar.set_postfix({'loss': batch_loss})

    if sample_count == 0:
        raise ValueError("dataloader produced no samples; cannot compute an average loss")
    return loss_sum / sample_count

def validate_one_epoch(dataloader, model, loss_fn, device):
    """Evaluate ``model`` on ``dataloader`` without gradients and return the mean loss.

    The returned value is the sample-weighted mean of the per-batch losses, so a
    partial final batch is not over-weighted. This assumes ``loss_fn`` returns a
    per-batch mean.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` tensor batches.
        model: Module to evaluate; switched to evaluation mode.
        loss_fn: Callable ``(outputs, targets) -> scalar loss tensor``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        float: Average loss per sample over the loader.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()  # evaluation mode (e.g. dropout disabled)
    loss_sum = 0.0
    sample_count = 0

    with torch.no_grad():  # no gradient bookkeeping during evaluation
        for inputs, targets in tqdm(dataloader, desc="Validating", leave=False):
            inputs, targets = inputs.to(device), targets.to(device)
            batch_size = inputs.size(0)
            loss_sum += loss_fn(model(inputs), targets).item() * batch_size
            sample_count += batch_size

    if sample_count == 0:
        raise ValueError("dataloader produced no samples; cannot compute an average loss")
    return loss_sum / sample_count

In [None]:
import time

# --- Run settings ---
NUM_EPOCHS = 10                               # number of full passes over the training set
best_val_loss = float('inf')                  # lowest validation loss seen so far
history = {'train_loss': [], 'val_loss': []}  # per-epoch losses, kept for plotting later

print(f"Starting training for {NUM_EPOCHS} epochs on {device}...")
start_time = time.time()

for epoch in range(NUM_EPOCHS):
    # One optimisation pass over the training split, then one evaluation pass
    # over the validation split (both helpers are defined in the previous cell).
    train_loss = train_one_epoch(train_loader, model, loss_fn, optimizer, device)
    val_loss = validate_one_epoch(val_loader, model, loss_fn, device)

    # Keep the curve for later inspection.
    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)

    # One summary line per epoch.
    print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    # Checkpoint only when this epoch strictly improves on the best validation loss.
    if not (val_loss < best_val_loss):
        continue

    best_val_loss = val_loss
    torch.save(model.state_dict(), "best_transformer_model.pth")
    print(f"  --> New best model saved! (Loss: {val_loss:.4f})")

total_time = time.time() - start_time
print(f"\nTraining Complete in {total_time/60:.2f} minutes!")

Starting training for 10 epochs on cuda...




Epoch 1/10 | Train Loss: 1.1352 | Val Loss: 1.3700
  --> New best model saved! (Loss: 1.3700)




Epoch 2/10 | Train Loss: 1.2988 | Val Loss: 1.3797




Epoch 3/10 | Train Loss: 1.3551 | Val Loss: 1.4273


Training:  24%|██▍       | 647/2670 [10:06<27:34,  1.22it/s, loss=1.41]

Ignore everything below this line; the cells that follow are not part of the training run above.

---



In [None]:
# Requires 'train_loader' and 'val_loader' (built from HMSDataset) plus model,
# loss_fn, optimizer and device to exist already.
# Note: this restarts best_val_loss at infinity and writes the same checkpoint
# file name as the main training cell.

NUM_EPOCHS = 10
best_val_loss = float('inf')

print("Starting Training...")

for epoch in range(NUM_EPOCHS):
    # Train for one pass, then measure the validation loss.
    train_loss = train_one_epoch(train_loader, model, loss_fn, optimizer, device)
    val_loss = validate_one_epoch(val_loader, model, loss_fn, device)

    print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    # Skip checkpointing unless the validation loss strictly improved.
    if not (val_loss < best_val_loss):
        continue

    best_val_loss = val_loss
    torch.save(model.state_dict(), "best_transformer_model.pth")
    print("  >>> New Best Model Saved!")

print("Training Complete!")

In [None]:
# --- IMPORT YOUR CUSTOM CODE ---
# These imports assume you created the files I gave you earlier
from load_data import HMSDataset
# Note: You need to create the model.py file with the 'HybridModel' class
# I gave you in the previous step for this import to work.
from app.model import HybridModel

# --- CONFIGURATION ---
# DEBUG_MODE: True for a quick laptop smoke test, False for the real GPU run.
DEBUG_MODE = True

# Small batches keep memory use low on laptops.
BATCH_SIZE = 8
LEARNING_RATE = 1e-4

# A single epoch is enough when debugging; the full run uses ten.
if DEBUG_MODE:
    NUM_EPOCHS = 1
else:
    NUM_EPOCHS = 10

# Input data layout and checkpoint destination.
DATA_PATH = "./data"
CSV_FILE = os.path.join(DATA_PATH, "train.csv")
SPECTROGRAM_PATH = os.path.join(DATA_PATH, "train_spectrograms")
SAVE_PATH = "./app/models/hms_model.pt"

def train():
    """Train a HybridModel on the configured dataset and save its weights.

    Everything is driven by the module-level configuration constants
    (DEBUG_MODE, BATCH_SIZE, LEARNING_RATE, NUM_EPOCHS, CSV_FILE,
    SPECTROGRAM_PATH, SAVE_PATH). In debug mode only the first 16 CSV rows are
    used. The model's state_dict is written to SAVE_PATH after the last epoch.

    Raises:
        ValueError: If the dataset contains no samples.
    """
    # 1. Hardware check: GPU when available, otherwise CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"--- Running on: {device} ---")
    if DEBUG_MODE:
        print("!! DEBUG MODE ACTIVE: Running on tiny dataset subset !!")

    # 2. Load data
    print("Initializing Dataset...")
    full_dataset = HMSDataset(CSV_FILE, SPECTROGRAM_PATH)

    if DEBUG_MODE:
        # Keep only the first 16 rows (2 batches at BATCH_SIZE=8) so an epoch
        # finishes almost immediately.
        full_dataset.data = full_dataset.data.iloc[:16]

    # Fail early with a clear message instead of a ZeroDivisionError when the
    # epoch average is computed over zero batches.
    if len(full_dataset) == 0:
        raise ValueError(f"No samples found in {CSV_FILE}; nothing to train on.")

    dataloader = DataLoader(full_dataset, batch_size=BATCH_SIZE, shuffle=True)
    print(f"Data Loaded: {len(full_dataset)} samples")

    # 3. Initialize model
    print("Initializing Model...")
    model = HybridModel().to(device)

    # 4. Loss & optimizer
    # KLDivLoss compares log-probabilities against target probabilities
    # (the normalized vote fractions).
    criterion = nn.KLDivLoss(reduction='batchmean')
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # 5. Training loop
    print("\nStarting Training...")
    model.train()  # training mode (enables dropout, etc.)
    num_batches = len(dataloader)

    for epoch in range(NUM_EPOCHS):
        start_time = time.time()
        running_loss = 0.0

        for i, (inputs, targets) in enumerate(dataloader):
            # Move the batch to the same device as the model.
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Reset gradients from the previous step.
            optimizer.zero_grad()

            # Forward pass; KLDivLoss expects log-probabilities as its input.
            outputs = torch.log_softmax(model(inputs), dim=1)
            loss = criterion(outputs, targets)

            # Backward pass and weight update.
            loss.backward()
            optimizer.step()

            # Read the scalar once; every .item() forces a device synchronisation.
            batch_loss = loss.item()
            running_loss += batch_loss

            # Print progress every 10 batches (or every batch in debug).
            if DEBUG_MODE or (i + 1) % 10 == 0:
                print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Step [{i+1}/{num_batches}], Loss: {batch_loss:.4f}")

        # End-of-epoch stats
        epoch_loss = running_loss / num_batches
        print(f"Epoch [{epoch+1}] Complete. Avg Loss: {epoch_loss:.4f}. Time: {time.time() - start_time:.2f}s")

    # 6. Save the model
    print("\nSaving Model...")
    # Create the target folder if needed. os.path.dirname() returns '' for a
    # bare file name, and os.makedirs('') raises, so only create real folders.
    save_dir = os.path.dirname(SAVE_PATH)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    torch.save(model.state_dict(), SAVE_PATH)
    print(f"Model saved to {SAVE_PATH}")
    print("Done!")

# Entry point: run train() only when this code executes as the main module.
if __name__ == "__main__":
    train()