# Progetto Machine Learning ID2

**Studente:** Adrian Patrizi

**Matricola:** 2094287

**Email:** patrizi.2094287@studenti.uniroma1.it

**Progetto scelto:** ID 2: Audio Restoration for Generative Models — Improving MusicGen Outputs


## Setup

In [None]:
import os
import glob
import random
import json
import warnings
import zipfile
import gdown
from datetime import datetime

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import librosa
import soundfile as sf
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from IPython.display import Audio, display

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

### Download audio


In [None]:
url = "https://drive.google.com/file/d/1TDkcfnqOSVib4i2RwdyB1bIJ18RetLhB/view?usp=sharing"
output = "/content/checkpoint.zip"

if not os.path.exists(output):
    gdown.download(id="1TDkcfnqOSVib4i2RwdyB1bIJ18RetLhB", output=output, quiet=False)

with zipfile.ZipFile(output, "r") as zip_ref:
    zip_ref.extractall()

print("Cartella checkpoint pronta")

In [None]:
# Scarica FMA Small - 8GB
!wget https://os.unil.cloud.switch.ch/fma/fma_small.zip
!unzip fma_small.zip

In questa cella vengono selezionati e suddivisi i file MP3 del dataset FMA Small in tre insiemi: train, validation e test.

Tramite `glob.glob('./fma_small/*/*.mp3')` estraiamo tutti i file `.mp3` nelle cartelle, poi con `random.shuffle(files)` viene mescolato l'elenco ed infine selezioniamo i primi 1000.

I file vengono poi suddivisi in:
   - 800 per il training,  
   - 100 per la validazione,  
   - 100 per il test.



In [None]:
# Prepara i file
random.seed(101)

files = glob.glob('./fma_small/*/*.mp3')
random.shuffle(files)
files = files[:1000]

train_files = files[:800]
val_files = files[800:900]
test_files = files[900:]
print(len(train_files), len(val_files), len(test_files))

## Dataset

###Classe AudioDataset

Questa classe definisce il dataset personalizzato per l’addestramento del modello di audio enhancement.  
Il suo obiettivo è simulare degradazioni realistiche, generando al volo (on-the-fly) coppie degraded - clean da cui la rete apprende a ricostruire il segnale originale.

**Funzionalità principali:**

1. **Parametri principali**
   - `audio_files`: lista dei percorsi dei file audio.  
   - `sample_rate`: frequenza di campionamento (32 kHz).  
   - `segment_length`: durata del segmento estratto in secondi.  
   - `degradation_types`: tipi di degradazioni applicate (quantizzazione, resample, low-pass, clipping).  
   - `identity_prob`: probabilità che un esempio resti non degradato (≈ 15%).
   - `seed`: opzionale, per riproducibilità.

2. **Metodo `degrade_audio()`**
   Applica degradazioni audio:
   - **Quantizzazione:** riduce la profondità in bit con dithering casuale.  
   - **Low-pass:** filtra le alte frequenze con una soglia casuale (3.5-8 kHz).  
   - **Resample:** durante il downsampling, le alte frequenze oltre metà del nuovo sample rate vengono eliminate completamente.  
   - **Clipping:** limita l’ampiezza per introdurre distorsione armonica.  
   In caso di errore nella compressione, aggiunge un leggero rumore gaussiano.

3. **Metodo `__getitem__()`**
   - Carica e segmenta il file audio in un frammento di durata fissa.  
   - Normalizza il segnale in ampiezza.  
   - Applica la degradazione (oppure mantiene l’identità con probabilità 0.15).  
   - Calcola i **mel-spettrogrammi** pulito e degradato (`n_mels=128`, `n_fft=1024`, `hop=256`).  
   - Converte in scala logaritmica (dB) e normalizza in **[0, 1]**, compatibile con l’output della rete.  
   - Restituisce un dizionario contenente:
     - `'degraded'`: spettrogramma degradato (input del modello)  
     - `'clean'`: spettrogramma pulito (target)  
     - `'degraded_audio'` e `'clean_audio'`: forme d’onda corrispondenti per eventuali analisi extra.

NOTA:
librosa.power_to_db() converte da potenza → decibel (logaritmo in base 10)

$$
\text{mel}_{\text{dB}} = 10 \cdot \log_{10}\left( \frac{\text{mel}}{\text{ref}} \right)
$$


`ref = np.max` imposta il valore massimo del mel-spettrogramma a 0 dB, cioè la banda più intensa ha valore 0 tutte le altre saranno valori negativi da -1 a -80 dB


