Données Multimedia - Projet non-alternant - Module AUDIO
==============
---

# 0.a Imports et connexion à Google Drive

In [None]:
import os
import json
import torch
import librosa

import numpy as np
import pandas as pd
import torch.nn as nn
import matplotlib.pyplot as plt

from google.colab import drive
from torch.utils.data import Dataset, DataLoader

In [None]:
# Mount Google Drive under /content/drive (Colab helper imported above);
# force_remount=True asks for a fresh mount even if one already exists.
drive.mount("/content/drive", force_remount=True)
# Project folders on the mounted Drive: JSON directory and audio directory.
# NOTE(review): neither variable is referenced again in the visible part of
# this notebook -- later cells spell the same paths out as literals.
chemin_jsons : str = "/content/drive/MyDrive/Projet non-alternant/Corpus/json/"
chemin_audios : str = "/content/drive/MyDrive/Projet non-alternant/Corpus/train_val_videos/TrainValAudio/"

Mounted at /content/drive


# 0.b Fonctions de reproductibilité

In [None]:
def get_device():
    """Return the device string to run on: 'cuda' if PyTorch sees a GPU, else 'cpu'."""
    if torch.cuda.is_available():
        return 'cuda'
    return 'cpu'

device = get_device()
device

'cuda'

In [None]:
def same_seeds(seed):
    """Seed NumPy and PyTorch (CPU and, when available, CUDA) with ``seed``.

    Also switches cuDNN to deterministic mode and turns its benchmark
    auto-tuner off so repeated runs pick the same kernels.
    """
    np.random.seed(seed)
    torch.manual_seed(seed)

    if torch.cuda.is_available():
        # Seed the current GPU first, then every visible GPU.
        for seed_cuda in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
            seed_cuda(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

same_seeds(42)

# Model

In [None]:
def wav_to_logmel(wav_path, sr=22050, n_mels=128, n_fft=2048, hop_length=512, duration=10):
    """Load an audio file and return its standardized log-mel spectrogram.

    Args:
        wav_path: Path of the audio file, passed to ``librosa.load``.
        sr: Target sampling rate in Hz. ``None`` keeps the file's own rate.
        n_mels: Number of mel bands (rows of the returned array).
        n_fft: FFT window size for the mel spectrogram.
        hop_length: Hop between successive frames, in samples.
        duration: Clip length in seconds. Shorter signals are zero-padded at
            the end and longer ones truncated, so every clip yields the same
            number of frames. ``None`` keeps the signal length as loaded.

    Returns:
        A 2-D NumPy array of shape ``(n_mels, frames)`` holding the dB-scaled
        mel spectrogram, shifted and scaled to zero mean / unit standard
        deviation over the whole clip.
    """
    # Use the rate librosa reports back: identical to `sr` when one was
    # requested, and the file's native rate when `sr` is None.
    y, sr = librosa.load(wav_path, sr=sr, mono=True, duration=duration)

    if duration is not None:
        # The sample count must be an int: `sr * duration` is a float for
        # fractional durations and cannot be used to slice or pad.
        target_len = int(round(sr * duration))
        if len(y) < target_len:
            y = np.pad(y, (0, target_len - len(y)), mode='constant')
        else:
            y = y[:target_len]

    mel_spec = librosa.feature.melspectrogram(
        y=y, sr=sr, n_mels=n_mels, n_fft=n_fft, hop_length=hop_length)

    # dB scale relative to the clip's own maximum power.
    log_mel = librosa.power_to_db(mel_spec, ref=np.max)

    # Per-clip standardization; the epsilon avoids dividing by zero on a
    # constant spectrogram.
    log_mel = (log_mel - log_mel.mean()) / (log_mel.std() + 1e-6)

    return log_mel  # shape: (128, ~431) for 10 s @ 22050 Hz with the defaults

In [None]:
class AudioCNN(nn.Module):
    """Small 2-D CNN classifier for single-channel spectrogram inputs.

    Three Conv(3x3) -> BatchNorm -> ReLU stages (32, 64, 128 channels); the
    first two are followed by 2x2 max pooling and the last by adaptive average
    pooling to a 4x4 map, so the classifier head accepts any input size that
    survives the two poolings. The head is Flatten -> Linear(2048, 512) ->
    ReLU -> Dropout(0.5) -> Linear(512, num_classes) and returns raw logits.
    """

    def __init__(self, num_classes=20):
        super().__init__()

        widths = (32, 64, 128)
        stack = []
        in_ch = 1
        for depth, out_ch in enumerate(widths):
            stack += [
                nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
            ]
            # Only the final stage ends with the size-independent pooling.
            if depth == len(widths) - 1:
                stack.append(nn.AdaptiveAvgPool2d((4, 4)))
            else:
                stack.append(nn.MaxPool2d(2))
            in_ch = out_ch
        self.features = nn.Sequential(*stack)

        head = [
            nn.Flatten(),
            nn.Linear(in_ch * 4 * 4, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Map a (batch, 1, H, W) tensor to (batch, num_classes) logits."""
        return self.classifier(self.features(x))

In [None]:
class AudioDataset(Dataset):
    """In-memory dataset of feature tensors with optional z-score normalization.

    NOTE: a later cell of this notebook defines another class with the same
    name (CSV-backed); running that cell replaces this definition.

    Args:
        data: Indexable collection of equally-shaped tensors.
        labels: Indexable collection of labels, aligned with ``data``.
        normalize: When true, items are returned as
            ``(sample - mean) / (std + 1e-8)``.
        mean, std: Normalization statistics. If either is missing while
            ``normalize`` is true, both are computed over all of ``data``.
    """

    def __init__(self, data, labels, normalize=True, mean=None, std=None):
        self.data = data
        self.labels = labels
        self.normalize = normalize

        # Calculate mean and std if not provided
        if normalize and (mean is None or std is None):
            all_data = torch.stack(list(data))
            self.mean = all_data.mean()
            self.std = all_data.std()
        else:
            self.mean = mean
            self.std = std

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair at ``idx``, normalized if enabled."""
        sample = self.data[idx]
        label = self.labels[idx]

        # Normalize
        if self.normalize:
            sample = (sample - self.mean) / (self.std + 1e-8)

        # The pair must be returned explicitly; without this every item is None.
        return sample, label


In [None]:
class AudioDataset(Dataset):
    """Dataset driven by a CSV file with ``video_id`` and ``label`` columns.

    Each item is computed on access: the file ``<audio_dir>/<video_id>.wav``
    is converted with ``wav_to_logmel`` and returned with its integer label.
    """

    def __init__(self, csv_path, audio_dir):
        self.audio_dir = audio_dir
        self.df = pd.read_csv(csv_path)

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return ``(features, target)``: a float (1, n_mels, frames) tensor and a long scalar."""
        record = self.df.iloc[idx]
        clip_name = record["video_id"]
        class_id = int(record["label"])

        wav_file = os.path.join(self.audio_dir, f"{clip_name}.wav")
        # Add a leading channel axis so the 2-D spectrogram fits Conv2d.
        features = torch.tensor(wav_to_logmel(wav_file)).unsqueeze(0).float()
        target = torch.tensor(class_id, dtype=torch.long)

        return features, target

# ----------------------------
# DataLoader creation
# ----------------------------
def get_dataloaders(
    train_csv,
    val_csv,
    audio_dir,
    batch_size=16,
    num_workers=2
):
    """Build the training and validation DataLoaders from two CSV files.

    Args:
        train_csv: CSV file describing the training clips.
        val_csv: CSV file describing the validation clips.
        audio_dir: Directory containing the ``<video_id>.wav`` files.
        batch_size: Batch size used by both loaders.
        num_workers: Number of worker processes used by both loaders.

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.
    """
    # The CSV-backed dataset defined above is named AudioDataset; the
    # previously used name AudioDatasetFromCSV is not defined anywhere here
    # and raised NameError.
    train_dataset = AudioDataset(train_csv, audio_dir)
    val_dataset = AudioDataset(val_csv, audio_dir)

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )

    print(f"‚úÖ Train set: {len(train_dataset)} √©chantillons")
    print(f"‚úÖ Val set: {len(val_dataset)} √©chantillons")
    return train_loader, val_loader

In [None]:
class MSRVTAudioDataset(Dataset):
    """Dataset built from a JSON file whose ``videos`` list describes the clips.

    Every entry of ``videos`` must provide ``video_id`` and ``category``; the
    audio for an entry is read from ``<audio_dir>/<video_id>.wav``.

    Args:
        json_path: Path of the JSON file.
        audio_dir: Directory containing the ``.wav`` files.
        transform: Optional callable applied to the (1, n_mels, frames) float
            tensor before it is returned.
    """

    def __init__(self, json_path, audio_dir, transform=None):
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.videos = data['videos']
        self.audio_dir = audio_dir
        self.transform = transform

    def __len__(self):
        return len(self.videos)

    def __getitem__(self, idx):
        """Return ``(logmel, label)`` for entry ``idx``.

        Raises:
            FileNotFoundError: if the entry's ``.wav`` file does not exist.
        """
        vid = self.videos[idx]
        video_id = vid['video_id']
        label = vid['category']
        audio_path = os.path.join(self.audio_dir, f"{video_id}.wav")

        # Fail early with the offending path instead of an opaque decoder error.
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio manquant : {audio_path}")

        logmel = wav_to_logmel(audio_path)  # (128, T)
        logmel = torch.tensor(logmel).unsqueeze(0).float()  # (1, 128, T)

        # The transform was accepted by the constructor but never applied.
        if self.transform is not None:
            logmel = self.transform(logmel)

        return logmel, label

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one full pass over ``loader``.

    Trains (backward + optimizer step) when ``optimizer`` is given, otherwise
    evaluates with gradients disabled.

    Returns:
        ``(mean of the per-batch losses, accuracy in percent)``.
    """
    training = optimizer is not None
    model.train(training)  # train(False) is equivalent to eval()

    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.set_grad_enabled(training):
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            if training:
                optimizer.zero_grad()
            out = model(x)
            loss = criterion(out, y)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            predicted = out.argmax(dim=1)
            total += y.size(0)
            correct += (predicted == y).sum().item()

    return loss_sum / len(loader), 100 * correct / total


def train_model(model, train_loader, val_loader, learning_rate,
                num_epochs=20, patience=5, save_path="best_model.pth"):
    """Train ``model`` with Adam and cross-entropy, early-stopping on validation loss.

    After every epoch the model is evaluated on ``val_loader``. Whenever the
    mean validation loss improves, the weights are written to ``save_path``.
    Training stops once ``patience`` consecutive epochs bring no improvement.

    Args:
        model: Network to train; moved to the device given by ``get_device()``.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Iterable of ``(inputs, targets)`` validation batches.
        learning_rate: Learning rate passed to Adam.
        num_epochs: Maximum number of epochs.
        patience: Number of non-improving epochs tolerated before stopping.
        save_path: Where the best weights (``state_dict``) are saved.

    Returns:
        The model, reloaded with the best weights saved during this call. If
        nothing was saved (no epoch ran, or the validation loss never compared
        below infinity, e.g. NaN), the model is returned with its current
        weights.
    """
    device = get_device()
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_epoch = 0
    # Tracks whether THIS call wrote save_path, so we never reload a missing
    # file or a stale checkpoint left by an earlier run.
    checkpoint_saved = False

    for epoch in range(num_epochs):
        # ---------- Training ----------
        avg_train_loss, train_acc = _run_epoch(
            model, train_loader, criterion, device, optimizer)

        # ---------- Validation ----------
        avg_val_loss, val_acc = _run_epoch(
            model, val_loader, criterion, device)

        # ---------- Logging ----------
        print(f"Epoch [{epoch+1}/{num_epochs}] "
              f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.2f}% | "
              f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}%")

        # ---------- Early Stopping ----------
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            epochs_no_improve = 0
            best_epoch = epoch
            torch.save(model.state_dict(), save_path)
            checkpoint_saved = True
            print(f"  ‚Üí Nouveau meilleur mod√®le sauvegard√© √† l'√©poque {epoch+1} !")
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"\nüõë ¬°Early stopping d√©clench√© apr√®s {epoch+1} √©poques.")
                print(f"Meilleur mod√®le √† l'√©poque {best_epoch+1} (val_loss = {best_val_loss:.4f})")
                break

    # Reload the best weights, but only if this call actually produced them.
    if checkpoint_saved:
        model.load_state_dict(torch.load(save_path, map_location=device))
        print(f"\n‚úÖ Entra√Ænement termin√©. Meilleur mod√®le charg√© depuis {save_path}.")
    else:
        print("\nAvertissement : aucun checkpoint produit lors de cet appel ; retour des poids courants.")
    return model

In [None]:
# Absolute Drive paths used by the cells below: the train/validation JSON
# files handed to MSRVTAudioDataset and the directory searched for
# "<video_id>.wav". These literals repeat the locations held by
# chemin_jsons / chemin_audios near the top of the notebook.
train_json = "/content/drive/MyDrive/Projet non-alternant/Corpus/json/train_videodatainfo_audio.json"
val_json = "/content/drive/MyDrive/Projet non-alternant/Corpus/json/val_videodatainfo_audio.json"
audio_dir = "/content/drive/MyDrive/Projet non-alternant/Corpus/train_val_videos/TrainValAudio"

In [None]:
# Complete train / validation datasets. Construction only parses the JSON;
# audio files are read lazily, one item at a time, in __getitem__.
full_train_dataset = MSRVTAudioDataset(train_json, audio_dir)
full_val_dataset = MSRVTAudioDataset(val_json, audio_dir)

In [None]:
# Restrict both splits to their first entries: at most N training items and
# at most n validation items (min() guards against shorter datasets).
N = 1000
n = 200
train_dataset = torch.utils.data.Subset(full_train_dataset, indices=list(range(min(N, len(full_train_dataset)))))
val_dataset = torch.utils.data.Subset(full_val_dataset, indices=list(range(min(n, len(full_val_dataset)))))

print(f"Train subset size: {len(train_dataset)}")
print(f"Val subset size: {len(val_dataset)}")

Train subset size: 1000
Val subset size: 200


In [None]:
# Datasets
# NOTE(review): this rebinds train_dataset / val_dataset to the full datasets,
# replacing the N / n subsets created in the previous cell -- run only one of
# the two cells depending on whether the subset or the whole corpus is wanted.
train_dataset = MSRVTAudioDataset(train_json, audio_dir)
val_dataset = MSRVTAudioDataset(val_json, audio_dir)

In [None]:
# DataLoaders
# Batches of 64; only the training loader shuffles. num_workers=0 keeps data
# loading in the main process (no worker subprocesses).
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=0)

