
### Composer Classification — LSTM + CNN/Hybrid (Balanced Test, Merged)


In [None]:
# 2
import os, random
from pathlib import Path
import numpy as np
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import pickle

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(42)
device = torch.device("mps" if torch.backends.mps.is_available()
                      else "cuda" if torch.cuda.is_available()
                      else "cpu")
print("Using device:", device)

Using device: mps


In [None]:
# 3
# Force Beethoven test set to EXACTLY 4 files (others already have 4).
BASE_DATA = Path("../data")
RAW_BEET  = BASE_DATA / "raw_Beethoven"
LABEL     = "beethoven"

all_beet = sorted(RAW_BEET.rglob("*.mid"))
print(f"Found {len(all_beet)} raw Beethoven files in {RAW_BEET}")

# Pick exactly 4 for TEST (or fewer if insufficient files)
random.seed(42)
k_test = min(4, len(all_beet))
test_files = random.sample(all_beet, k=k_test)
remaining  = [p for p in all_beet if p not in test_files]

# Split remaining -> train/dev (70/30)
train_files, dev_files = train_test_split(remaining, test_size=0.30, random_state=42)

def reset_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)
    # Remove only MIDI files (don’t nuke other assets)
    for ext in ("*.mid", "*.midi"):
        for old in p.glob(ext):
            old.unlink()

t_train = BASE_DATA / "train" / LABEL
t_dev   = BASE_DATA / "dev"   / LABEL
t_test  = BASE_DATA / "test"  / LABEL

for d in (t_train, t_dev, t_test):
    reset_dir(d)

import shutil
for f in train_files: shutil.copy2(f, t_train / f.name)
for f in dev_files:   shutil.copy2(f,   t_dev / f.name)
for f in test_files:  shutil.copy2(f,  t_test / f.name)

print(f"Beethoven placed -> train:{len(train_files)}  dev:{len(dev_files)}  test:{len(test_files)}")

for c in ["bach","beethoven","chopin","mozart"]:
    n = sum(1 for _ in (BASE_DATA/"test"/c).rglob("*.mid")) \
      + sum(1 for _ in (BASE_DATA/"test"/c).rglob("*.midi"))
    print(f"test/{c}: {n}")

Found 215 raw Beethoven files in ../data/raw_Beethoven
Beethoven placed -> train:147  dev:64  test:4
test/bach: 4
test/beethoven: 4
test/chopin: 4
test/mozart: 4


In [None]:
# 4
PROC = Path("../data/processed_data")
PROC.mkdir(parents=True, exist_ok=True)

to_delete = [
    PROC/"lstm_train.pkl", PROC/"lstm_dev.pkl", PROC/"lstm_test.pkl",
    PROC/"label_encoder.pkl"
]
deleted = 0
for p in to_delete:
    if p.exists():
        p.unlink()
        deleted += 1
print(f"Deleted {deleted} old processed files. You’ll rebuild them next.")

Deleted 4 old processed files. You’ll rebuild them next.


In [None]:
# 5
# Install pretty_midi
try:
    import pretty_midi
except ImportError:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pretty_midi"])
    import pretty_midi
    



In [None]:
# 6
# Build LSTM windows (50x3: pitch, step, duration)
from sklearn.preprocessing import LabelEncoder

BASE = Path("../data")
PROC = BASE / "processed_data"
PROC.mkdir(parents=True, exist_ok=True)

LABELS = ["bach","beethoven","chopin","mozart"]  
le = LabelEncoder()
le.fit(LABELS)

WINDOW_LEN = 50
STRIDE     = 1  

def midi_to_events(fp):
    """
    Returns an array of shape (N, 3): [pitch, step, duration] in seconds.
    step = time since previous note onset; duration = note end - start.
    """
    try:
        pm = pretty_midi.PrettyMIDI(str(fp))
        notes = []
        for inst in pm.instruments:
            for n in inst.notes:
                notes.append((n.start, n.end, n.pitch))
        if not notes:
            return None
        notes.sort(key=lambda x: x[0])  
        events = []
        prev_onset = notes[0][0]
        for start, end, pitch in notes:
            step = float(max(0.0, start - prev_onset))
            dur  = float(max(0.0, end - start))
            events.append([float(pitch), step, dur])
            prev_onset = start
        return np.array(events, dtype=np.float32)
    except Exception:
        return None  # skip unreadable files