In [None]:
class AudioDataset(Dataset):

    def __init__(self, audio_files, sample_rate=32000, segment_length=3.0,
                 degradation=['quantize', 'resample', 'reverb', 'clipping'],
                 identity_prob=0.15, seed=None):

        self.audio_files = audio_files
        self.sr = sample_rate
        self.segment_samples = int(segment_length * sample_rate)
        self.degradation = degradation
        self.identity_prob = identity_prob
        self.seed = seed

    def __len__(self):
        return len(self.audio_files)


    def degrade_audio(self, audio):

        x = audio.copy()

        # 1) Quantizzazione bit-depth + dithering
        if 'quantize' in self.degradation:
            bits = np.random.choice([8, 10, 12])
            q = 2 ** bits
            dither = np.random.uniform(-0.5/q, 0.5/q, size=x.shape)
            x = np.round(x * q) / q + dither
            x = np.clip(x, -1.0, 1.0)

        # 2) Downsampling (simula compressione e perdita di banda)
        if 'resample' in self.degradation:
            new_sr = np.random.choice([16000, 22050])
            y = librosa.resample(x, orig_sr=self.sr, target_sr=new_sr)
            x = librosa.resample(y, orig_sr=new_sr, target_sr=self.sr)

            # riallinea lunghezza
            if len(x) > len(audio):
                x = x[:len(audio)]
            elif len(x) < len(audio):
                x = np.pad(x, (0, len(audio) - len(x)))

        # 3) Riverbero artificiale (coda esponenziale)
        if 'reverb' in self.degradation:
            reverb_ms = np.random.uniform(50, 200)  # lunghezza in ms
            reverb_samples = int(self.sr * reverb_ms / 1000)
            ir = np.exp(-np.linspace(0, 5, reverb_samples))  # impulso decrescente
            x = np.convolve(x, ir, mode='full')[:len(x)]
            x = x / (np.max(np.abs(x)) + 1e-9)

        # 4) Clipping leggero
        if 'clipping' in self.degradation:
            clip_thr = float(np.random.uniform(0.8, 0.95))
            x = np.clip(x, -clip_thr, clip_thr)

        return x


    def __getitem__(self, idx):
        if self.seed is not None:
            np.random.seed(self.seed + idx)

        # Caricamento e segmentazione
        audio, sr = librosa.load(self.audio_files[idx], sr=self.sr, mono=True)
        if len(audio) > self.segment_samples:
            start = np.random.randint(0, len(audio) - self.segment_samples)
            audio = audio[start:start + self.segment_samples]
        else:
            audio = np.pad(audio, (0, self.segment_samples - len(audio)))

        # Normalizzazione
        max_val = np.max(np.abs(audio)) + 1e-8
        audio = audio / max_val

        # Identità stocastica: 15% dei campioni non degradati
        if np.random.rand() < self.identity_prob:
            degraded = audio.copy()
        else:
            degraded = self.degrade_audio(audio)

        # Mel-spectrogrammi
        clean_mel = librosa.feature.melspectrogram(
            y=audio, sr=self.sr, n_mels=128, n_fft=1024, hop_length=256
        )
        degraded_mel = librosa.feature.melspectrogram(
            y=degraded, sr=self.sr, n_mels=128, n_fft=1024, hop_length=256
        )

        # Conversione in dB (log10) + normalizzazione [0,1]
        clean_mel = librosa.power_to_db(clean_mel, ref=np.max, top_db=80.0)
        degraded_mel = librosa.power_to_db(degraded_mel, ref=np.max, top_db=80.0)
        clean_mel = (clean_mel + 80) / 80
        degraded_mel = (degraded_mel + 80) / 80

        return {
            'degraded': torch.FloatTensor(degraded_mel).unsqueeze(0),
            'clean': torch.FloatTensor(clean_mel).unsqueeze(0),
            'degraded_audio': torch.FloatTensor(degraded),
            'clean_audio': torch.FloatTensor(audio)
        }

### Istanziazione del dataset

In [None]:
train_dataset = AudioDataset(
    train_files,
    sample_rate=32000,
    segment_length=4.0
)


val_dataset = AudioDataset(
    val_files,
    sample_rate=32000,
    segment_length=4.0,
    seed = 99
)


test_dataset = AudioDataset(
    test_files,
    sample_rate=32000,
    segment_length=4.0,
    seed = 201
)

In [None]:
print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")

### Visualizzazione esempio



Qui verifichiamo visivamente e acusticamente la correttezza del dataset generato.

Per farlo selezioniamo un indice (es. `idx = 0`) dal `train_dataset`. Si estrae un esempio contenente sia il segnale pulito che quello degradato.

   - `clean_audio` e `degraded_audio` sono le forme d’onda originali in formato NumPy.
   - `clean_mel` e `degraded_mel` sono i corrispondenti mel-spettrogrammi normalizzati (128 bande x T frame).

Come prima cosa viene riprodotto il segnale pulito e quello degradato direttamente in notebook per valutare a orecchio la qualità della degradazione.

Successivamete vengono mostrati i grafici dei mel-spettrogrammi in scala logaritmica:
     - **Clean Mel-Spectrogram:** rappresenta il riferimento ad alta qualità.
     - **Degraded Mel-Spectrogram:** mostra l’effetto delle degradazioni introdotte.


In [None]:
# Prendiamo un indice
idx = 44

# Estrai l'elemento dal dataset
sample = train_dataset[idx]

# --- Estrai dati ---
clean_audio = sample['clean_audio'].numpy()
degraded_audio = sample['degraded_audio'].numpy()

clean_mel = sample['clean'].squeeze().numpy()       # [128, T]
degraded_mel = sample['degraded'].squeeze().numpy() # [128, T]

print(f"Audio length (samples): {len(clean_audio)}")

# --- Audio ---
print("\n Clean audio:")
display(Audio(clean_audio, rate=32000))

print("Degraded audio:")
display(Audio(degraded_audio, rate=32000))

# --- Spettrogrammi ---
fig, axes = plt.subplots(2, 1, figsize=(12, 8))

img1 = axes[0].imshow(clean_mel, aspect='auto', origin='lower', cmap='magma')
axes[0].set_title('Clean Mel-Spectrogram')
axes[0].set_ylabel('Mel Frequency')
fig.colorbar(img1, ax=axes[0])

img2 = axes[1].imshow(degraded_mel, aspect='auto', origin='lower', cmap='magma')
axes[1].set_title('Degraded Mel-Spectrogram')
axes[1].set_ylabel('Mel Frequency')
axes[1].set_xlabel('Time Frames')
fig.colorbar(img2, ax=axes[1])