In [None]:
# Model
# 20 output classes; moved to the device chosen by get_device(). As the last
# expression of the cell, the .to() result is also echoed as the cell output.
model = AudioCNN(num_classes=20)
model.to(get_device())

AudioCNN(
  (features): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): AdaptiveAvgPool2d(output_size=(4, 4))
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=2048, out_features=512, bias=True)
    (2): ReLU()
    (3): Dropout(p=0.5, inplace=False)
    (4): Linea

In [None]:
# Train for at most 30 epochs, stopping after 5 epochs without validation-loss
# improvement; the best weights are written to Drive at save_path and the
# returned model is rebound to `model`.
model = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    learning_rate=0.01,
    num_epochs=30,
    patience=5,
    save_path="/content/drive/MyDrive/Projet non-alternant/Audio/best_audio_cnn.pth"
)

0


KeyboardInterrupt: 

In [None]:
# Check one batch: print shapes and value range of the first batch, then stop.
for batch_data, batch_labels in train_loader:
    print(f"Batch data shape: {batch_data.shape}")  # (batch, 1, n_mels, frames)
    print(f"Batch labels shape: {batch_labels.shape}")  # (batch,)
    print(f"Data range: [{batch_data.min():.2f}, {batch_data.max():.2f}]")
    break

Batch data shape: torch.Size([16, 1, 128, 431])
Batch labels shape: torch.Size([16])
Data range: [-4.65, 4.20]
