In [1]:
import sys
from pathlib import Path

PROJECT_ROOT = Path.cwd().parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

PREPROCESSED_ROOT = PROJECT_ROOT / "preprocessed_dataset"

print("PREPROCESSED_ROOT:", PREPROCESSED_ROOT)
print("PROJECT_ROOT:", PROJECT_ROOT)

PREPROCESSED_ROOT: E:\DL_audiotomidi\preprocessed_dataset
PROJECT_ROOT: E:\DL_audiotomidi


In [4]:
import math
import random
import numpy as np

import torch
from torch import nn
from torch.optim import AdamW
from torch.optim.lr_scheduler import LambdaLR

from models.cnn_amt import CNNTemporal
from scripts.dataset_helpers import create_dataloaders
from scripts.evaluate import evaluate, frame_level_f1

import wandb

import time

In [5]:
# SPEC_TYPE="mel" for Mel-Spectrogram
SPEC_TYPE = "cqt"
# N_FREQ_BINS=229 for Mel-Spectrogram
N_FREQ_BINS = 252
N_PITCHES = 88

BATCH_SIZE = 16
CHUNK_LEN = 1024
NUM_WORKERS = 4

NUM_EPOCHS = 100

BASE_LR = 1e-3
WEIGHT_DECAY = 1e-4
# WARMUP for Learning Rate
WARMUP_RATIO = 0.05

SEED = 42
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

HOP_LENGTH = 512
SR = 22050
# Binarization threshold
THRESHOLD = 0.5

In [6]:
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)

    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
set_seed(SEED)

In [7]:
train_loader, val_loader = create_dataloaders(
    root_dir=str(PREPROCESSED_ROOT),
    spec_type=SPEC_TYPE,
    batch_size=BATCH_SIZE,
    chunk_len=CHUNK_LEN,
    num_workers=NUM_WORKERS,
)

print("train batches:", len(train_loader))
print("val batches:", len(val_loader))

Train batches: 61
Val batches: 9


In [8]:
num_training_steps = NUM_EPOCHS * len(train_loader)
print("training steps:", num_training_steps)
num_warmup_steps = int(WARMUP_RATIO * num_training_steps)
print("warmup steps:",num_warmup_steps)

6100
305


In [9]:
def lr_lambda(current_step: int):
    # warmup
    if current_step < num_warmup_steps:
        return float(current_step) / max(1, num_warmup_steps)

    # cosine decay
    progress = float(current_step - num_warmup_steps) / max(
        1, num_training_steps - num_warmup_steps
    )
    return 0.5 * (1.0 + math.cos(math.pi * progress))

In [10]:
model = CNNTemporal(
    n_freq_bins=N_FREQ_BINS,
    n_pitches=N_PITCHES
)
model.to(DEVICE)

optimizer = AdamW(
    model.parameters(),
    lr=BASE_LR,
    weight_decay=WEIGHT_DECAY
)

scheduler = LambdaLR(optimizer, lr_lambda=lr_lambda)

model