plt.tight_layout()
plt.show()


## DataLoader

Qui creiamo i DataLoader per gli insiemi di training, validazione e test, gestendo la lettura parallela dei campioni e la riproducibilità dei risultati.

La Funzione `worker_init_fn(worker_id)` serve a sincronizzare i semi casuali (seed) di ciascun worker del DataLoader, garantendo che operazioni casuali (come le degradazioni audio o l’estrazione dei segmenti) producano risultati riproducibili anche quando eseguite in più thread.


Vengono infine istanziati i DataLoader:
- `train_loader` che usa `shuffle=True` per mescolare i batch a ogni epoca.  
- `val_loader` e `test_loader` che mantengono `shuffle=False` per garantire coerenza nella valutazione.  
- `batch_size=4` imposta il numero di esempi per batch.  
- `num_workers=2` abilita il caricamento in parallelo per migliorare la velocità.  
- `pin_memory=True` velocizza il trasferimento dei batch alla GPU.

In [None]:
def worker_init_fn(worker_id):

    # Ottieni il seed globale di PyTorch per questo worker
    # % 2**32 lo converte in un intero compatibile con NumPy
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)


In [None]:
train_loader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
    worker_init_fn=worker_init_fn
)


val_loader = DataLoader(
    val_dataset,
    batch_size=4,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
    worker_init_fn=worker_init_fn
)


test_loader = DataLoader(
    test_dataset,
    batch_size=4,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
    worker_init_fn=worker_init_fn
)

## Modello

Questa architettura combina reti convoluzionali (CNN) e ricorrenti (RNN) per migliorare la qualità degli spettrogrammi audio degradati.

L’obiettivo è prendere un Mel-spectrogramma degradato di forma `[B, 1, 128, T]`  e produrre un Mel-spectrogramma potenziato della stessa dimensione.

**Struttura**

- Encoder (3 blocchi CNN + ResidualBlock):

  estrae rappresentazioni locali di frequenza e tempo, riducendo gradualmente la risoluzione temporale (stride=2).

- RNN bottleneck (GRU bidirezionale):

  cattura dipendenze temporali globali, modellando la coerenza lungo l’asse del tempo. Opera su feature flattenate per ogni frame temporale.

- Decoder (3 blocchi ConvTranspose + skip connections):

  ricostruisce il Mel potenziato combinando le feature temporali e spaziali.  
  Gli skip connection (`torch.cat`) riutilizzano le informazioni dell’encoder (U-Net style).

- Residual Mode:
  se attivo (`residual_mode=True`), il modello predice un residuo da sommare all’input, invece di rigenerare il Mel completo.

**Output finale**
Il tensore in uscita è vincolato in `[0,1]` tramite Sigmoid, coerente con i Mel normalizzati.  Se `residual_mode=True`, il risultato è la somma tra input e residuo, con `clamp` per mantenere il range valido.