def windows_from_events(ev, win=WINDOW_LEN, stride=STRIDE):
    """
    Slice (N,3) events into (num_windows, win, 3) using a sliding window.
    Skip files with fewer than 'win' notes.
    """
    if ev is None or len(ev) < win:
        return np.empty((0, win, 3), dtype=np.float32)
    n = 1 + (len(ev) - win) // stride
    out = np.empty((n, win, 3), dtype=np.float32)
    j = 0
    for i in range(0, len(ev) - win + 1, stride):
        out[j] = ev[i:i+win]
        j += 1
    return out

def build_split(split):
    X_blocks = []
    y_blocks = []
    for label in LABELS:
        folder = BASE / split / label
        files = list(folder.rglob("*.mid")) + list(folder.rglob("*.midi"))
        for fp in files:
            ev = midi_to_events(fp)
            Xw = windows_from_events(ev, WINDOW_LEN, STRIDE)
            if Xw.shape[0] > 0:
                X_blocks.append(Xw)
                y_blocks.append(
                    np.full((Xw.shape[0],), le.transform([label])[0], dtype=np.int64)
                )
    if X_blocks:
        X = np.concatenate(X_blocks, axis=0)
        y = np.concatenate(y_blocks, axis=0)
    else:
        X = np.empty((0, WINDOW_LEN, 3), dtype=np.float32)
        y = np.empty((0,), dtype=np.int64)
    return X, y

X_train, y_train = build_split("train")
X_dev,   y_dev   = build_split("dev")
X_test,  y_test  = build_split("test")

print("Built windows:",
      "Train", X_train.shape, "Dev", X_dev.shape, "Test", X_test.shape)

with open(PROC/"lstm_train.pkl","wb") as f: pickle.dump((X_train, y_train), f)
with open(PROC/"lstm_dev.pkl","wb")   as f: pickle.dump((X_dev,   y_dev),   f)
with open(PROC/"lstm_test.pkl","wb")  as f: pickle.dump((X_test,  y_test),  f)
with open(PROC/"label_encoder.pkl","wb") as f: pickle.dump(le, f)

for name, y in [("Train", y_train), ("Dev", y_dev), ("Test", y_test)]:
    cnt = Counter(y.tolist())
    print(name, {LABELS[i]: cnt.get(i, 0) for i in range(len(LABELS))})



Built windows: Train (1529502, 50, 3) Dev (576863, 50, 3) Test (56063, 50, 3)
Train {'bach': 85611, 'beethoven': 1049405, 'chopin': 104020, 'mozart': 290466}
Dev {'bach': 11015, 'beethoven': 535829, 'chopin': 9309, 'mozart': 20710}
Test {'bach': 11313, 'beethoven': 16768, 'chopin': 9581, 'mozart': 18401}


In [None]:
# 7
PROC = Path("../data/processed_data")
with open(PROC/"lstm_train.pkl","rb") as f:
    X_train, y_train = pickle.load(f)
with open(PROC/"lstm_dev.pkl","rb") as f:
    X_dev,   y_dev   = pickle.load(f)
with open(PROC/"lstm_test.pkl","rb") as f:
    X_test,  y_test  = pickle.load(f)
with open(PROC/"label_encoder.pkl","rb") as f:
    le = pickle.load(f)

num_classes = len(le.classes_)
input_size  = X_train.shape[2]

print("Shapes → Train/Dev/Test:", X_train.shape, X_dev.shape, X_test.shape)
print("Classes:", le.classes_)

# Show counts
def show_counts(name, y):
    cnt = Counter(y.tolist())
    print(name, {le.classes_[i]: cnt.get(i,0) for i in range(num_classes)})

show_counts("Train", y_train)
show_counts("Dev",   y_dev)
show_counts("Test (before balance)", y_test)

# HARD STOP if any class missing in TEST
test_cnt = Counter(y_test.tolist())
missing  = [le.classes_[i] for i in range(num_classes) if test_cnt.get(i,0) == 0]
if missing:
    raise RuntimeError(
        "Processed TEST set is missing classes: "
        f"{missing}. Re-run your preprocessing AFTER the Beethoven split."
    )

# In-memory balance: downsample each class to the minimum count
rng = np.random.default_rng(42)
target_n = min(test_cnt[i] for i in range(num_classes))

keep_idx = []
for i in range(num_classes):
    idx_i = np.where(y_test == i)[0]
    pick  = rng.choice(idx_i, size=target_n, replace=False)
    keep_idx.extend(pick)