CNNTemporal(
  (stem): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (cnn2d): Sequential(
    (0): PreActResBlock2D(
      (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(2, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (skip): Conv2d(32, 32, kernel_size=(1, 1), stride=(2, 1), bias=False)
    )
    (1): PreActResBlock2D(
      (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=Fals

In [11]:
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"trainable parameters: {trainable:,}")

Trainable parameters: 800,952


In [12]:
#loss function weighted with a positive weight value
pos_weight_value = 5.0

criterion = nn.BCEWithLogitsLoss(
    pos_weight=torch.full((N_PITCHES,), pos_weight_value, device=DEVICE)
)

In [10]:
def train_one_epoch(
    model: nn.Module,
    dataloader,
    criterion,
    optimizer,
    scheduler,
    device: torch.device,
    epoch: int,
    log_interval: int = 50
):
    model.train()
    running_loss = 0.0
    running_f1 = 0.0
    num_batches = 0

    for batch_idx, (spec, target) in enumerate(dataloader):

        spec = spec.to(device, non_blocking=True)
        target = target.to(device, non_blocking=True)

        optimizer.zero_grad(set_to_none=True)

        logits = model(spec)

        # BCEWithLogitsLoss by all (frame, pitch)
        loss = criterion(
            logits.reshape(-1, logits.shape[-1]),
            target.reshape(-1, target.shape[-1])
        )

        loss.backward()
        optimizer.step()
        scheduler.step()

        batch_f1 = frame_level_f1(logits.detach(), target.detach(), threshold=THRESHOLD)

        running_loss += loss.item()
        running_f1 += batch_f1
        num_batches += 1

    epoch_loss = running_loss / max(1, num_batches)
    epoch_f1 = running_f1 / max(1, num_batches)

    return epoch_loss, epoch_f1

In [11]:
def train(model,
          train_dataloader,
          val_dataloader,
          criterion,
          optimizer,
          scheduler,
          device,
          n_epochs,
          run=None,
          checkpoint_path=None,
          threshold=0.5,
          hop_length=512,
          sr=22050,
          log_interval=50,
):
    
    global_step = 0
    best_val_frame_f1 = 0.0

    total_start = time.time()
    
    for epoch in range(1, n_epochs + 1):

        epoch_start = time.time()

        # training phase
        train_loss, train_frame_f1 = train_one_epoch(
            model=model,
            dataloader=train_dataloader,
            criterion=criterion,
            optimizer=optimizer,
            scheduler=scheduler,
            device=device,
            epoch=epoch,
            log_interval=log_interval
        )
        
        train_time_sec = time.time() - epoch_start

        global_step = epoch * len(train_dataloader)

        # validation phase
        val_start = time.time()
        val_loss, val_frame_f1, _ = evaluate(
            model=model,
            dataloader=val_dataloader,
            criterion=criterion,
            device=device,
            threshold=threshold,
            hop_length=hop_length,
            sr=sr,
            onset_tolerance=0.05,
            compute_note_f1=False
        )

        val_time = time.time() - val_start
        epoch_time = time.time() - epoch_start

        print(
            f"[Epoch {epoch}/{n_epochs}] "
            f"train_loss={train_loss:.4f}, train_F1={train_frame_f1:.4f} | "
            f"val_loss={val_loss:.4f}, val_F1={val_frame_f1:.4f} | "
            f"train_time={train_time_sec:.1f}s"
        )

        current_lr = optimizer.param_groups[0]["lr"]

        #wandb logging
        if run is not None:
            run.log(
                {
                    "epoch": epoch,
                    "train/loss": train_loss,
                    "train/frame_F1": train_frame_f1,
                    "val/loss": val_loss,
                    "val/frame_F1": val_frame_f1,
                    "lr": current_lr,
                    "time/train_epoch_sec": epoch_time,
                },
                step=global_step
            )

        #saving the best checkpoint by val_frame metric
        if checkpoint_path is not None and val_frame_f1 > best_val_frame_f1:
            best_val_frame_f1 = val_frame_f1
            torch.save(
                {
                    "model_state_dict": model.state_dict()
                },
                checkpoint_path
            )
            print(f"*** New best frame-F1={best_val_frame_f1:.4f}, saved to {checkpoint_path} ***")

    total_time = time.time() - total_start
    print(f"\n=== TOTAL TRAINING TIME: {total_time/3600:.2f} hours ===")

    if run is not None:
        run.log({"time/total_hours": total_time / 3600.0}, step=global_step)

    return best_val_frame_f1

In [17]:
CHECKPOINT_DIR = PROJECT_ROOT / "checkpoints"
print(CHECKPOINT_DIR)

best_ckpt_path = CHECKPOINT_DIR / f"cnn_{SPEC_TYPE}_best.pt"

E:\DL_audiotomidi\checkpoints


In [18]:
wandb_config = {
    "model": "CNNTemporal",
    "spec_type": SPEC_TYPE,
    "n_freq_bins": N_FREQ_BINS,
    "n_pitches": N_PITCHES,
    "batch_size": BATCH_SIZE,
    "chunk_len": CHUNK_LEN,
    "num_workers": NUM_WORKERS,
    "num_epochs": NUM_EPOCHS,
    "base_lr": BASE_LR,
    "weight_decay": WEIGHT_DECAY,
    "warmup_ratio": WARMUP_RATIO,
    "seed": SEED,
    "hop_length": HOP_LENGTH,
    "sr": SR,
    "threshold": THRESHOLD,
}

In [19]:
with wandb.init(
    project="DL_audiotomidi",
    name="CNNTemporal_cqt",
    config=wandb_config
) as run:

    best_val_frame_f1 = train(
        model=model,
        train_dataloader=train_loader,
        val_dataloader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        device=DEVICE,
        n_epochs=NUM_EPOCHS,
        run=run,
        checkpoint_path=best_ckpt_path,
        threshold=THRESHOLD,
        hop_length=HOP_LENGTH,
        sr=SR,
        log_interval=50
    )

print("Best validation frame-F1:", best_val_frame_f1)


=== Epoch 1/100 ===
[Epoch 1/100] train_loss=0.5024, train_F1=0.2666 | val_loss=0.6207, val_F1=0.1917 | train_time=42.5s
*** New best frame-F1=0.1917, saved to E:\DL_audiotomidi\checkpoints\cnn_cqt_best.pt ***

=== Epoch 2/100 ===
[Epoch 2/100] train_loss=0.4594, train_F1=0.3228 | val_loss=0.5623, val_F1=0.1684 | train_time=40.3s

=== Epoch 3/100 ===
[Epoch 3/100] train_loss=0.4371, train_F1=0.3371 | val_loss=0.6219, val_F1=0.1566 | train_time=39.2s

=== Epoch 4/100 ===
[Epoch 4/100] train_loss=0.4241, train_F1=0.3516 | val_loss=0.7692, val_F1=0.1416 | train_time=39.7s

=== Epoch 5/100 ===
[Epoch 5/100] train_loss=0.4099, train_F1=0.3676 | val_loss=0.4253, val_F1=0.2610 | train_time=45.1s
*** New best frame-F1=0.2610, saved to E:\DL_audiotomidi\checkpoints\cnn_cqt_best.pt ***

=== Epoch 6/100 ===
[Epoch 6/100] train_loss=0.3999, train_F1=0.3970 | val_loss=0.3616, val_F1=0.3490 | train_time=43.2s
*** New best frame-F1=0.3490, saved to E:\DL_audiotomidi\checkpoints\cnn_cqt_best.pt ***



0,1
epoch,▁▁▁▁▂▂▂▂▃▃▃▃▃▃▃▄▄▄▄▄▄▅▅▅▅▅▆▆▆▆▆▆▇▇▇▇████
lr,▅█████▇▇▇▇▇▇▇▆▆▆▆▆▅▅▅▄▄▄▄▃▃▃▂▂▂▂▂▁▁▁▁▁▁▁
time/total_hours,▁
time/train_epoch_sec,▁▁▃▁▁▄█▇▁▃▁▂▄▇█▇▃▂▂▄▄▄▆▆▅▄▆▅▅▂█▄▆▄▆▅▅▆▃▅
train/frame_F1,▁▂▃▄▄▄▅▆▆▆▆▆▆▇▆▇▇▇▇▇▇▇▇▇▇▇██████████████
train/loss,█▆▆▅▅▄▄▃▃▃▃▂▃▂▂▂▂▂▂▂▂▂▂▂▂▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁
val/frame_F1,▁▁▃▂▃▃▄▃▆▆▆▄▆▇▇▇▇▇▆▇▇██▇████▇▇██████████
val/loss,▆█▄▃▄▃▂▂▂▂▂▂▁▂▁▁▂▁▁▃▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁

0,1
epoch,100.0
lr,0.0
time/total_hours,1.51773
time/train_epoch_sec,53.39701
train/frame_F1,0.71437
train/loss,0.17365
val/frame_F1,0.72313
val/loss,0.12469


Best validation frame-F1: 0.7331993100581946


In [14]:
ckpt = torch.load(best_ckpt_path, map_location=DEVICE)
model.load_state_dict(ckpt["model_state_dict"])
model.to(DEVICE)
model.eval()

CNNTemporal(
  (stem): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (cnn2d): Sequential(
    (0): PreActResBlock2D(
      (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(2, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (skip): Conv2d(32, 32, kernel_size=(1, 1), stride=(2, 1), bias=False)
    )
    (1): PreActResBlock2D(
      (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=Fals

In [15]:
from scripts.dataset_helpers import PianoRollDataset
from torch.utils.data import DataLoader

TEST_CHUNK_LEN = None # for CNN we don't need to chop the spectrogram into chunks, 
                      # but for RNN and CRNN we have to
TEST_BATCH_SIZE = 1

test_ds = PianoRollDataset(
    root_dir=str(PREPROCESSED_ROOT),
    split="test",
    spec_type=SPEC_TYPE,
    chunk_len=TEST_CHUNK_LEN,
    random_crop=False
)

test_loader = DataLoader(
    test_ds,
    batch_size=TEST_BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True
)

In [19]:
from scripts.evaluate import compute_frame_micro_metrics

frame_metrics = compute_frame_micro_metrics(
    model,
    test_loader,
    DEVICE,
    threshold=0.6
)

print("=== Test frame-level metrics ===")
for k, v in frame_metrics.items():
    if k in ("tp", "fp", "fn"):
        continue
    print(f"{k}: {v * 100:.2f}%")

=== Test frame-level metrics ===
accuracy: 96.99%
precision: 68.80%
recall: 79.87%
frame_f1: 73.92%


In [18]:
from scripts.evaluate import compute_note_micro_metrics

note_metrics = compute_note_micro_metrics(
    model,
    test_loader,
    DEVICE,
    threshold=0.7,
    hop_length=HOP_LENGTH,
    sr=SR,
    onset_tolerance=0.05
)

print("=== Test note-level metrics ===")
for k, v in note_metrics.items():
    if k in ("tp", "fp", "fn"):
        continue
    print(f"{k}: {v * 100:.2f}%")

=== Test note-level metrics ===
accuracy: 36.44%
precision: 43.40%
recall: 69.46%
note_f1: 53.42%


In [None]:
from scripts.inference.py import get_piano_roll, predict, plot_comparison, measure_efficiency

# Test track with corresponding MIDI
AUDIO_PATH = "test_long.wav"
MIDI_PATH = "test_long.midi"




audio, _ = librosa.load(AUDIO_PATH, sr=SR, mono=True)

pred_probs, spec = predict(model, audio, DEVICE)

gt_roll = get_piano_roll(MIDI_PATH, SR, HOP_LENGTH)

plot_comparison(spec, pred_probs, gt_roll, SR, HOP_LENGTH)

In [None]:
stats = measure_efficiency(model, device='cpu', duration_sec=60)