In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, channels, dropout=0.0):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(channels, channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.Dropout2d(dropout),
            nn.Conv2d(channels, channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(channels)
        )

    def forward(self, x):
        out = self.block(x)
        out = out + x
        return F.relu(out)


class AudioEnhancementCRNN(nn.Module):

    def __init__(self, n_mels=128, hidden_channels=[24, 48, 96],
                 rnn_hidden=128, rnn_layers=1, dropout=0.2,
                 residual_mode=True):
        super().__init__()
        self.n_mels = n_mels
        self.residual_mode = residual_mode

        # ----- Encoder -----
        # Input: [B, 1, 128, T]
        self.enc1 = nn.Sequential(
            nn.Conv2d(1, hidden_channels[0], 5, padding=2), # --> [B, 24, 128, T]
            nn.BatchNorm2d(hidden_channels[0]),
            nn.ReLU(),
            ResidualBlock(hidden_channels[0], dropout=dropout)
        )

        self.enc2 = nn.Sequential(
            nn.Conv2d(hidden_channels[0], hidden_channels[1], 4, stride=2, padding=1), # --> [B, 48, 64, T/2]
            nn.BatchNorm2d(hidden_channels[1]),
            nn.ReLU(),
            ResidualBlock(hidden_channels[1], dropout=dropout)
        )

        self.enc3 = nn.Sequential(
            nn.Conv2d(hidden_channels[1], hidden_channels[2], 4, stride=2, padding=1), # --> [B, 96, 32, T/4]
            nn.BatchNorm2d(hidden_channels[2]),
            nn.ReLU(),
            ResidualBlock(hidden_channels[2], dropout=dropout)
        )

        # ----- RNN bottleneck -----
        # Appiattisce le feature su (T/4) e modella la dinamica temporale globale
        self.rnn = nn.GRU(
            input_size=hidden_channels[2] * (n_mels // 4), #ogni frame e 96x32 features
            hidden_size=rnn_hidden,
            num_layers=rnn_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if rnn_layers > 1 else 0.0
        )
        self.rnn_projection = nn.Linear(rnn_hidden * 2, hidden_channels[2] * (n_mels // 4))
        # Output: stessa dimensione delle feature dell'encoder (per skip connections)

        # ----- Decoder -----
        # Decodifica simmetrica in stile U-Net con concatenazione skip connections
        self.dec3 = nn.Sequential(
            nn.ConvTranspose2d(hidden_channels[2] * 2, hidden_channels[1], 4, stride=2, padding=1), # --> [B, 48, 64, T/2]
            nn.BatchNorm2d(hidden_channels[1]),
            nn.ReLU(),
            ResidualBlock(hidden_channels[1], dropout=dropout)
        )

        self.dec2 = nn.Sequential(
            nn.ConvTranspose2d(hidden_channels[1] * 2, hidden_channels[0], 4, stride=2, padding=1), # --> [B, 24, 128, T]
            nn.BatchNorm2d(hidden_channels[0]),
            nn.ReLU(),
            ResidualBlock(hidden_channels[0], dropout=dropout)
        )

        self.dec1 = nn.Sequential(
            nn.Conv2d(hidden_channels[0] * 2, hidden_channels[0], 3, padding=1), # --> [B, 24, 128, T]
            nn.BatchNorm2d(hidden_channels[0]),
            nn.ReLU(),
            nn.Conv2d(hidden_channels[0], 1, 1), # --> [B, 1, 128, T]
            nn.Sigmoid()  # vincola output in [0,1]
        )

    def crop_match(self, a, b):
        min_t = min(a.size(-1), b.size(-1))
        return a[..., :min_t], b[..., :min_t]


    def forward(self, x):
        # ----- Encoder -----
        e1 = self.enc1(x)       # [B, 24, 128, T]
        e2 = self.enc2(e1)      # [B, 48, 64, T/2]
        e3 = self.enc3(e2)      # [B, 96, 32, T/4]

        # ----- RNN -----
        B, C, F, T = e3.shape
        rnn_in = e3.permute(0, 3, 1, 2).reshape(B, T, -1)     # [B, T/4, 96×32]
        rnn_out, _ = self.rnn(rnn_in)                         # [B, T/4, 2×rnn_hidden]
        rnn_out = self.rnn_projection(rnn_out)                # [B, T/4, 96×32]
        rnn_out = rnn_out.reshape(B, T, C, F).permute(0, 2, 3, 1)  # [B, 96, 32, T/4]

        # ----- Decoder -----
        d3 = self.dec3(torch.cat([e3, rnn_out], dim=1))       # concat skip: [B, 192, 32, T/4] --> [B, 48, 64, T/2]
        e2, d3 = self.crop_match(e2, d3)
        d2 = self.dec2(torch.cat([e2, d3], dim=1))            # [B, 96, 64, T/2] --> [B, 24, 128, T]
        e1, d2 = self.crop_match(e1, d2)
        d1 = self.dec1(torch.cat([e1, d2], dim=1))            # [B, 48, 128, T] --> [B, 1, 128, T]

        # ----- Residual connection -----
        if self.residual_mode:
            if d1.size(-1) != x.size(-1):
                min_t = min(d1.size(-1), x.size(-1))
                d1 = d1[..., :min_t]
                x = x[..., :min_t]
            d1 = x + d1                                        # output = input + residuo

        return torch.clamp(d1, 0.0, 1.0)                      # assicura output in [0,1]

## Loss

Questa classe combina più termini di loss per valutare quanto il Mel potenziato
si avvicina al target pulito, bilanciando fedeltà globale, precisione sulle alte frequenze.

**Componenti principali**
1. Charbonnier loss (`charb`)
   Variante della L1-loss che penalizza meno i grandi errori e stabilizza il training. Formula:  
   $$ L_{charb} = \sqrt{(x - y)^2 + \varepsilon} $$

2. High-Frequency weighted loss (`hf`)
   Pesa di più le bande Mel alte (quelle sopra ~6 kHz), per spingere il modello a ricostruire meglio brillantezza e dettagli armonici persi con la degradazione. Formula:
   $$
   \mathcal{L}_{\text{hf}} =
   \frac{1}{B C M T} \sum_{b,c,m,t}
   w_m \, \big| x_{b,c,m,t} - y_{b,c,m,t} \big|
   $$
   con pesi lineari definiti come:
   $$
   w_m = 0.5 + \frac{m}{M-1}
   $$
   in modo che $ w_m $ cresca da 0.5 (basse frequenze) a 1.5 (alte frequenze).

**Loss totale**
$$L_{tot} = w_{charb} \cdot L_{charb} + w_{hf} \cdot L_{hf}$$

I pesi di default (`w_charb=1.0, w_hf=0.3`) bilanciano i contributi.


In [None]:
class AudioRestorationLoss(nn.Module):

    def __init__(self, n_fft=1024, hop_length=256,
                 w_charb=1.0, w_hf=0.3, eps=1e-6):
        super().__init__()
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.w_charb = w_charb
        self.w_hf = w_hf
        self.eps = eps

    def charbonnier_loss(self, x, y):
        diff = x - y
        return torch.mean(torch.sqrt(diff * diff + self.eps))

    def hf_weighted_loss(self, x, y):

        # Estrazione delle dimensioni
        B, C, M, T = x.shape  # [B,1,n_mels,time]

        # Creazione del vettore dei pesi w_m
        # linspace genera vettore 1D di m pesi tra 0.5 e 1.5
        # con view lo ridimensionamento per broadcasting sul batch [B, 1, M, T]
        w = torch.linspace(0.5, 1.5, M, device=x.device).view(1, 1, M, 1)
        return torch.mean(w * torch.abs(x - y))

    def forward(self, pred_mel, target_mel, residual_mode=False):
        # === Mel losses ===
        charb = self.charbonnier_loss(pred_mel, target_mel)
        hf = self.hf_weighted_loss(pred_mel, target_mel)

        total_loss = (self.w_charb * charb + self.w_hf * hf)

        metrics = {
            'charb': charb.item(),
            'hf': hf.item(),
            'mode': 'residual' if residual_mode else 'direct'
        }

        return total_loss, metrics

## Training

### Training Loop

Questa funzione gestisce l’intero processo di addestramento  del modello CRNN,  
includendo resume da checkpoint, validazione, salvataggio automatico e early stopping.

**Flusso principale**
1. Preparazione
   - Attiva ottimizzazioni CUDA (`cudnn.benchmark`) per massimizzare le prestazioni.  
   - Crea una directory di salvataggio separata per la modalità corrente (`residual` o `direct`).

2. Inizializzazione
   - Ottimizzatore: `AdamW`, con `weight_decay` per regolarizzazione.  
   - Scheduler: `ReduceLROnPlateau`, dimezza il learning rate se la validazione non migliora per 5 epoche.  
   - Loss: `AudioRestorationLoss()` (combinazione Charbonnier + HF).

3. Resume
   - Se `resume=True`, ricarica modello, ottimizzatore, scheduler e cronologia dal checkpoint precedente.

4. Training Loop
   - Per ogni epoca:
     - imposta il modello in modalità `train()`, azzera i gradienti,  
       calcola la loss batch per batch e aggiorna i pesi.
     - Clip dei gradienti (max_norm=1.0) per evitare esplosioni numeriche.
     - Calcola media delle loss di training.

5. Validazione
   - Imposta `model.eval()` e disattiva il gradiente (`torch.no_grad()`).
   - Calcola la loss media sui dati di validazione per monitorare il progresso.

6. Gestione checkpoint
   - Salva automaticamente il miglior modello quando la `val_loss` migliora.
   - Se la validazione non migliora per 5 epoche consecutive, attiva early stopping.
   - Ogni 10 epoche salva comunque un checkpoint intermedio (`checkpoint_epoch_X.pt`).

7. Output
   - Restituisce la cronologia (`history`) con le curve `train_loss` e `val_loss`.



NOTA:
Durante il training la loss lavora solo nel dominio Mel (`charb + hf`) poiché è molto più veloce e stabile, mentre durante la validazione viene inclusa anche la `STFT loss` che misura la somiglianza spettrale tra audio ricostruito e target, questo perché vogliamo verificare anche quanto bene il suono ricostruito
è simile all’originale in termini di spettro.

In [None]:
def train_model(
    model,
    train_loader,
    val_loader,
    epochs=20,
    lr=1e-3,
    save_dir='checkpoints',
    resume=False,
    resume_path=None
):

    # ---------- Ottimizzazioni GPU ----------
    # benchmark=True --> PyTorch sceglie automaticamente la conv più veloce per la GPU
    # deterministic=False --> leggermente più veloce ma non completamente riproducibile
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False

    # ---------- Preparazione cartelle ----------
    mode_name = "residual" if model.residual_mode else "direct"
    save_dir = os.path.join(save_dir, mode_name)
    os.makedirs(save_dir, exist_ok=True)

    # ---------- Ottimizzatore, scheduler e loss ----------
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-3)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-6
    )
    criterion = AudioRestorationLoss()

    # ---------- Resume logic ----------
    start_epoch = 0
    best_val_loss = float('inf')
    history = {'train_loss': [], 'val_loss': []}

    if resume:
      ckpt_path = resume_path or os.path.join(save_dir, 'best_model.pt')
      if os.path.exists(ckpt_path):
          print(f"Resuming training from checkpoint: {ckpt_path}")
          checkpoint = torch.load(ckpt_path, map_location=device, weights_only=False)

          # Rimuovi eventuale prefisso "_orig_mod."
          state_dict = checkpoint['model_state_dict']
          new_state_dict = {k.replace("_orig_mod.", ""): v for k, v in state_dict.items()}
          model.load_state_dict(new_state_dict, strict=False)

          optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
          if 'scheduler_state_dict' in checkpoint:
              scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

          start_epoch = checkpoint.get('epoch', 0) + 1
          best_val_loss = checkpoint.get('best_val_loss', float('inf'))
          history = checkpoint.get('history', history)
          print(f" Resumed from epoch {start_epoch}, best val loss {best_val_loss:.6f}")


    # ---------- TRAINING LOOP ----------
    patience_counter = 0
    patience_limit = 5

    for epoch in range(start_epoch, epochs):

        model.train()
        train_losses = []
        train_components = {'charb': [], 'hf': []}

        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")
        for batch in pbar:
            degraded = batch['degraded'].to(device)
            clean = batch['clean'].to(device)

            optimizer.zero_grad()
            enhanced = model(degraded) #forward pass

            # Allinea lunghezza temporale (dovuta agli stride del modello)
            if enhanced.size(-1) != clean.size(-1):
                min_t = min(enhanced.size(-1), clean.size(-1))
                enhanced = enhanced[..., :min_t]
                clean = clean[..., :min_t]

            # Calcolo della loss (solo Mel domain durante il training)
            loss, metrics = criterion(
                enhanced, clean, residual_mode=model.residual_mode
            )
            loss.backward()

            # Clipping dei gradienti per stabilità
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            train_losses.append(loss.item())
            for k in train_components.keys():
                train_components[k].append(metrics[k])
            pbar.set_postfix({'loss': f"{loss.item():.4f}"})

        avg_train_loss = np.mean(train_losses)

        # ---------- VALIDATION ----------
        model.eval()
        val_losses = []
        with torch.no_grad():

            pbar_val = tqdm(val_loader, desc=f"Validation {epoch+1}/{epochs}")
            for batch in pbar_val:
                degraded = batch['degraded'].to(device)
                clean = batch['clean'].to(device)
                enhanced = model(degraded)

                if enhanced.size(-1) != clean.size(-1):
                    min_t = min(enhanced.size(-1), clean.size(-1))
                    enhanced = enhanced[..., :min_t]
                    clean = clean[..., :min_t]

                #Sistemare uso stft
                loss, _ = criterion(
                    enhanced, clean, residual_mode=model.residual_mode
                )

                val_losses.append(loss.item())

        avg_val_loss = np.mean(val_losses)

        # ---------- Log e scheduler ----------
        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val_loss)

        print(f"\nEpoch {epoch+1}: Train={avg_train_loss:.4f} | Val={avg_val_loss:.4f}")
        scheduler.step(avg_val_loss)

        # ---------- SALVATAGGIO CHECKPOINTS ----------
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'scheduler_state_dict': scheduler.state_dict(),
                'best_val_loss': best_val_loss,
                'history': history
            }, os.path.join(save_dir, 'best_model.pt'))
            print("Saved new best model")
        else:
            patience_counter += 1
            print(f"Patience counter: {patience_counter}")

        if patience_counter >= patience_limit:
            print(f"\n Early stopping at epoch {epoch+1}")
            break

        # Salvataggio checkpoint ogni 10 epoche
        if (epoch + 1) % 10 == 0:
            ckpt_name = os.path.join(save_dir, f'checkpoint_epoch_{epoch+1}.pt')
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'scheduler_state_dict': scheduler.state_dict(),
                'best_val_loss': best_val_loss,
                'history': history
            }, ckpt_name)
            print(f" Saved checkpoint: {ckpt_name}")

    return history

