In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DownSamplingBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=15):
        super().__init__()
        # Convolution avec stride=2 pour réduire la taille par 2 (équivalent décimage intelligent)
        self.conv = nn.Conv1d(
            in_channels, out_channels, 
            kernel_size=kernel_size, 
            stride=2, 
            padding=kernel_size//2
        )
        self.act = nn.LeakyReLU(0.1)
        self.bn = nn.BatchNorm1d(out_channels)

    def forward(self, x):
        return self.bn(self.act(self.conv(x)))

class UpSamplingBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=5):
        super().__init__()
        # Interpolation linéaire pour agrandir + Convolution
        self.conv = nn.Conv1d(
            in_channels, out_channels, 
            kernel_size=kernel_size, 
            stride=1, 
            padding=kernel_size//2
        )
        self.act = nn.LeakyReLU(0.1)
        self.bn = nn.BatchNorm1d(out_channels)

    def forward(self, x, skip_connection):
        # 1. Upsample (Interpolation linéaire)
        x = F.interpolate(x, scale_factor=2, mode='linear', align_corners=False)
        
        # 2. Gestion des petits décalages de taille (padding)
        if x.shape[-1] != skip_connection.shape[-1]:
            diff = skip_connection.shape[-1] - x.shape[-1]
            x = F.pad(x, (0, diff))
            
        # 3. Concaténation (Skip Connection)
        x = torch.cat([x, skip_connection], dim=1)
        
        # 4. Convolution finale
        return self.bn(self.act(self.conv(x)))

class WaveUNet(nn.Module):
    def __init__(self, num_levels=4, base_channels=24):
        super().__init__()
        
        self.levels = num_levels
        self.down_blocks = nn.ModuleList()
        self.up_blocks = nn.ModuleList()
        
        # --- Encoder (Descente) ---
        in_ch = 1 
        out_ch = base_channels
        self.skip_channels_history = []
        
        for _ in range(num_levels):
            self.down_blocks.append(DownSamplingBlock(in_ch, out_ch))
            self.skip_channels_history.append(in_ch)
            in_ch = out_ch
            out_ch *= 2 
            
        # --- Bottleneck ---
        self.bottleneck = nn.Conv1d(in_ch, out_ch, kernel_size=15, stride=1, padding=7)
        self.bottleneck_act = nn.LeakyReLU(0.1)
        
        # --- Decoder (Remontée) ---
        in_ch = out_ch 
        
        for i in range(num_levels):
            # On récupère la taille du skip
            skip_ch = self.skip_channels_history[-(i+1)]
            
            # --- CORRECTION ICI ---
            if i == num_levels - 1:
                # Si c'est le DERNIER bloc, on ne veut pas redescendre à 1 canal.
                # On veut sortir 'base_channels' (24) pour nourrir la final_conv.
                out_ch = base_channels
            else:
                # Sinon, on redescend normalement à la taille du skip
                out_ch = skip_ch
            
            self.up_blocks.append(UpSamplingBlock(in_ch + skip_ch, out_ch))
            in_ch = out_ch
            
        # --- Sortie ---
        # Attend base_channels (24) en entrée
        self.final_conv = nn.Conv1d(base_channels, 2, kernel_size=1, stride=1)
        
    def forward(self, x):
        skips = []
        
        # Encoder
        for block in self.down_blocks:
            skips.append(x)
            x = block(x)
            
        # Bottleneck
        x = self.bottleneck_act(self.bottleneck(x))
        
        # Decoder
        for i, block in enumerate(self.up_blocks):
            skip = skips[-(i+1)]
            x = block(x, skip)
            
        out = torch.tanh(self.final_conv(x))
        return out

In [2]:
import os
import glob
import random
import librosa
import numpy as np
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm

class WaveformDataset(Dataset):
    def __init__(self, base_dir, sample_rate=16000, segment_length=16384):
        self.base_dir = base_dir
        self.sr = sample_rate
        self.segment_length = segment_length
        # On récupère tous les dossiers 0001, 0002...
        self.folders = sorted(glob.glob(os.path.join(base_dir, "*")))
        
    def __len__(self):
        return len(self.folders)
    
    def __getitem__(self, idx):
        folder = self.folders[idx]
        
        # 1. Charger les fichiers
        # On cherche le fichier mix (peu importe le SNR)
        mix_path = glob.glob(os.path.join(folder, "mix_snr_*.wav"))[0]
        voice_path = os.path.join(folder, "voice.wav")
        noise_path = os.path.join(folder, "noise.wav")
        
        mix, _ = librosa.load(mix_path, sr=self.sr)
        voice, _ = librosa.load(voice_path, sr=self.sr)
        noise, _ = librosa.load(noise_path, sr=self.sr)
        
        # 2. Découpage aléatoire (Random Crop)
        # On prend un extrait de 'segment_length'
        if len(mix) > self.segment_length:
            start = random.randint(0, len(mix) - self.segment_length)
            end = start + self.segment_length
            mix = mix[start:end]
            voice = voice[start:end]
            noise = noise[start:end]
        else:
            # Si trop court, on pad avec des zéros
            pad_len = self.segment_length - len(mix)
            mix = np.pad(mix, (0, pad_len))
            voice = np.pad(voice, (0, pad_len))
            noise = np.pad(noise, (0, pad_len))
            
        # 3. Format PyTorch (Channel, Time) -> (1, T)
        mix = torch.FloatTensor(mix).unsqueeze(0)
        # Target: (2, T) -> Canal 0: Voix, Canal 1: Bruit
        targets = np.stack([voice, noise])
        targets = torch.FloatTensor(targets)
        
        return mix, targets

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def train_wave_unet(train_loader, val_loader, epochs=20):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Training on {device}")
    
    model = WaveUNet(num_levels=5, base_channels=24).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.L1Loss() # MAE est souvent meilleur que MSE pour l'audio brut
    
    train_losses = []
    val_losses = []
    
    for epoch in range(epochs):
        model.train()
        batch_loss = 0
        
        for mix, target in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
            mix, target = mix.to(device), target.to(device)
            
            optimizer.zero_grad()
            output = model(mix) # Output shape: (Batch, 2, Time)
            
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            batch_loss += loss.item()
            
        train_losses.append(batch_loss / len(train_loader))
        
        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for mix, target in val_loader:
                mix, target = mix.to(device), target.to(device)
                output = model(mix)
                val_loss += criterion(output, target).item()
        
        val_losses.append(val_loss / len(val_loader))
        print(f"Loss Train: {train_losses[-1]:.4f} | Loss Val: {val_losses[-1]:.4f}")
        
    return model, train_losses, val_losses

# --- LANCEMENT ---



from torch.utils.data import random_split

# 1. Création du Dataset complet
full_train_dataset = WaveformDataset("train/", sample_rate=16000)

# 2. Calcul des tailles (ex: 80% train, 20% validation)
train_size = int(0.8 * len(full_train_dataset))
val_size = len(full_train_dataset) - train_size

# 3. Split aléatoire
train_subset, val_subset = random_split(full_train_dataset, [train_size, val_size])

# 4. DataLoaders séparés
dataloader_train = DataLoader(train_subset, batch_size=3, shuffle=True)
dataloader_val = DataLoader(val_subset, batch_size=3, shuffle=False)

# Maintenant vous pouvez lancer train_wave_unet(dataloader_train, dataloader_val, ...)

model, t_loss, v_loss = train_wave_unet(dataloader_train, dataloader_val, epochs=3)

ValueError: num_samples should be a positive integer value, but got num_samples=0

In [4]:
def calculate_si_sdr(reference, estimation):
    min_len = min(len(reference), len(estimation))
    reference = reference[:min_len]
    estimation = estimation[:min_len]
    dot_product = np.dot(reference, estimation)
    norm_ref = np.linalg.norm(reference)**2
    projection = (dot_product / (norm_ref + 1e-8)) * reference
    noise = estimation - projection
    si_sdr = 10 * np.log10(np.linalg.norm(projection)**2 / (np.linalg.norm(noise)**2 + 1e-8))
    return si_sdr

def evaluate_wave_unet(model, test_folder):
    model.eval()
    device = next(model.parameters()).device
    
    mix_path = glob.glob(os.path.join(test_folder, "mix_snr_*.wav"))[0]
    voice_path = os.path.join(test_folder, "voice.wav")
    
    # Chargement
    mix, sr = librosa.load(mix_path, sr=16000)
    voice_true, _ = librosa.load(voice_path, sr=16000)
    
    # Préparation
    input_tensor = torch.FloatTensor(mix).unsqueeze(0).unsqueeze(0).to(device) # (1, 1, T)
    
    # Inférence (On coupe le gradient pour sauver la RAM)
    with torch.no_grad():
        prediction = model(input_tensor)
        
    # Récupération (Canal 0 = Voix)
    voice_est = prediction[0, 0, :].cpu().numpy()
    
    # Calcul SI-SDR
    score = calculate_si_sdr(voice_true, voice_est)
    return score, voice_est, sr

