# GTZAN Genre Classification – CNN with Attention (Preprocess + Train)
This notebook performs the full preprocessing of the GTZAN audio files (log‑Mel spectrogram + optional chromagram), creates train/validation splits, and then trains a simple CNN with a CBAM (Convolutional Block Attention Module) block.
All required packages are installed in the first cell, so the notebook can be run in a fresh environment.


In [None]:
# Install required packages (run once)
!pip install -q numpy librosa scikit-image torch torchvision tqdm scikit-learn


In [None]:
import os, glob, numpy as np
import librosa
from skimage.transform import resize
from sklearn.model_selection import train_test_split
import torch, torch.nn as nn, torch.nn.functional as F, torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm


In [None]:
# ---------------------------------------------------------------------------
# Configuration – adjust paths if your folder layout differs
# ---------------------------------------------------------------------------
# Directory whose immediate sub-directories are treated as genre labels
# (one folder per genre, each containing that genre's audio files).
DATASET_ROOT = '/Users/narac0503/GIT/GTZAN Dataset Classification/GTZAN-Dataset-Classification/gtzan-classification/data/gtzan/genres_original'
# Directory that receives the cached train.npz / val.npz arrays.
OUTPUT_DIR   = '/Users/narac0503/GIT/GTZAN Dataset Classification/GTZAN-Dataset-Classification/gtzan-classification/data/preprocessed'
SAMPLE_RATE  = 22050  # sampling rate (Hz) requested from librosa.load
DURATION     = 30.0  # seconds per clip; shorter clips are zero-padded, longer ones truncated
N_MELS       = 128  # number of Mel bands; the chromagram is also stretched to this height
HOP_LENGTH   = 512  # hop length (samples) shared by the Mel and chroma features
IMG_SIZE     = (128, 128)  # (freq, time) after resizing
USE_CHROMA   = True  # set False to use only log-Mel (1 channel instead of 2)
# Side effect at cell execution: make sure the cache directory exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)


In [None]:
def load_audio(path: str, sr: int = SAMPLE_RATE, duration: float = DURATION):
    """Load an audio file and force it to exactly ``int(sr * duration)`` samples.

    Args:
        path: Audio file to read with ``librosa.load``.
        sr: Sampling rate passed to ``librosa.load``.
        duration: Clip length in seconds; at most this much audio is read.

    Returns:
        The waveform array, zero-padded at the end when the file is shorter
        than the target length and truncated when it is longer.
    """
    samples, _ = librosa.load(path, sr=sr, duration=duration)
    target = int(sr * duration)
    missing = target - len(samples)
    if missing > 0:
        # Short clip: append trailing zeros up to the target length.
        return np.pad(samples, (0, missing))
    return samples[:target]

def log_mel_spectrogram(y: np.ndarray, sr: int = SAMPLE_RATE):
    """Compute a log-scaled (dB) Mel spectrogram of a waveform.

    Args:
        y: Audio waveform.
        sr: Sampling rate of ``y``.

    Returns:
        The Mel power spectrogram (``N_MELS`` bands, hop ``HOP_LENGTH``)
        converted to decibels relative to its own maximum.
    """
    # `y` must be passed by keyword: recent librosa releases made the audio
    # argument keyword-only, so a positional call raises TypeError.
    S = librosa.feature.melspectrogram(
        y=y, sr=sr, n_mels=N_MELS, hop_length=HOP_LENGTH
    )
    return librosa.power_to_db(S, ref=np.max)

def chromagram(y: np.ndarray, sr: int = SAMPLE_RATE):
    """Compute an STFT chromagram stretched to ``N_MELS`` rows.

    Args:
        y: Audio waveform.
        sr: Sampling rate of ``y``.

    Returns:
        The chroma feature matrix, linearly interpolated along its first
        axis to ``N_MELS`` rows; the number of frames is left unchanged.
    """
    chroma = librosa.feature.chroma_stft(y=y, sr=sr, hop_length=HOP_LENGTH)
    n_frames = chroma.shape[1]
    # Stretch only the pitch-class axis so it matches the Mel image height.
    return resize(
        chroma,
        (N_MELS, n_frames),
        order=1,
        mode='constant',
        anti_aliasing=True,
    )

def preprocess_file(path: str):
    """Turn one audio file into a standardized feature image.

    The clip is loaded, converted to a log-Mel spectrogram and resized to
    ``IMG_SIZE``. When ``USE_CHROMA`` is true a chromagram is resized the
    same way and stacked as a second channel.

    Args:
        path: Audio file to preprocess.

    Returns:
        A ``float32`` array of shape ``(C, H, W)`` with ``C`` equal to 2 when
        ``USE_CHROMA`` is set and 1 otherwise. Each channel is standardized
        to zero mean and unit variance independently.
    """
    y = load_audio(path)
    mel = log_mel_spectrogram(y)
    mel_resized = resize(mel, IMG_SIZE, order=1, mode='constant', anti_aliasing=True)
    if USE_CHROMA:
        chroma = chromagram(y)
        chroma_resized = resize(chroma, IMG_SIZE, order=1, mode='constant', anti_aliasing=True)
        img = np.stack([mel_resized, chroma_resized], axis=0)  # (C, H, W)
    else:
        img = mel_resized[np.newaxis, ...]
    # Standardize per channel. The log-Mel channel is in decibels while the
    # chroma channel is on a much smaller scale; a single global mean/std
    # would be dominated by the Mel values and flatten the chroma channel
    # to a near-constant. For the single-channel case this is identical to
    # global standardization.
    mean = img.mean(axis=(1, 2), keepdims=True)
    std = img.std(axis=(1, 2), keepdims=True)
    img = (img - mean) / (std + 1e-6)
    return img.astype(np.float32)