### Plot

Plot delle curve di training e validazione

In [None]:
def plot_training_history(history, mode='direct'):

    fig, ax = plt.subplots(figsize=(8, 5))

    ax.plot(history['train_loss'], label='Train', color='tab:blue')
    ax.plot(history['val_loss'], label='Validation', color='tab:orange')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.set_title(f'Training & Validation Loss ({mode} mode)')
    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.6)

    plt.tight_layout()
    plt.show()

## Test

### Metriche

La Log-Spectral Distance misura quanto differiscono due spettri, in questo caso i Mel in dB. Più è piccolo il valore, più lo spettro stimato assomiglia a quello reale. Formula:

$$
\text{LSD} =
\frac{1}{T} \sum_{t=1}^{T}
\sqrt{
\frac{1}{M} \sum_{m=1}^{M}
\left(
S_{\text{clean}}(m, t) - S_{\text{test}}(m, t)
\right)^2
}
$$

In [None]:
def compute_lsd_db(clean_mel_db, test_mel_db):
    eps = 1e-8
    diff = clean_mel_db - test_mel_db
    return float(np.mean(np.sqrt(np.mean(diff**2, axis=0) + eps)))

Misura la somiglianza direzionale tra due vettori in questo caso gli spettri appiattiti. I valori sono tra -1 e 1 dove:
* 1 --> perfetta somiglianza;
* 0 --> ortogonali;
* -1 --> opposti.