# Exemple d'utilisation après entraînement
score, wav_out, sr = evaluate_wave_unet(model, "train/0001")
print(f"SI-SDR Wave-U-Net : {score:.2f} dB")

NameError: name 'model' is not defined

In [None]:
import numpy as np
import glob
import os
import torch
import librosa
from tqdm.auto import tqdm

def calculate_si_sdr(reference, estimation):
    """
    Calcule le SI-SDR (Scale-Invariant Signal-to-Distortion Ratio)
    """
    # Alignement des tailles (par sécurité)
    min_len = min(len(reference), len(estimation))
    reference = reference[:min_len]
    estimation = estimation[:min_len]
    
    # Éviter la division par zéro
    eps = 1e-8
    
    # Projection du signal estimé sur la référence
    dot_product = np.dot(reference, estimation)
    norm_ref = np.linalg.norm(reference)**2
    projection = (dot_product / (norm_ref + eps)) * reference
    
    # Le bruit est la partie orthogonale
    noise = estimation - projection
    
    # Calcul du ratio en dB
    numerator = np.linalg.norm(projection)**2
    denominator = np.linalg.norm(noise)**2
    si_sdr = 10 * np.log10(numerator / (denominator + eps))
    
    return si_sdr

def evaluate_on_test_set(model, test_root_dir="test/"):
    """
    Évalue le modèle sur TOUS les dossiers contenus dans test_root_dir.
    """
    model.eval()
    device = next(model.parameters()).device
    
    # Récupérer tous les sous-dossiers (0001, 0002, etc.) dans 'test/'
    test_folders = sorted(glob.glob(os.path.join(test_root_dir, "*")))
    
    if len(test_folders) == 0:
        print(f"Attention : Aucun dossier trouvé dans {test_root_dir}")
        return
        
    si_sdr_scores = []
    
    print(f"Début de l'évaluation sur {len(test_folders)} fichiers de test...")
    
    with torch.no_grad(): # Pas de gradient pour l'inférence (économie mémoire)
        for folder in tqdm(test_folders):
            # 1. Trouver les fichiers
            mix_files = glob.glob(os.path.join(folder, "mix_snr_*.wav"))
            if not mix_files:
                continue # On saute si dossier vide
                
            mix_path = mix_files[0]
            voice_path = os.path.join(folder, "voice.wav")
            
            # 2. Chargement Audio
            # On charge tout le fichier (sr=16000 pour Wave-U-Net)
            mix, sr = librosa.load(mix_path, sr=16000)
            voice_true, _ = librosa.load(voice_path, sr=16000)
            
            # 3. Préparation Tenseur
            # Shape: (Batch=1, Channel=1, Time=N)
            input_tensor = torch.FloatTensor(mix).unsqueeze(0).unsqueeze(0).to(device)
            
            # 4. Prédiction Wave-U-Net
            prediction = model(input_tensor)
            
            # 5. Récupération Voix (Canal 0)
            # On repasse sur CPU et en Numpy
            voice_est = prediction[0, 0, :].cpu().numpy()
            
            # 6. Score
            score = calculate_si_sdr(voice_true, voice_est)
            si_sdr_scores.append(score)

    # Statistiques Finales
    mean_score = np.mean(si_sdr_scores)
    median_score = np.median(si_sdr_scores)
    
    print("-" * 30)
    print(f"RÉSULTATS SUR LE JEU DE TEST ({len(si_sdr_scores)} fichiers)")
    print(f"SI-SDR Moyen   : {mean_score:.2f} dB")
    print(f"SI-SDR Médian  : {median_score:.2f} dB")
    print(f"SI-SDR Min     : {np.min(si_sdr_scores):.2f} dB")
    print(f"SI-SDR Max     : {np.max(si_sdr_scores):.2f} dB")
    print("-" * 30)
    
    return si_sdr_scores

# --- LANCEMENT DE L'ÉVALUATION ---
# Assurez-vous que votre modèle est entraîné avant de lancer ceci !
scores = evaluate_on_test_set(model, test_root_dir="test/")