keep_idx = np.array(keep_idx)

X_test_bal = X_test[keep_idx]
y_test_bal = y_test[keep_idx]
show_counts("Test (balanced in memory)", y_test_bal)

Shapes → Train/Dev/Test: (1529502, 50, 3) (576863, 50, 3) (56063, 50, 3)
Classes: ['bach' 'beethoven' 'chopin' 'mozart']
Train {'bach': 85611, 'beethoven': 1049405, 'chopin': 104020, 'mozart': 290466}
Dev {'bach': 11015, 'beethoven': 535829, 'chopin': 9309, 'mozart': 20710}
Test (before balance) {'bach': 11313, 'beethoven': 16768, 'chopin': 9581, 'mozart': 18401}
Test (balanced in memory) {'bach': 9581, 'beethoven': 9581, 'chopin': 9581, 'mozart': 9581}


In [18]:
# 8
# Convert to tensors
X_tr  = torch.tensor(X_train, dtype=torch.float32, device=device)
y_tr  = torch.tensor(y_train, dtype=torch.long,    device=device)
X_val = torch.tensor(X_dev,   dtype=torch.float32, device=device)
y_val = torch.tensor(y_dev,   dtype=torch.long,    device=device)

# balanced test tensors
X_te  = torch.tensor(X_test_bal, dtype=torch.float32, device=device)
y_te  = torch.tensor(y_test_bal, dtype=torch.long,    device=device)

# Loaders
lstm_batch_size = 32
lstm_train_loader = DataLoader(TensorDataset(X_tr, y_tr), batch_size=lstm_batch_size, shuffle=True)
lstm_dev_loader   = DataLoader(TensorDataset(X_val, y_val), batch_size=lstm_batch_size, shuffle=False)
lstm_test_loader  = DataLoader(TensorDataset(X_te, y_te),   batch_size=lstm_batch_size, shuffle=False)

xb, yb = next(iter(lstm_train_loader))
print("LSTM batch X:", xb.shape, " y:", yb.shape)

LSTM batch X: torch.Size([32, 50, 3])  y: torch.Size([32])


In [19]:
# 9
# LSTM model
class MusicLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.3
        )
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out, _ = self.lstm(x)
        last = out[:, -1, :]
        return self.fc(last)

hidden_size = 128
num_layers  = 2
lstm_model  = MusicLSTM(input_size, hidden_size, num_layers, num_classes).to(device)
print(lstm_model)

MusicLSTM(
  (lstm): LSTM(3, 128, num_layers=2, batch_first=True, dropout=0.3)
  (fc): Linear(in_features=128, out_features=4, bias=True)
)


In [22]:
# 10
# LSTM training setup
import time
lstm_lr     = 1e-3
lstm_epochs = 20  # you can lower while iterating

# Early stopping (on dev accuracy)
es_patience  = 3          # stop after 3 epochs without improvement
es_min_delta = 1e-4       # require at least this improvement
es_wait      = 0

lstm_criterion = nn.CrossEntropyLoss()
lstm_optimizer = optim.Adam(lstm_model.parameters(), lr=lstm_lr)
lstm_scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    lstm_optimizer, mode="max", factor=0.5, patience=2
)

best_dev_acc = -float("inf")
print("Training LSTM for", lstm_epochs, "epochs (early stopping enabled)")

Training LSTM for 20 epochs (early stopping enabled)


In [23]:
# 11
#LSTM Training Loop
for epoch in range(1, lstm_epochs + 1):
    lstm_model.train()
    train_losses = []
    for Xb, yb in lstm_train_loader:
        lstm_optimizer.zero_grad()
        logits = lstm_model(Xb)
        loss   = lstm_criterion(logits, yb)
        loss.backward()
        lstm_optimizer.step()
        train_losses.append(loss.item())
    avg_train_loss = float(np.mean(train_losses)) if train_losses else 0.0

    # Dev
    lstm_model.eval()
    dev_preds, dev_true, dev_losses = [], [], []
    with torch.no_grad():
        for Xb, yb in lstm_dev_loader:
            logits = lstm_model(Xb)
            loss   = lstm_criterion(logits, yb)
            dev_losses.append(loss.item())
            dev_preds.extend(logits.argmax(1).cpu().numpy())
            dev_true.extend(yb.cpu().numpy())
    avg_dev_loss = float(np.mean(dev_losses)) if dev_losses else 0.0
    dev_acc      = accuracy_score(dev_true, dev_preds) if dev_true else 0.0
    lstm_scheduler.step(dev_acc)

    print(f"Epoch {epoch:2d}/{lstm_epochs}  Train Loss: {avg_train_loss:.4f}  "
          f"Dev Loss: {avg_dev_loss:.4f}  Dev Acc: {dev_acc:.4f}  "
          f"(best: {best_dev_acc if best_dev_acc!=-float('inf') else 0.0:.4f})")

    # Early stopping logic
    if dev_acc > best_dev_acc + es_min_delta:
        best_dev_acc = dev_acc
        es_wait = 0
        torch.save(lstm_model.state_dict(), PROC / "best_lstm.pth")
        print("  🔖 New best LSTM saved")
    else:
        es_wait += 1
        if es_wait >= es_patience:
            print(f"Early stopping after epoch {epoch} "
                  f"(no improvement for {es_patience} epochs).")
            break