È utile per capire se il modello conserva la “forma” dello spettro anche quando l’ampiezza differisce. Formula:
$$
\text{Cosine}(p, t) =
\frac{
\langle p, t \rangle
}{
\|p\|_2 \, \|t\|_2
}
$$

In [None]:
def compute_cosine_similarity_mel(pred_mel, target_mel):
    """Cosine similarity tra due mel-spettrogram flatten."""
    # pred_mel, target_mel: numpy arrays shape (n_mels, T)
    # appiattisci in vettori
    p = pred_mel.flatten()
    t = target_mel.flatten()
    # dot / (||p|| * ||t||)
    num = np.dot(p, t)
    den = (np.linalg.norm(p) * np.linalg.norm(t) + 1e-12)
    return num / den






### Test Loop

Questa funzione valuta le prestazioni del modello confrontando **quanto il Mel potenziato (enhanced)**
si avvicina al Mel pulito (clean), rispetto al Mel degradato (degraded).

L’obiettivo è misurare se e quanto il modello riduce la distanza tra il segnale degradato e quello pulito.

1. Per ogni clip nel `test_loader`:
   - calcola lo spettrogramma Mel potenziato (`enhanced`) tramite il modello;
   - confronta i Mel-spettri degraded --> clean e enhanced --> clean;
   - valuta le differenze usando tre metriche:
     - L1 --> errore assoluto medio (più piccolo è meglio);
     - LSD (Log-Spectral Distance) --> distanza percettiva in dB (più piccolo è meglio);
     - Cosine Similarity --> correlazione spettrale (più grande è meglio).

2. Calcola i guadagni (gain) come differenza tra la metrica del segnale degradato e quella dell’enhanced:
   - `gain_l1`, `gain_lsd` > 0 ⇒ l’enhanced è più vicino al clean (miglioramento);
   - `gain_cos` > 0 ⇒ la similarità spettrale è aumentata.

3. Salva tutti i risultati in un file `.csv` (`metrics_mel_improvement.csv`) e stampa le medie aggregate.

**Come usare la funzione di valutazione**

Se stai usando il **modello diretto** (output = Mel potenziato):
```python
evaluate_model(model, test_loader, device, residual_mode=False)
```

Se stai usando la **modalità residua** (output = Mel residuo sommato al degradato):

```python
evaluate_model(model, test_loader, device, residual_mode=True)
```

