# UrbanSound8K Classification — Colab Pro

**Hardware options (Runtime → Change runtime type):**
- `T4 GPU` — Free / Pro: baseline option, ~16 GB VRAM
- `A100 GPU` — Pro+: fastest, use if available
- `V100 GPU` — Pro: good middle ground
- `TPU v2-8` — Experimental only; PyTorch on TPU requires extra setup, **use GPU instead**

**Recommended for this project: T4 or A100 GPU**

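This notebook persists all checkpoints, caches, and results to Google Drive.
You can run it across multiple sessions — completed folds are skipped automatically, and an interrupted fold resumes from its latest resume checkpoint (written every 5 epochs).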

In [None]:
# ── Cell 1: Check hardware ──────────────────────────────────────
import torch

# Choose the compute device once; later cells read the global DEVICE.
if not torch.cuda.is_available():
    print('No GPU! Go to Runtime → Change runtime type → GPU')
    DEVICE = torch.device('cpu')
else:
    gpu_name = torch.cuda.get_device_name(0)
    # total_memory is reported in bytes; show it in decimal gigabytes.
    gpu_mem = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f'GPU: {gpu_name}')
    print(f'VRAM: {gpu_mem:.1f} GB')
    DEVICE = torch.device('cuda')

print(f'Device: {DEVICE}', f'PyTorch: {torch.__version__}', sep='\n')

In [None]:
# ── Cell 2: Mount Drive & set paths ────────────────────────────
import os

from google.colab import drive

drive.mount('/content/drive')

# !! EDIT THESE to match your Drive structure
DRIVE_ROOT = '/content/drive/MyDrive/ECE176_project'

# Everything the notebook reads or writes lives under DRIVE_ROOT.
DATASET_ROOT = os.path.join(DRIVE_ROOT, 'UrbanSound8K')
CHECKPOINT_DIR = os.path.join(DRIVE_ROOT, 'checkpoints')
RESULTS_DIR = os.path.join(DRIVE_ROOT, 'results')
CACHE_DIR = os.path.join(DRIVE_ROOT, 'cache')

# Output folders are created up front; the dataset folder is only reported on.
for d in (CHECKPOINT_DIR, RESULTS_DIR, CACHE_DIR):
    os.makedirs(d, exist_ok=True)

print('Drive mounted.')
print(f'Dataset: {DATASET_ROOT}')
dataset_present = os.path.exists(DATASET_ROOT)
print(f'Dataset exists: {dataset_present}')

In [None]:
# ── Cell 3: Download dataset via Kaggle (first time only) ───────
# Skip if UrbanSound8K folder already exists in Drive
import os

# The metadata CSV is used as the marker that a usable copy is already on Drive.
if os.path.exists(os.path.join(DATASET_ROOT, 'metadata', 'UrbanSound8K.csv')):
    print('Dataset already present. Skipping download.')
else:
    print('Dataset not found. Downloading from Kaggle...')
    # Upload your kaggle.json first via Files panel on the left
    os.makedirs('/root/.kaggle', exist_ok=True)
    # Copy the API token into /root/.kaggle and restrict it to owner read/write.
    !cp /content/kaggle.json /root/.kaggle/
    !chmod 600 /root/.kaggle/kaggle.json

    !pip install -q kaggle
    # Download and unpack on the local Colab disk, then move the result to Drive.
    !kaggle datasets download -d chrisfilo/urbansound8k -p /content/
    !unzip -q /content/urbansound8k.zip -d /content/UrbanSound8K_tmp

    # NOTE(review): the rest of the notebook expects <DATASET_ROOT>/metadata/UrbanSound8K.csv
    # and <DATASET_ROOT>/audio/fold<N>/ — confirm the Kaggle archive unpacks to that layout.
    # NOTE(review): shutil.move places the source *inside* the destination when the
    # destination directory already exists — verify DATASET_ROOT is absent before this runs.
    import shutil
    shutil.move('/content/UrbanSound8K_tmp', DATASET_ROOT)
    print('Download complete.')

In [None]:
# ── Cell 4: Install dependencies ────────────────────────────────
# librosa is used for audio loading and MFCC/mel features; scikit-learn for the
# SVM baseline and the metrics.
# NOTE(review): tqdm is installed here but not imported by any visible cell.
!pip install -q librosa scikit-learn tqdm

In [None]:
# ── Cell 5: Core utilities (inline — no local files needed) ─────
import numpy as np
import pandas as pd
import librosa
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import json

# ── Constants ──
# Class names used as axis labels for the confusion matrix.
# Order is assumed to match the dataset's integer classID column — TODO confirm.
CLASSES = [
    'air_conditioner',
    'car_horn',
    'children_playing',
    'dog_bark',
    'drilling',
    'engine_idling',
    'gun_shot',
    'jackhammer',
    'siren',
    'street_music',
]

# Audio loading
SAMPLE_RATE = 22_050     # target sample rate passed to librosa.load
CLIP_DURATION = 4.0      # seconds; every clip is padded/truncated to this length

# Feature extraction
N_MFCC = 40              # MFCC coefficients for the SVM features
N_MELS = 128             # mel bands (spectrogram height)
HOP_LENGTH = 512         # STFT hop, in samples
N_FFT = 2_048            # STFT window, in samples
MEL_LENGTH = 128         # spectrogram frames kept (spectrogram width)

# ── Audio loading ──
def load_audio(path, sr=SAMPLE_RATE, duration=CLIP_DURATION):
    """Load an audio file as a fixed-length mono float32 waveform.

    The file is read with ``librosa.load`` (resampled to ``sr``, at most
    ``duration`` seconds, mixed down to mono), then zero-padded at the end or
    truncated so the result always has ``int(sr * duration)`` samples.

    Args:
        path: Path to the audio file.
        sr: Target sample rate in Hz.
        duration: Target clip length in seconds.

    Returns:
        1-D ``np.float32`` array of length ``int(sr * duration)``. If the file
        cannot be read, an all-zero array of that length is returned and a
        ``RuntimeWarning`` is emitted.
    """
    import warnings

    target_len = int(sr * duration)
    try:
        y, _ = librosa.load(path, sr=sr, duration=duration, mono=True)
    except Exception as err:
        # Best-effort: one unreadable clip must not abort a long run, but it
        # should be visible. `Exception` (not a bare except) lets
        # KeyboardInterrupt / SystemExit propagate.
        warnings.warn(
            f'load_audio: could not read {path!r} ({err!r}); using silence instead',
            RuntimeWarning,
        )
        return np.zeros(target_len, dtype=np.float32)

    if len(y) < target_len:
        y = np.pad(y, (0, target_len - len(y)))
    return y[:target_len].astype(np.float32)

# ── Feature extraction ──
def extract_mfcc(y, sr=SAMPLE_RATE):
    """Summarise a waveform as per-coefficient MFCC statistics.

    Computes ``N_MFCC`` MFCCs with the module's STFT settings, then reduces
    along axis 1 to one mean and one standard deviation per coefficient row.

    Returns:
        1-D array: all means followed by all standard deviations.
    """
    coeffs = librosa.feature.mfcc(
        y=y,
        sr=sr,
        n_mfcc=N_MFCC,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
    )
    row_means = coeffs.mean(axis=1)
    row_stds = coeffs.std(axis=1)
    return np.concatenate([row_means, row_stds])

def extract_mel(y, sr=SAMPLE_RATE, fixed_length=MEL_LENGTH):
    """Compute a fixed-width, min-max normalised log-mel spectrogram.

    Args:
        y: Waveform.
        sr: Sample rate of ``y``.
        fixed_length: Number of time frames in the output.

    Returns:
        ``np.float32`` array with ``fixed_length`` columns. Values are scaled
        to [0, 1] unless the spectrogram is (near-)constant, in which case the
        dB values are returned unscaled.
    """
    power = librosa.feature.melspectrogram(
        y=y, sr=sr, n_mels=N_MELS, n_fft=N_FFT, hop_length=HOP_LENGTH, fmax=8000
    )
    db = librosa.power_to_db(power, ref=np.max)

    # Force the time axis to exactly `fixed_length` frames: keep the leading
    # frames when too long, otherwise right-pad with the quietest value.
    n_frames = db.shape[1]
    if n_frames >= fixed_length:
        db = db[:, :fixed_length]
    else:
        missing = fixed_length - n_frames
        db = np.pad(db, ((0, 0), (0, missing)), constant_values=db.min())

    # Per-clip min-max scaling; skipped when the range is too small to divide by.
    floor, ceiling = db.min(), db.max()
    if ceiling - floor > 1e-6:
        db = (db - floor) / (ceiling - floor)
    return db.astype(np.float32)

# ── Metadata ──
def load_metadata(root):
    """Read ``<root>/metadata/UrbanSound8K.csv`` into a DataFrame."""
    csv_path = os.path.join(root, 'metadata', 'UrbanSound8K.csv')
    return pd.read_csv(csv_path)

def audio_path(root, fold, fname):
    """Return the path ``<root>/audio/fold<fold>/<fname>`` for one clip."""
    fold_dir = f'fold{fold}'
    return os.path.join(root, 'audio', fold_dir, fname)

def get_fold_splits(meta):
    """Yield leave-one-fold-out splits of the metadata table.

    Yields:
        ``(fold, train_df, test_df)`` for each distinct value of the ``fold``
        column in ascending order; ``test_df`` holds the rows of that fold and
        ``train_df`` all remaining rows. Both are copies.
    """
    fold_ids = sorted(meta['fold'].unique())
    for held_out in fold_ids:
        in_test = meta['fold'] == held_out
        train_part = meta[~in_test].copy()
        test_part = meta[in_test].copy()
        yield held_out, train_part, test_part

# Marker in the cell output showing that every definition above executed.
print('Core utilities loaded.')

In [None]:
# ── Cell 6: Dataset + Models ─────────────────────────────────────

class MelDataset(Dataset):
    """Map-style dataset yielding (log-mel spectrogram, class id) pairs.

    Audio is loaded and converted to a spectrogram on every access; nothing is
    cached.

    Args:
        df: Metadata rows with ``fold``, ``slice_file_name`` and ``classID``
            columns. The index is reset so positions run 0..len-1.
        root: Dataset root directory passed to ``audio_path``.
        augment: When True, apply a random circular time shift of up to ±10%
            of the clip length and add Gaussian noise (sigma 0.005) to the
            waveform before feature extraction.
    """

    def __init__(self, df, root, augment=False):
        self.df = df.reset_index(drop=True)
        self.root = root
        self.augment = augment

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(mel, label)``: a float tensor with a leading channel
        dimension of 1 and a scalar ``torch.long`` class id."""
        row = self.df.iloc[idx]
        y = load_audio(audio_path(self.root, row['fold'], row['slice_file_name']))
        if self.augment:
            # Use torch's RNG rather than NumPy's global one: PyTorch seeds its
            # own generator differently in each DataLoader worker, while its
            # docs warn that other libraries' RNG state may be duplicated
            # across workers (every worker would then draw the same values).
            frac = torch.empty(1).uniform_(-0.1, 0.1).item()
            y = np.roll(y, int(frac * len(y)))
            noise = torch.randn(y.shape).numpy() * 0.005
            y = y + noise.astype(y.dtype)
        mel = torch.from_numpy(extract_mel(y)).unsqueeze(0)
        return mel, torch.tensor(int(row['classID']), dtype=torch.long)


class ConvBlock(nn.Module):
    """3x3 convolution → batch norm → ReLU, optionally followed by 2x2 max-pool.

    Args:
        in_ch: Input channels.
        out_ch: Output channels.
        pool: When True the block ends with ``MaxPool2d(2)``; otherwise with
            ``Identity`` so the layer indices stay the same either way.
    """

    def __init__(self, in_ch, out_ch, pool=True):
        super().__init__()
        tail = nn.MaxPool2d(2) if pool else nn.Identity()
        layers = [
            nn.Conv2d(in_ch, out_ch, 3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(),
            tail,
        ]
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)


class UrbanCNN(nn.Module):
    """Four pooled ConvBlocks followed by a two-layer classifier head.

    Channel widths go 1 → 32 → 64 → 128 → 256. The first linear layer expects
    256*8*8 features, i.e. 8x8 feature maps after the four 2x poolings, which
    corresponds to single-channel 128x128 inputs.

    Args:
        n_classes: Number of output logits.
        dropout: Dropout probability between the two linear layers.
    """

    def __init__(self, n_classes=10, dropout=0.5):
        super().__init__()
        widths = (1, 32, 64, 128, 256)
        stages = [ConvBlock(c_in, c_out) for c_in, c_out in zip(widths, widths[1:])]
        self.features = nn.Sequential(*stages)

        flat_dim = 256 * 8 * 8
        self.head = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flat_dim, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, n_classes),
        )

    def forward(self, x):
        feats = self.features(x)
        return self.head(feats)


# Marker in the cell output showing that the class definitions above executed.
print('Dataset and model classes defined.')

In [None]:
# ── Cell 7: Config ───────────────────────────────────────────────
# Suggested settings per GPU:
#   T4:   BATCH_SIZE=64,  NUM_WORKERS=2
#   V100: BATCH_SIZE=128, NUM_WORKERS=4
#   A100: BATCH_SIZE=256, NUM_WORKERS=4

# Data loading — pick from the table above
BATCH_SIZE = 64
NUM_WORKERS = 2

# Optimisation
EPOCHS = 60
LR = 1e-3
WEIGHT_DECAY = 1e-4

# JSON file on Drive recording which folds have finished and their accuracy.
PROGRESS_FILE = os.path.join(CHECKPOINT_DIR, 'progress.json')

print(
    f'Batch size: {BATCH_SIZE}',
    f'Epochs: {EPOCHS}',
    f'Progress file: {PROGRESS_FILE}',
    sep='\n',
)

In [None]:
# ── Cell 8: Training helpers ─────────────────────────────────────
from sklearn.metrics import accuracy_score

def train_epoch(model, loader, opt, criterion, scaler=None):
    """Run one training pass over ``loader`` and return ``(mean_loss, accuracy)``.

    Args:
        model: Module to train; put into train mode here.
        loader: Iterable of ``(inputs, targets)`` batches.
        opt: Optimizer over ``model``'s parameters.
        criterion: Loss called as ``criterion(outputs, targets)``.
        scaler: Optional gradient scaler. When given, the forward pass runs
            under CUDA autocast and the backward/step go through the scaler.

    Returns:
        Tuple of sample-weighted mean loss and fraction of correct argmax
        predictions, both as Python floats.

    Raises:
        ValueError: If ``loader`` yields no samples (previously this surfaced
            as a ZeroDivisionError).
    """
    model.train()
    # Send batches to wherever the model lives; fall back to the notebook-wide
    # DEVICE only for a parameterless model.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else DEVICE
    use_amp = scaler is not None

    total_loss = 0.0
    correct = 0
    total = 0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        opt.zero_grad()
        if use_amp:
            # torch.autocast replaces the deprecated torch.cuda.amp.autocast.
            with torch.autocast(device_type='cuda'):
                out = model(x)
                loss = criterion(out, y)
            scaler.scale(loss).backward()
            scaler.step(opt)
            scaler.update()
        else:
            out = model(x)
            loss = criterion(out, y)
            loss.backward()
            opt.step()
        batch_size = len(y)
        # Weight by batch size so a short final batch does not skew the mean.
        total_loss += loss.item() * batch_size
        correct += (out.argmax(1) == y).sum().item()
        total += batch_size

    if total == 0:
        raise ValueError('train_epoch: loader produced no samples')
    return total_loss / total, correct / total

@torch.no_grad()
def eval_model(model, loader):
    """Predict every batch in ``loader`` and return ``(labels, preds)``.

    The model is switched to eval mode and left there. Gradients are disabled
    by the decorator.

    Args:
        model: Module producing per-class scores along dim 1.
        loader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        Two NumPy arrays of equal length: the targets as yielded by the
        loader, and the argmax predictions.
    """
    model.eval()
    # Send batches to wherever the model lives; fall back to the notebook-wide
    # DEVICE only for a parameterless model.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else DEVICE

    preds, labels = [], []
    for x, y in loader:
        scores = model(x.to(device))
        preds.extend(scores.argmax(1).cpu().tolist())
        labels.extend(y.tolist())
    return np.array(labels), np.array(preds)

def load_progress():
    """Return the saved progress dict, or a fresh empty one.

    Reads ``PROGRESS_FILE`` as JSON when it exists; otherwise returns a dict
    with empty ``completed_folds`` and ``fold_results`` lists.
    """
    if not os.path.exists(PROGRESS_FILE):
        return {'completed_folds': [], 'fold_results': []}
    with open(PROGRESS_FILE) as fh:
        return json.load(fh)

def save_progress(prog):
    """Atomically write ``prog`` to ``PROGRESS_FILE`` as indented JSON.

    The dict is serialised to a string first, so a serialisation error cannot
    leave a truncated file behind. The text then goes to a sibling temp file
    that is renamed over the target, so an interrupted session leaves either
    the old or the new file, never a partial one.

    NumPy scalars (e.g. fold ids taken from a pandas column) are converted to
    the equivalent built-in value; any other unsupported type still raises
    ``TypeError``.
    """
    def _to_builtin(obj):
        if isinstance(obj, np.generic):
            return obj.item()
        raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')

    payload = json.dumps(prog, indent=2, default=_to_builtin)
    tmp_path = PROGRESS_FILE + '.tmp'
    with open(tmp_path, 'w') as f:
        f.write(payload)
    os.replace(tmp_path, PROGRESS_FILE)

# Marker in the cell output showing that the helper definitions above executed.
print('Training helpers defined.')

In [None]:
# ── Cell 9: SVM Baseline ─────────────────────────────────────────
# Run this once — takes ~15-30 min on Colab CPU
# Trains an RBF-kernel SVM on MFCC mean/std features, once per held-out fold,
# and stores per-fold and summary accuracy as JSON in RESULTS_DIR.
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

SVM_RESULTS_FILE = os.path.join(RESULTS_DIR, 'svm_results.json')

# Results on Drive short-circuit the whole cell; delete the file to recompute.
if os.path.exists(SVM_RESULTS_FILE):
    print('SVM results already exist. Loading from Drive...')
    with open(SVM_RESULTS_FILE) as f:
        svm_summary = json.load(f)
    print(f"SVM Mean Accuracy: {svm_summary['summary']['mean_accuracy']*100:.2f}%")
else:
    meta = load_metadata(DATASET_ROOT)

    # Pre-compute MFCC
    # Features for every clip are computed once and cached on Drive, so the
    # per-fold loop below only slices arrays.
    mfcc_cache = os.path.join(CACHE_DIR, 'mfcc.npz')
    if os.path.exists(mfcc_cache):
        data = np.load(mfcc_cache)
        X_all, y_all, folds_all = data['X'], data['y'], data['folds']
        print('Loaded MFCC cache.')
    else:
        X_all, y_all, folds_all = [], [], []
        # NOTE(review): the progress print uses the DataFrame index `i`, which
        # presumably is the default 0..N-1 range from read_csv — confirm.
        for i, row in meta.iterrows():
            path = audio_path(DATASET_ROOT, row['fold'], row['slice_file_name'])
            audio = load_audio(path)
            X_all.append(extract_mfcc(audio))
            y_all.append(row['classID'])
            folds_all.append(row['fold'])
            if (i+1) % 1000 == 0: print(f'  MFCC {i+1}/{len(meta)}')
        X_all = np.array(X_all, dtype=np.float32)
        y_all = np.array(y_all, dtype=np.int64)
        folds_all = np.array(folds_all)
        np.savez(mfcc_cache, X=X_all, y=y_all, folds=folds_all)
        print('MFCC cache saved.')

    svm_fold_results = []
    # Only the fold id from get_fold_splits is used here; rows are selected
    # from the cached arrays with boolean masks instead of the yielded frames.
    for fold, train_df, test_df in get_fold_splits(meta):
        # The scaler sits inside the pipeline, so it is fitted on the training
        # folds only.
        clf = Pipeline([('sc', StandardScaler()),
                         ('svm', SVC(kernel='rbf', C=10, gamma='scale', cache_size=1000))])
        tm = folds_all != fold  # training mask: every other fold
        te = folds_all == fold  # test mask: the held-out fold
        print(f'  SVM fold {fold}...')
        clf.fit(X_all[tm], y_all[tm])
        preds = clf.predict(X_all[te])
        acc = accuracy_score(y_all[te], preds)
        # Cast to built-in types so the results are JSON-serialisable.
        svm_fold_results.append({'fold': int(fold), 'accuracy': float(acc)})
        print(f'  Fold {fold}: {acc*100:.2f}%')

    accs = [r['accuracy'] for r in svm_fold_results]
    # Mean and standard deviation (np.std default) of the per-fold accuracies.
    svm_summary = {
        'model': 'SVM+MFCC',
        'summary': {'mean_accuracy': float(np.mean(accs)), 'std_accuracy': float(np.std(accs))},
        'folds': svm_fold_results
    }
    with open(SVM_RESULTS_FILE, 'w') as f: json.dump(svm_summary, f, indent=2)
    print(f"\nSVM Mean: {np.mean(accs)*100:.2f}% ± {np.std(accs)*100:.2f}%")

In [None]:
# ── Cell 10: CNN Training (multi-session) ────────────────────────
# Safe to re-run: skips completed folds automatically and resumes an
# interrupted fold from its last resume checkpoint.
#
# NOTE(review): the held-out fold is used both to pick the best epoch and to
# report the fold's accuracy, so the reported numbers are optimistic.

meta = load_metadata(DATASET_ROOT)
prog = load_progress()
# Fold ids are kept as built-in ints throughout: the ids coming from the
# metadata column are NumPy integers, which json.dump cannot serialise.
completed = {int(f) for f in prog['completed_folds']}
fold_results = prog['fold_results']

on_gpu = DEVICE.type == 'cuda'
scaler = torch.cuda.amp.GradScaler() if on_gpu else None

for test_fold, train_df, test_df in get_fold_splits(meta):
    test_fold = int(test_fold)
    if test_fold in completed:
        matching = [r for r in fold_results if r.get('fold') == test_fold]
        if matching:
            print(f'[SKIP] Fold {test_fold} — {matching[0]["accuracy"]*100:.2f}%')
        continue

    print(f'\n── Fold {test_fold} ── {len(train_df)} train / {len(test_df)} test')

    ckpt_best   = os.path.join(CHECKPOINT_DIR, f'fold{test_fold}_best.pt')
    ckpt_resume = os.path.join(CHECKPOINT_DIR, f'fold{test_fold}_resume.pt')

    train_ds = MelDataset(train_df, DATASET_ROOT, augment=True)
    test_ds  = MelDataset(test_df,  DATASET_ROOT, augment=False)
    # Pinned host memory only helps host→GPU copies, so enable it only on GPU.
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                              num_workers=NUM_WORKERS, pin_memory=on_gpu)
    test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False,
                              num_workers=NUM_WORKERS, pin_memory=on_gpu)

    model = UrbanCNN().to(DEVICE)
    opt = torch.optim.Adam(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=EPOCHS)
    crit = nn.CrossEntropyLoss()

    start_epoch = 1
    best_acc = 0.0

    if os.path.exists(ckpt_resume):
        state = torch.load(ckpt_resume, map_location=DEVICE)
        model.load_state_dict(state['model'])
        opt.load_state_dict(state['optimizer'])
        sched.load_state_dict(state['scheduler'])
        start_epoch = state['epoch'] + 1
        best_acc = float(state['best_acc'])
        print(f'Resumed from epoch {start_epoch}, best_acc={best_acc*100:.2f}%')

    for epoch in range(start_epoch, EPOCHS + 1):
        loss, train_acc = train_epoch(model, train_loader, opt, crit, scaler)
        sched.step()

        if epoch % 5 == 0 or epoch == EPOCHS:
            labels, preds = eval_model(model, test_loader)
            # Built-in float, not a NumPy scalar: the value is stored in the
            # resume checkpoint, and torch.load's restricted (weights_only)
            # unpickler rejects NumPy scalars.
            val_acc = float((labels == preds).mean())
            print(f'  E{epoch:02d}/{EPOCHS}  loss={loss:.4f}  '
                  f'train={train_acc*100:.1f}%  val={val_acc*100:.1f}%')

            # Also write a best checkpoint when none exists yet, so the final
            # evaluation below always has a file to load.
            if val_acc > best_acc or not os.path.exists(ckpt_best):
                best_acc = val_acc
                torch.save(model.state_dict(), ckpt_best)

            # Save the resume state AFTER updating best_acc. Saving it first
            # records a stale best_acc, and a resumed run could then overwrite
            # a better best checkpoint with a worse model.
            torch.save({'epoch': epoch, 'model': model.state_dict(),
                        'optimizer': opt.state_dict(), 'scheduler': sched.state_dict(),
                        'best_acc': best_acc}, ckpt_resume)

    # Final eval with the best checkpoint of this fold
    model.load_state_dict(torch.load(ckpt_best, map_location=DEVICE))
    labels, preds = eval_model(model, test_loader)
    acc = float((labels == preds).mean())

    fold_results.append({'fold': test_fold, 'accuracy': acc})
    completed.add(test_fold)
    prog['completed_folds'] = sorted(completed)
    prog['fold_results'] = fold_results
    save_progress(prog)

    # The fold is recorded as done, so its resume state is no longer needed.
    if os.path.exists(ckpt_resume):
        os.remove(ckpt_resume)
    torch.cuda.empty_cache()
    print(f'  Fold {test_fold} DONE — {acc*100:.2f}%')

print('\n── All folds complete ──')

In [None]:
# ── Cell 11: Final summary & comparison ─────────────────────────
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# CNN results
# Reads the per-fold accuracies recorded by the training cell. With no
# finished folds there is nothing to average, so the summary and the results
# file are skipped instead of printing and saving NaN.
prog = load_progress()
cnn_accs = [r['accuracy'] for r in prog['fold_results']]
if cnn_accs:
    print('CNN Results:')
    for r in sorted(prog['fold_results'], key=lambda x: x['fold']):
        print(f'  Fold {r["fold"]:2d}: {r["accuracy"]*100:.2f}%')
    print(f'  Mean: {np.mean(cnn_accs)*100:.2f}% ± {np.std(cnn_accs)*100:.2f}%')
else:
    print('CNN Results: no completed folds yet — run Cell 10 first.')

# SVM results
# The path is recomputed here so this cell also works in a session where the
# SVM cell was not executed.
SVM_RESULTS_FILE = os.path.join(RESULTS_DIR, 'svm_results.json')
if os.path.exists(SVM_RESULTS_FILE):
    with open(SVM_RESULTS_FILE) as f:
        svm = json.load(f)
    svm_accs = [r['accuracy'] for r in svm['folds']]
    print(f"\nSVM Baseline: {np.mean(svm_accs)*100:.2f}% ± {np.std(svm_accs)*100:.2f}%")

# Save CNN results
if cnn_accs:
    cnn_summary = {
        'model': 'UrbanCNN',
        'summary': {'mean_accuracy': float(np.mean(cnn_accs)), 'std_accuracy': float(np.std(cnn_accs))},
        'folds': prog['fold_results']
    }
    with open(os.path.join(RESULTS_DIR, 'cnn_results.json'), 'w') as f:
        json.dump(cnn_summary, f, indent=2)
    print('\nResults saved to Drive.')

In [None]:
# ── Cell 12: Confusion matrix (run after all folds complete) ─────
# Re-runs inference with each fold's best checkpoint on that fold's held-out
# clips and pools the predictions of all folds into one confusion matrix.
# Folds without a checkpoint are reported and skipped.

meta = load_metadata(DATASET_ROOT)
all_labels, all_preds = [], []

for test_fold, train_df, test_df in get_fold_splits(meta):
    ckpt = os.path.join(CHECKPOINT_DIR, f'fold{test_fold}_best.pt')
    if not os.path.exists(ckpt):
        print(f'Missing checkpoint for fold {test_fold}')
        continue

    test_ds = MelDataset(test_df, DATASET_ROOT, augment=False)
    # Same loader settings as training (Cell 7 config) instead of separate
    # hard-coded values; pinned memory only matters on GPU.
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False,
                             num_workers=NUM_WORKERS,
                             pin_memory=DEVICE.type == 'cuda')

    model = UrbanCNN().to(DEVICE)
    model.load_state_dict(torch.load(ckpt, map_location=DEVICE))

    labels, preds = eval_model(model, test_loader)
    all_labels.extend(labels.tolist())
    all_preds.extend(preds.tolist())
    torch.cuda.empty_cache()

if not all_labels:
    print('No fold checkpoints found — run Cell 10 first.')
else:
    print(f'Overall accuracy: {accuracy_score(all_labels, all_preds)*100:.2f}%')

    # Confusion matrix
    # Passing the full label set keeps the matrix len(CLASSES) x len(CLASSES)
    # and aligned with the tick labels even if a class never appears.
    class_ids = list(range(len(CLASSES)))
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    fig, ax = plt.subplots(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=CLASSES, yticklabels=CLASSES, ax=ax)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title('Confusion Matrix — UrbanCNN (all folds)')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.savefig(os.path.join(RESULTS_DIR, 'confusion_matrix.png'), dpi=150)
    plt.show()
    print('Saved to Drive.')