Epoch  1/20  Train Loss: 0.1649  Dev Loss: 0.3852  Dev Acc: 0.8604  (best: 0.0000)
  🔖 New best LSTM saved
Epoch  2/20  Train Loss: 0.1805  Dev Loss: 0.3872  Dev Acc: 0.8547  (best: 0.8604)
Epoch  3/20  Train Loss: 0.2009  Dev Loss: 0.3722  Dev Acc: 0.8589  (best: 0.8604)
Epoch  4/20  Train Loss: 0.2217  Dev Loss: 0.3505  Dev Acc: 0.8676  (best: 0.8604)
  🔖 New best LSTM saved
Epoch  5/20  Train Loss: 0.2476  Dev Loss: 0.3944  Dev Acc: 0.8393  (best: 0.8676)
Epoch  6/20  Train Loss: 0.2952  Dev Loss: 0.3485  Dev Acc: 0.8616  (best: 0.8676)
Epoch  7/20  Train Loss: 0.4435  Dev Loss: 0.3478  Dev Acc: 0.8601  (best: 0.8676)
Early stopping after epoch 7 (no improvement for 3 epochs).


In [24]:
# 12
# Load the best LSTM model for evaluation
best_lstm = MusicLSTM(input_size, hidden_size, num_layers, num_classes).to(device)
best_lstm.load_state_dict(torch.load(PROC/"best_lstm.pth", map_location=device))
best_lstm.eval()

test_preds, test_true = [], []
with torch.no_grad():
    for Xb, yb in lstm_test_loader:
        logits = best_lstm(Xb)
        test_preds.extend(logits.argmax(1).cpu().numpy())
        test_true.extend(yb.cpu().numpy())

print(f"LSTM Test Accuracy: {accuracy_score(test_true, test_preds):.4f}\n")
print("LSTM Classification Report:\n",
      classification_report(test_true, test_preds, target_names=list(le.classes_), zero_division=0))
print("\nLSTM Confusion Matrix:\n", confusion_matrix(test_true, test_preds))

LSTM Test Accuracy: 0.7339

LSTM Classification Report:
               precision    recall  f1-score   support

        bach       0.93      0.88      0.91      9581
   beethoven       0.54      0.93      0.68      9581
      chopin       0.94      0.61      0.74      9581
      mozart       0.77      0.52      0.62      9581

    accuracy                           0.73     38324
   macro avg       0.79      0.73      0.74     38324
weighted avg       0.79      0.73      0.74     38324


LSTM Confusion Matrix:
 [[8469  464  277  371]
 [   0 8866   13  702]
 [ 610 2745 5857  369]
 [  45 4489  112 4935]]


In [25]:
# 13
# Convert sequence to pianoroll format
WINDOW_LEN  = X_train.shape[1]  # should be 50
PITCH_START = 21
PITCH_END   = 108
NUM_PITCHES = PITCH_END - PITCH_START + 1  # 88

def seq_to_pianoroll_fast(seq):
    """
    seq: (T, 3) where seq[:,0] = pitch
    returns: (1, 88, T) float32
    """
    T = seq.shape[0]
    pr = np.zeros((1, NUM_PITCHES, T), dtype=np.float32)
    pitches = seq[:, 0].astype(np.int16)
    idx = pitches - PITCH_START
    t_idx = np.arange(T, dtype=np.int32)
    mask = (idx >= 0) & (idx < NUM_PITCHES)
    pr[0, idx[mask], t_idx[mask]] = 1.0
    return pr