In [None]:
def evaluate_model(model, test_loader, device,
                                   save_dir='results_mel_improvement',
                                   residual_mode=False):

    os.makedirs(save_dir, exist_ok=True)
    model.eval()

    rows = []

    with torch.no_grad():
        for batch in tqdm(test_loader, desc="Evaluating (Mel improvement)"):
            # Estrarre Mel degradato e pulito [B, 1, M, T] normalizzati in [0,1]
            degraded = batch['degraded'].to(device)
            clean    = batch['clean'].to(device)

            # Forward pass del modello
            enh_part = torch.clamp(model(degraded), 0.0, 1.0)

            # Gestione delle due modalità operative
            if residual_mode:
                # Modalità residuo --> somma output + input degradato
                T = min(enh_part.size(-1), degraded.size(-1), clean.size(-1))
                degraded = degraded[..., :T]
                clean    = clean[..., :T]
                enh_part = enh_part[..., :T]
                enhanced = torch.clamp(degraded + enh_part, 0.0, 1.0)
            else:
                # Modalità diretta --> l'output è già il Mel potenziato
                T = min(enh_part.size(-1), clean.size(-1))
                enhanced = enh_part[..., :T]
                clean    = clean[..., :T]
                degraded = degraded[..., :T]

            # Conversione a NumPy per le metriche
            enh_np = enhanced.cpu().squeeze(1).numpy()
            cln_np = clean.cpu().squeeze(1).numpy()
            deg_np = degraded.cpu().squeeze(1).numpy()

            # Ciclo sui singoli esempi del batch
            for i in range(len(cln_np)):
                # Converti da scala [0,1] → [-80,0] dB per il calcolo della LSD
                mel_clean_db = cln_np[i] * 80.0 - 80.0
                mel_enh_db   =  enh_np[i] * 80.0 - 80.0
                mel_deg_db   =  deg_np[i] * 80.0 - 80.0

                # ---------- METRICHE ----------
                # L1: errore medio assoluto in scala lineare [0,1]
                l1_deg = np.mean(np.abs(deg_np[i] - cln_np[i]))
                l1_enh = np.mean(np.abs(enh_np[i] - cln_np[i]))
                gain_l1 = l1_deg - l1_enh  # >0 --> miglioramento

                # LSD: distanza logaritmica percettiva in dB
                lsd_deg = compute_lsd_db(mel_clean_db, mel_deg_db)
                lsd_enh = compute_lsd_db(mel_clean_db, mel_enh_db)
                gain_lsd = lsd_deg - lsd_enh

                 # Cosine Similarity: coerenza nella forma spettrale
                cos_deg = compute_cosine_similarity_mel(deg_np[i], cln_np[i])
                cos_enh = compute_cosine_similarity_mel(enh_np[i], cln_np[i])
                gain_cos = cos_enh - cos_deg # >0 --> miglioramento

                # Salva tutte le metriche per la clip corrente
                rows.append({
                    # valori assoluti
                    'l1_deg': l1_deg, 'l1_enh': l1_enh,
                    'lsd_deg': lsd_deg, 'lsd_enh': lsd_enh,
                    'cos_deg': cos_deg, 'cos_enh': cos_enh,
                    # guadagni (positivi = miglioramento)
                    'gain_l1': gain_l1,
                    'gain_lsd': gain_lsd,
                    'gain_cos': gain_cos
                })

    # DataFrame con tutte le metriche raccolte
    df = pd.DataFrame(rows)
    means = df.mean(numeric_only=True).to_dict()

     # ---------- RISULTATI ----------
    print("\n=== Evaluation Results (Mel Improvement) ===")
    print(f"l1_deg:  {means['l1_deg']:.4f} | l1_enh:  {means['l1_enh']:.4f} | gain_l1:  {means['gain_l1']:.4f}")
    print(f"lsd_deg: {means['lsd_deg']:.4f} | lsd_enh: {means['lsd_enh']:.4f} | gain_lsd: {means['gain_lsd']:.4f}")
    print(f"cos_deg: {means['cos_deg']:.4f} | cos_enh: {means['cos_enh']:.4f} | gain_cos: {means['gain_cos']:.4f}")
    print("Nota: gain_l1, gain_lsd, gain_cos > 0 indicano miglioramento")

    # Salva i risultati dettagliati in CSV
    out_csv = os.path.join(save_dir, 'metrics_mel_improvement.csv')
    df.to_csv(out_csv, index=False)
    print(f"\nSaved detailed metrics to {out_csv}")

    return df, means


Visualizza e confronta i Mel-spettri Clean, Degraded e Enhanced per un singolo esempio permettendo una valutazione qualitativa del comportamento del modello.
Parametri:
  * model: rete neurale addestrata per il miglioramento audio
  * sample: dizionario {'degraded', 'clean'} restituito dal Dataset
  * device: 'cpu' o 'cuda'
  * save_path: percorso dove salvare l'immagine risultante

Output: immagine con i tre spettrogrammi a confronto.

In [None]:
def visualize_enhancement(model, sample, device, save_path='plots/enhancement.png'):

    # Assicura che la directory di salvataggio esista
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

    model.eval()

    # Prepara i dati per l'inferenza, aggiunge dim batch
    degraded = sample['degraded'].unsqueeze(0).to(device)

    # Estrae il Mel pulito per confronto e lo converte in numpy per il plotting
    clean = sample['clean'].squeeze().cpu().numpy()

    with torch.no_grad():
        # Ottiene il Mel potenziato dal modello e converte in numpy
        enhanced = model(degraded).cpu().squeeze().numpy()

    # Estrae anche il Mel degradato originale come numpy
    degraded = sample['degraded'].squeeze().numpy()

    # ---------- VISUALIZZAZIONE ----------
    fig, axes = plt.subplots(3, 1, figsize=(10, 8))
    for ax, data, title in zip(axes, [clean, degraded, enhanced],
                               ['Clean', 'Degraded', 'Enhanced']):
        img = ax.imshow(data, aspect='auto', origin='lower', cmap='magma')
        ax.set_title(title)
        fig.colorbar(img, ax=ax)
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.show()