In [None]:
# ---------------------------------------------------------------------------
# Load all audio files, compute spectrograms, and split into train/val
# ---------------------------------------------------------------------------
# Every sub-directory of DATASET_ROOT is one genre; sorted names give a
# stable genre -> integer label mapping.
genres = sorted(
    d for d in os.listdir(DATASET_ROOT)
    if os.path.isdir(os.path.join(DATASET_ROOT, d))
)
label_map = {g: i for i, g in enumerate(genres)}
print('Found genres:', label_map)

X, y = [], []
for genre in genres:
    pattern = os.path.join(DATASET_ROOT, genre, '*.*')
    # glob returns paths in arbitrary, OS-dependent order. Sorting keeps the
    # sample order (and therefore the seeded split below) reproducible.
    for fp in sorted(glob.glob(pattern)):
        try:
            img = preprocess_file(fp)
        except Exception as e:
            # Best effort: skip unreadable/corrupt files but report them.
            print(f'[WARN] {fp}: {e}')
            continue
        X.append(img)
        y.append(label_map[genre])

if not X:
    # Without this guard np.stack fails with an unrelated-looking message.
    raise RuntimeError(
        f'No audio files could be processed under {DATASET_ROOT!r}; '
        'check the path and the [WARN] messages above.'
    )

X = np.stack(X)  # shape (N, C, H, W)
y = np.array(y, dtype=np.int64)
print(f'Processed {X.shape[0]} files – shape {X.shape}')

# Stratified train/val split (80/20)
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42)

# Save to disk for later reuse
np.savez_compressed(os.path.join(OUTPUT_DIR, 'train.npz'), X=X_train, y=y_train)
np.savez_compressed(os.path.join(OUTPUT_DIR, 'val.npz'),   X=X_val,   y=y_val)
print('Saved train.npz and val.npz to', OUTPUT_DIR)


In [None]:
# ---------------------------------------------------------------------------
# CBAM (Channel + Spatial Attention) implementation
# ---------------------------------------------------------------------------
class CBAM(nn.Module):
    """Convolutional Block Attention Module: channel then spatial attention.

    Args:
        channels: Number of channels of the input feature map.
        reduction: Divisor for the hidden width of the channel-attention MLP.
            The hidden width is ``channels // reduction`` but never less
            than 1, so small channel counts still get a working bottleneck.
    """

    def __init__(self, channels, reduction=16):
        super().__init__()
        # Clamp to 1: channels < reduction would otherwise give a
        # zero-width bottleneck and a degenerate channel attention.
        hidden = max(channels // reduction, 1)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        # Shared MLP (as 1x1 convolutions) applied to both pooled descriptors.
        self.mlp = nn.Sequential(
            nn.Conv2d(channels, hidden, kernel_size=1, bias=False),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden, channels, kernel_size=1, bias=False)
        )
        self.sigmoid = nn.Sigmoid()
        # 7x7 convolution over the [mean, max] channel-pooled maps.
        self.conv_spatial = nn.Conv2d(2, 1, kernel_size=7, padding=3, bias=False)

    def forward(self, x):
        """Apply channel attention, then spatial attention.

        Args:
            x: Feature map of shape ``(N, channels, H, W)``.

        Returns:
            A tensor with the same shape as ``x``, rescaled by both gates.
        """
        # Channel attention: gate each channel from its global avg/max stats.
        avg_out = self.mlp(self.avg_pool(x))
        max_out = self.mlp(self.max_pool(x))
        x = x * self.sigmoid(avg_out + max_out)
        # Spatial attention: gate each location from per-pixel channel stats.
        avg_map = torch.mean(x, dim=1, keepdim=True)
        max_map, _ = torch.max(x, dim=1, keepdim=True)
        pooled = torch.cat([avg_map, max_map], dim=1)
        return x * self.sigmoid(self.conv_spatial(pooled))