In [26]:
# 14
_ = seq_to_pianoroll_fast(X_train[0])
print("Sanity OK; piano-rolls will be built on-the-fly per batch.")

Sanity OK; piano-rolls will be built on-the-fly per batch.


In [27]:
# 15
# Hybrid dataset and Data Loaders for LSTM + CNN
class HybridDataset(Dataset):
    def __init__(self, lstm_windows: np.ndarray, labels: np.ndarray):
        self.lstm_windows = lstm_windows  # numpy (N, 50, 3)
        self.labels       = labels        # numpy (N,)
    def __len__(self):
        return len(self.labels)
    def __getitem__(self, idx):
        seq = self.lstm_windows[idx]                   # (50, 3) numpy
        pr  = seq_to_pianoroll_fast(seq)               # (1, 88, 50) numpy
        x_l = torch.from_numpy(seq).to(torch.float32)  # (50, 3)
        x_c = torch.from_numpy(pr).to(torch.float32)   # (1, 88, 50)
        y   = torch.tensor(self.labels[idx], dtype=torch.long)
        return x_l, x_c, y

hyb_train_ds = HybridDataset(X_train,    y_train)
hyb_dev_ds   = HybridDataset(X_dev,      y_dev)
hyb_test_ds  = HybridDataset(X_test_bal, y_test_bal)

# num_workers=0 is safer on macOS/MPS and avoids extra memory
hyb_batch = 32
hyb_train_loader = DataLoader(hyb_train_ds, batch_size=hyb_batch, shuffle=True,  num_workers=0)
hyb_dev_loader   = DataLoader(hyb_dev_ds,   batch_size=hyb_batch, shuffle=False, num_workers=0)
hyb_test_loader  = DataLoader(hyb_test_ds,  batch_size=hyb_batch, shuffle=False, num_workers=0)

xb_l, xb_c, yb = next(iter(hyb_train_loader))
print("Hybrid LSTM batch:", xb_l.shape, " CNN batch:", xb_c.shape, " y:", yb.shape)

Hybrid LSTM batch: torch.Size([32, 50, 3])  CNN batch: torch.Size([32, 1, 88, 50])  y: torch.Size([32])


In [28]:
# 16
# HybridNet model: LSTM + CNN
import torch.nn.functional as F
class HybridNet(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        # CNN branch
        self.cnn_branch = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d((2,2)),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1,1))
        )
        # LSTM branch
        self.lstm_branch = nn.LSTM(
            input_size=3, hidden_size=64, num_layers=1, batch_first=True
        )
        self.fc = nn.Linear(32 + 64, num_classes)

    def forward(self, x_lstm, x_cnn):
        out_l, _ = self.lstm_branch(x_lstm)
        feat_l   = out_l[:, -1, :]
        feat_c   = self.cnn_branch(x_cnn).view(x_cnn.size(0), -1)
        return self.fc(torch.cat([feat_l, feat_c], dim=1))

hyb_model = HybridNet(num_classes).to(device)
print(hyb_model)

HybridNet(
  (cnn_branch): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): AdaptiveAvgPool2d(output_size=(1, 1))
  )
  (lstm_branch): LSTM(3, 64, batch_first=True)
  (fc): Linear(in_features=96, out_features=4, bias=True)
)


In [29]:
# 17
# Hybrid training setup

hyb_lr     = 1e-3
hyb_epochs = 20  # lower while iterating if needed

# Early stopping (on dev accuracy)
hyb_es_patience  = 3        # stop after 3 epochs without improvement
hyb_es_min_delta = 1e-4     # require at least this improvement
hyb_es_wait      = 0

hyb_criterion = nn.CrossEntropyLoss()
hyb_optimizer = optim.Adam(hyb_model.parameters(), lr=hyb_lr)
hyb_scheduler = nn.Sequential(
    # use ReduceLROnPlateau via a tiny wrapper so we can keep the name variable
)
# Use the standard ReduceLROnPlateau directly:
hyb_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    hyb_optimizer, mode="max", factor=0.5, patience=2
)

best_hyb_dev_acc = -float("inf")
print("Training Hybrid for", hyb_epochs, "epochs (early stopping enabled)")

Training Hybrid for 20 epochs (early stopping enabled)