## Esperimenti

Con la variabile `no_training` si gestisce lo skip della fase di training per un'esecuzione più rapida. Di base è settata a `True`, se si vuole eseguire il training cambiare a `False`

In [None]:
no_training = True

### Modello MEL diretta

In [None]:
save_dir_direct = 'checkpoints'
os.makedirs(save_dir_direct, exist_ok=True)

# Inizializza il modello
model_direct = AudioEnhancementCRNN(residual_mode=False).to(device)
print(f"Model ready on {device}, trainable parameters: {sum(p.numel() for p in model_direct.parameters() if p.requires_grad):,}")

# Esegui training
ckpt_direct = '/content/checkpoints/direct/best_model.pt'

if no_training and os.path.exists(ckpt_direct):
    print(f" Checkpoint già esistente: {ckpt_direct}. Skip training")

elif no_training:
    print("Nessun checkpoint trovato. Avvio del training")
    history_direct = train_model(
        model_direct,
        train_loader,
        val_loader,
        epochs=20,
        lr=1e-4,
        save_dir=save_dir_direct,
        resume=False
    )

    # Plot loss
    plot_training_history(history_direct, mode='direct')

else:
    print("Avvio del training")
    history_direct = train_model(
        model_direct,
        train_loader,
        val_loader,
        epochs=20,
        lr=1e-4,
        save_dir=save_dir_direct,
        resume=False
    )

    # Plot loss
    plot_training_history(history_direct, mode='direct')

In [None]:
# Valutazione sul test set
if os.path.exists(ckpt_direct):
    # Load the checkpoint with weights_only=False
    checkpoint = torch.load(ckpt_direct, map_location=device, weights_only=False)
    model_direct.load_state_dict(checkpoint['model_state_dict'], strict=False)
else:
    print(" Nessun checkpoint trovato")

df_direct, metrics_direct = evaluate_model(
    model_direct,
    test_loader,
    device,
    save_dir='results_direct',
    residual_mode=False
)

In [None]:
# Visualizzazione qualitativa
sample_idx = np.random.randint(0, len(test_dataset))
sample = test_dataset[sample_idx]
visualize_enhancement(model_direct, sample, device, save_path='plots/enhancement_direct.png')

### Modello su residui

In [None]:
save_dir_residual = 'checkpoints'
os.makedirs(save_dir_residual, exist_ok=True)

# Inizializza il modello
model_residual = AudioEnhancementCRNN(residual_mode=True).to(device)
print(f"Model ready on {device}, trainable parameters: {sum(p.numel() for p in model_residual.parameters() if p.requires_grad):,}")

# Esegui training
ckpt_residual = '/content/checkpoints/residual/best_model.pt'
if no_training and os.path.exists(ckpt_residual):
    print(f" Checkpoint già esistente: {ckpt_residual}. Skip training")

elif no_training:
    print("Nessun checkpoint trovato. Avvio del training")
    history_residual = train_model(
        model_residual,
        train_loader,
        val_loader,
        epochs=20,
        lr=1e-4,
        save_dir=save_dir_residual,
        resume=False
    )

    # Plot loss
    plot_training_history(history_residual, mode='residual')

else:
    history_residual = train_model(
        model_residual,
        train_loader,
        val_loader,
        epochs=20,
        lr=1e-4,
        save_dir=save_dir_residual,
        resume=False
    )

    # Plot loss
    plot_training_history(history_residual, mode='residual')

In [None]:
# Valutazione sul test set
if os.path.exists(ckpt_residual):
    checkpoint = torch.load(ckpt_residual, map_location=device, weights_only=False)
    model_residual.load_state_dict(checkpoint['model_state_dict'], strict=False)
else:
    print(" Nessun checkpoint trovato")

df_residual, metrics_residual = evaluate_model(
    model_residual,
    test_loader,
    device,
    save_dir='results_residual',
    residual_mode=False
)

In [None]:
# Visualizzazione qualitativa
sample_idx = np.random.randint(0, len(test_dataset))
sample = test_dataset[sample_idx]
visualize_enhancement(model_residual, sample, device, save_path='plots/enhancement_residual.png')

### Confronto

In [None]:
# Tabella comparativa sui guadagni medi
comparison = pd.DataFrame([
    {
        'gain_l1': metrics_direct['gain_l1'],
        'gain_lsd': metrics_direct['gain_lsd'],
        'gain_cos': metrics_direct['gain_cos']
    },
    {
        'gain_l1': metrics_residual['gain_l1'],
        'gain_lsd': metrics_residual['gain_lsd'],
        'gain_cos': metrics_residual['gain_cos']
    }
], index=['Direct', 'Residual']).T

print("\n=== Confronto finale dei guadagni (Mel-domain) ===")
display(comparison.round(4))

# ---- Plot dei guadagni medi ----
ax = comparison.plot.bar(
    figsize=(10,5),
    colormap='viridis',
    edgecolor='black'
)
plt.title('Confronto tra modello diretto e residuo (guadagno medio)')
plt.ylabel('Δ rispetto al degraded (positivo = miglioramento)')
plt.grid(True, linestyle='--', alpha=0.6)
plt.tight_layout()
plt.show()