In [None]:
# ---------------------------------------------------------------------------
# Simple CNN model that uses CBAM
# ---------------------------------------------------------------------------
class SimpleCNN(nn.Module):
    """One conv block with CBAM attention followed by a linear classifier.

    Args:
        in_channels: Number of channels of the input images.
        num_classes: Number of output logits.
        img_size: ``(height, width)`` of the input images; used to size the
            linear layer after the single 2x2 max-pool.
    """

    def __init__(self, in_channels, num_classes, img_size=(128,128)):
        super().__init__()
        stem_channels = 32
        self.conv1 = nn.Conv2d(in_channels, stem_channels, kernel_size=3, padding=1)
        self.bn1   = nn.BatchNorm2d(stem_channels)
        self.cbam1 = CBAM(stem_channels)
        self.pool  = nn.MaxPool2d(2)
        # The 3x3 conv keeps H and W (padding=1); the pool halves both.
        height, width = img_size
        self.flat_dim = stem_channels * (height // 2) * (width // 2)
        self.fc = nn.Linear(self.flat_dim, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(N, num_classes)`` for a batch ``x``."""
        features = self.conv1(x)
        features = self.bn1(features)
        features = F.relu(features)
        features = self.pool(self.cbam1(features))
        flat = features.view(features.size(0), -1)
        return self.fc(flat)


In [None]:
# ---------------------------------------------------------------------------
# Build the model, loss, optimizer and DataLoaders from the in-memory arrays
# ---------------------------------------------------------------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_classes = len(np.unique(y_train))
in_channels = X_train.shape[1]  # 1 or 2 depending on USE_CHROMA
# Take the spatial size from the data itself so the classifier's input width
# always matches the preprocessed images, even if IMG_SIZE is changed.
img_size = tuple(X_train.shape[2:])
model = SimpleCNN(in_channels=in_channels, num_classes=num_classes, img_size=img_size).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
batch_size = 32
train_ds = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
val_ds   = TensorDataset(torch.from_numpy(X_val),   torch.from_numpy(y_val))
# Pinned host memory only helps host-to-GPU copies; skip it on CPU-only runs.
pin_memory = device.type == 'cuda'
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, pin_memory=pin_memory)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, pin_memory=pin_memory)


In [None]:
def train_one_epoch(loader, model, criterion, optimizer):
    """Train ``model`` for one pass over ``loader``.

    Args:
        loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        model: Network to optimize; it is switched to training mode.
        criterion: Loss taking ``(outputs, targets)``. The per-batch value is
            weighted by batch size, which assumes a mean-reduced loss.
        optimizer: Optimizer stepping ``model``'s parameters.

    Returns:
        ``(mean loss per sample, accuracy)`` over the whole epoch.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train()
    # Move batches to wherever the model lives rather than relying on a
    # module-level `device` variable being defined and in sync.
    run_device = next(model.parameters()).device
    running_loss = 0.0
    correct = 0
    total = 0
    for xb, yb in tqdm(loader, leave=False):
        xb, yb = xb.to(run_device), yb.to(run_device)
        optimizer.zero_grad()
        outputs = model(xb)
        loss = criterion(outputs, yb)
        loss.backward()
        optimizer.step()
        batch_n = xb.size(0)
        running_loss += loss.item() * batch_n
        correct += (outputs.argmax(dim=1) == yb).sum().item()
        total += batch_n
    if total == 0:
        raise ValueError('loader yielded no samples')
    return running_loss / total, correct / total

def evaluate(loader, model, criterion):
    """Compute loss and accuracy of ``model`` over ``loader`` without gradients.

    Args:
        loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        model: Network to evaluate; it is switched to eval mode.
        criterion: Loss taking ``(outputs, targets)``. The per-batch value is
            weighted by batch size, which assumes a mean-reduced loss.

    Returns:
        ``(mean loss per sample, accuracy)`` over all batches.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()
    # Move batches to wherever the model lives rather than relying on a
    # module-level `device` variable. A parameter-free model leaves the
    # batches on their current device.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else None
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for xb, yb in loader:
            if run_device is not None:
                xb, yb = xb.to(run_device), yb.to(run_device)
            outputs = model(xb)
            loss = criterion(outputs, yb)
            batch_n = xb.size(0)
            running_loss += loss.item() * batch_n
            correct += (outputs.argmax(dim=1) == yb).sum().item()
            total += batch_n
    if total == 0:
        raise ValueError('loader yielded no samples')
    return running_loss / total, correct / total


In [None]:
# ---------------------------------------------------------------------------
# Training loop
# ---------------------------------------------------------------------------
# Runs `num_epochs` train/validate cycles. Whenever validation accuracy
# strictly improves, the model weights are written to
# 'best_cnn_attention.pth' in the current working directory.
best_val_acc = 0.0
num_epochs = 30
for epoch in range(1, num_epochs+1):
    train_loss, train_acc = train_one_epoch(train_loader, model, criterion, optimizer)
    val_loss,   val_acc   = evaluate(val_loader, model, criterion)
    # Strict '>' keeps the earliest epoch among ties. NOTE(review): because
    # the baseline is 0.0, no checkpoint is written if accuracy never
    # exceeds zero.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'best_cnn_attention.pth')
    print(f'Epoch {epoch:02d} | Train loss {train_loss:.4f} acc {train_acc:.4f} | Val loss {val_loss:.4f} acc {val_acc:.4f}')

print('Training complete – best validation accuracy:', best_val_acc)


In [None]:
# Load best model and report final validation metrics.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU machine still loads in a CPU-only session.
model.load_state_dict(torch.load('best_cnn_attention.pth', map_location=device))
val_loss, val_acc = evaluate(val_loader, model, criterion)
print(f'Final validation loss: {val_loss:.4f}, accuracy: {val_acc:.4f}')