In [30]:
# 18
# Hybrid Training Loop
for epoch in range(1, hyb_epochs + 1):
    hyb_model.train()
    train_losses = []
    for x_lstm, x_cnn, yb in hyb_train_loader:
        x_lstm, x_cnn, yb = x_lstm.to(device), x_cnn.to(device), yb.to(device)
        hyb_optimizer.zero_grad()
        logits = hyb_model(x_lstm, x_cnn)
        loss   = hyb_criterion(logits, yb)
        loss.backward()
        hyb_optimizer.step()
        train_losses.append(loss.item())
    avg_train_loss = float(np.mean(train_losses)) if train_losses else 0.0

    # Dev
    hyb_model.eval()
    val_preds, val_true, val_losses = [], [], []
    with torch.no_grad():
        for x_lstm, x_cnn, yb in hyb_dev_loader:
            x_lstm, x_cnn, yb = x_lstm.to(device), x_cnn.to(device), yb.to(device)
            logits = hyb_model(x_lstm, x_cnn)
            loss   = hyb_criterion(logits, yb)
            val_losses.append(loss.item())
            val_preds.extend(logits.argmax(1).cpu().numpy())
            val_true.extend(yb.cpu().numpy())

    avg_val_loss = float(np.mean(val_losses)) if val_losses else 0.0
    val_acc      = accuracy_score(val_true, val_preds) if val_true else 0.0
    hyb_scheduler.step(val_acc)

    print(f"Epoch {epoch:2d}/{hyb_epochs}  Train Loss: {avg_train_loss:.4f}  "
          f"Val Loss: {avg_val_loss:.4f}  Val Acc: {val_acc:.4f}  "
          f"(best: {best_hyb_dev_acc if best_hyb_dev_acc!=-float('inf') else 0.0:.4f})")

    # Early stopping logic
    if val_acc > best_hyb_dev_acc + hyb_es_min_delta:
        best_hyb_dev_acc = val_acc
        hyb_es_wait = 0
        torch.save(hyb_model.state_dict(), PROC / "best_hybrid.pth")
        print("  🔖 New best Hybrid saved")
    else:
        hyb_es_wait += 1
        if hyb_es_wait >= hyb_es_patience:
            print(f"Early stopping after epoch {epoch} "
                  f"(no improvement for {hyb_es_patience} epochs).")
            break

Epoch  1/20  Train Loss: 0.5158  Val Loss: 0.3052  Val Acc: 0.8783  (best: 0.0000)
  🔖 New best Hybrid saved
Epoch  2/20  Train Loss: 0.3323  Val Loss: 0.3177  Val Acc: 0.8748  (best: 0.8783)
Epoch  3/20  Train Loss: 0.2677  Val Loss: 0.4006  Val Acc: 0.8257  (best: 0.8783)
Epoch  4/20  Train Loss: 0.2309  Val Loss: 0.4210  Val Acc: 0.8318  (best: 0.8783)
Early stopping after epoch 4 (no improvement for 3 epochs).


In [31]:
# 19
# Hybrid Test Evaluation
best_hyb = HybridNet(num_classes).to(device)
best_hyb.load_state_dict(torch.load(PROC/"best_hybrid.pth", map_location=device))
best_hyb.eval()

hyb_preds, hyb_true = [], []
with torch.no_grad():
    for x_lstm, x_cnn, yb in hyb_test_loader:
        x_lstm, x_cnn = x_lstm.to(device), x_cnn.to(device)
        logits = best_hyb(x_lstm, x_cnn)
        hyb_preds.extend(logits.argmax(1).cpu().numpy())
        hyb_true.extend(yb.numpy())

print(f"Hybrid Test Accuracy: {accuracy_score(hyb_true, hyb_preds):.4f}\n")
print("Hybrid Classification Report:\n",
      classification_report(hyb_true, hyb_preds, target_names=list(le.classes_), zero_division=0))
print("\nHybrid Confusion Matrix:\n", confusion_matrix(hyb_true, hyb_preds))

Hybrid Test Accuracy: 0.6894

Hybrid Classification Report:
               precision    recall  f1-score   support

        bach       0.96      0.88      0.92      9581
   beethoven       0.48      0.91      0.63      9581
      chopin       0.89      0.59      0.71      9581
      mozart       0.72      0.37      0.49      9581

    accuracy                           0.69     38324
   macro avg       0.76      0.69      0.69     38324
weighted avg       0.76      0.69      0.69     38324


Hybrid Confusion Matrix:
 [[8474  455  391  261]
 [   0 8708   26  847]
 [ 222 3411 5681  267]
 [  90 5680  255 3556]]
