In [1]:
import os
import torchaudio

# Create data folder if missing
os.makedirs("./data", exist_ok=True)

# Dowload archive of train-clean-100 and unzip it in data folder
# Or load it if previously downloaded
dataset = torchaudio.datasets.LIBRISPEECH("./data", url="train-clean-100", 
                                          download=not(os.path.isdir("data/LibriSpeech")))

In [10]:
import pandas as pd
import utils.dataset_metadata_parser as dmp

# Load speakers metadata and keep only those in train-clean-100
speaker_df = dmp.parse_pipe("data/LibriSpeech/SPEAKERS.TXT")
filtered_speaker_df = speaker_df[speaker_df["SUBSET"] == "train-clean-100"]

# Total number of audio extracts
total_extracts = len(filtered_speaker_df)

# Total duration of dataset (in minutes)
total_duration = filtered_speaker_df["MINUTES"].sum()

# Average duration per extract
average_duration = filtered_speaker_df["MINUTES"].mean()

# Total number of unique speakers (by ID or NAME, depending on what defines a speaker)
total_speakers = filtered_speaker_df["ID"].nunique()

# Average number of extracts per speaker
average_extracts_per_speaker = total_extracts / total_speakers

# Number of M and F
morf_number = filtered_speaker_df['SEX'].value_counts()

# Print the results
print(f"Total number of audio extracts: {total_extracts}")
print(f"Total duration of dataset (minutes): {total_duration:.2f}")
print(f"Average duration per extract (minutes): {average_duration:.2f}")
print(f"Total number of speakers: {total_speakers}")
print(f"Average number of extracts per speaker: {average_extracts_per_speaker:.2f}")
print(f"Number of M: {morf_number['M']}, and F:{morf_number['F']}")

Total number of audio extracts: 251
Total duration of dataset (minutes): 6035.41
Average duration per extract (minutes): 24.05
Total number of speakers: 251
Average number of extracts per speaker: 1.00
Number of M: 126, and F:125


In [None]:
import torch
from torchaudio.transforms import MFCC

# Parameters for MFCC extraction
sample_rate = 16000
n_mfcc = 20
melkwargs = {
    "n_fft": 400,       # frame size of 25ms
    "hop_length": 160,  # hop size of 10ms
    "n_mels": 40        # number of Mel filterbanks
}

# Initialize the MFCC transform
mfcc_transform = MFCC(
    sample_rate=sample_rate,
    n_mfcc=n_mfcc,
    melkwargs=melkwargs
)

class LibriSpeechMFCC(torch.utils.data.Dataset):
    """
    PyTorch Dataset that wraps LibriSpeech and applies MFCC transform.
    Returns:
        mfcc: Tensor of shape (n_mfcc, time_frames)
        speaker_id: int
    """
    def __init__(self, root="./data", url="train-clean-100", download=False, transform=None):
        self.dataset = torchaudio.datasets.LIBRISPEECH(root, url=url, download=download)
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        waveform, sr, _, speaker_id, _, _ = self.dataset[idx]
        # Resample if needed
        if sr != sample_rate:
            waveform = torchaudio.functional.resample(waveform, sr, sample_rate)
        # Apply MFCC transform: output shape (channel, n_mfcc, time)
        mfcc = self.transform(waveform)
        # Remove channel dimension for mono audio
        mfcc = mfcc.squeeze(0)  # now (n_mfcc, time)
        return mfcc, speaker_id

# Usage example
if __name__ == "__main__":
    # Ensure data directory exists and dataset is downloaded
    os.makedirs("./data", exist_ok=True)
    dataset = LibriSpeechMFCC(root="./data",
                              url="train-clean-100",
                              download=os.path.isdir("./data/LibriSpeech") == False,
                              transform=mfcc_transform)

    dataloader = torch.utils.data.DataLoader(dataset,
                                             batch_size=16,
                                             shuffle=True,
                                             num_workers=2)

    # Iterate one batch to check shapes
    for batch_mfcc, batch_speaker in dataloader:
        print("MFCC batch shape:", batch_mfcc.shape)      # (batch, n_mfcc, time)
        print("Speaker IDs:", batch_speaker)
        break


In [16]:
import soundfile as sf

info = sf.info("./data/LibriSpeech/train-clean-100/19/198/19-198-0000.flac")
print(f"Durée : {info.frames / info.samplerate:.2f} sec")


Durée : 1.97 sec


In [18]:
from collections import defaultdict

# Initialiser les compteurs
num_audio_files = 0
total_duration_sec = 0.0
speakers = set()

# Parcourir les éléments du dataset
for waveform, sample_rate, utterance, speaker_id, chapter_id, utterance_id in dataset:
    num_audio_files += 1
    speakers.add(speaker_id)
    duration_sec = waveform.shape[1] / sample_rate
    total_duration_sec += duration_sec

# Afficher les statistiques
print(f"Nombre total d'enregistrements audio : {num_audio_files}")
print(f"Nombre total de speakers : {len(speakers)}")
print(f"Durée totale des enregistrements : {total_duration_sec / 3600:.2f} heures")

RuntimeError: Couldn't find appropriate backend to handle uri ./data\LibriSpeech\train-clean-100\103\1240\103-1240-0000.flac and format None.

In [3]:
# Filtrer 10 premiers locuteurs et leurs 100 fichiers chacun (~10 h)
locuteurs = sorted({speaker for (_, _, _, speaker, _, _) in dataset})[:10]
subset = [(wave, sr, _, spk, _, _) for (wave, sr, _, spk, _, _) in dataset if spk in locuteurs][:1000]
print(len(subset), "extraits audio (~10h)")

1000 extraits audio (~10h)


In [4]:
import torchaudio.transforms as T
mfcc_transform = T.MFCC(sample_rate=16000, n_mfcc=40)
waveform, sr, _, speaker_id, _, _ = subset[0]
mfcc = mfcc_transform(waveform)
print(mfcc.shape)

torch.Size([1, 40, 158])




In [5]:
import torch
import torch.nn as nn

class SpeakerNet(nn.Module):
    def __init__(self, num_speakers):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 10 * 20, 256),
            nn.ReLU(),
            nn.Linear(256, num_speakers)
        )

    def forward(self, x):
        x = self.conv(x)
        return self.fc(x)

In [12]:
import torch
from torch.nn.utils.rnn import pad_sequence

# batch : liste de tuples (waveform [1,T], sr, ..., speaker, ...)
def collate_fn(batch):
    # On sépare vagues et labels
    waves = [item[0].squeeze(0).t() for item in batch]  # chaque wave [T]
    speakers = torch.tensor([item[3] for item in batch])
    # Padding : on aligne sur la plus grande longueur T_max
    padded_waves = pad_sequence(waves, batch_first=True)  # [B, T_max]
    # Retour au format [B,1,T_max]
    padded_waves = padded_waves.unsqueeze(1)
    # Extraction MFCC sur l'ensemble
    mfccs = mfcc_transform(padded_waves)  # [B, n_mfcc, T_feat]
    # On transpose si nécessaire selon l'architecture
    mfccs = mfccs.unsqueeze(1)  # [B,1,n_mfcc,T_feat]
    return mfccs, speakers

In [None]:
# Déterminer le nombre de locuteurs dans `dataset`
speaker_ids = {speaker for (_, _, _, speaker, _, _) in dataset}
num_speakers = len(speaker_ids)

from torch.utils.data import DataLoader

dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
model = SpeakerNet(num_speakers=num_speakers)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for epoch in range(10):
    for wave, sr, _, speaker, _, _ in dataloader:
        # Extraction MFCC
        mfcc = mfcc_transform(wave).unsqueeze(1)  # [B,1,40,T]
        logits = model(mfcc)
        loss = criterion(logits, speaker)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch} – loss: {loss.item():.4f}")

ValueError: not enough values to unpack (expected 6, got 2)

In [None]:
speaker_ids = {speaker for (_, _, _, speaker, _, _) in dataset}
num_speakers = len(speaker_ids)

from torch.utils.data import DataLoader

# On réutilise la collate_fn définie en §2.4 pour le padding et extraction MFCC

dataloader = DataLoader(dataset,
                        batch_size=32,
                        shuffle=True,
                        collate_fn=collate_fn)

model = SpeakerNet(num_speakers=num_speakers)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

# Entraînement avec batch de MFCCs déjà préparés
def train(model, dataloader):
    model.train()
    for epoch in range(10):
        total_loss = 0.0
        for mfccs, speakers in dataloader:
            logits = model(mfccs)             # mfccs: [B,1,n_mfcc,T]
            loss = criterion(logits, speakers)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch} | avg loss: {avg_loss:.4f}")

# Lancer l'entraînement
train(model, dataloader)

RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [32, 1, 1, 40, 1314]

In [None]:
# Sauvegarde
torch.save(model.state_dict(), "pretrained_speaker.pth")

# Pour recharger ultérieurement
model = SpeakerNet(num_speakers=num_speakers)
model.load_state_dict(torch.load("pretrained_speaker.pth"))
model.eval()

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import torch

# Split 80/20 train/val
from torch.utils.data import random_split
train_len = int(0.8 * len(dataset))
val_len = len(dataset) - train_len
train_ds, val_ds = random_split(dataset, [train_len, val_len])
val_loader = DataLoader(val_ds, batch_size=32)

# Prédictions et vérité
y_true, y_pred = [], []
model.eval()
with torch.no_grad():
    for wave, sr, _, speaker, _, _ in val_loader:
        mfcc = mfcc_transform(wave).unsqueeze(1)
        logits = model(mfcc)
        preds = logits.argmax(dim=1)
        y_true.extend(speaker.tolist())
        y_pred.extend(preds.tolist())

# Matrice de confusion
cm = confusion_matrix(y_true, y_pred)
print("Matrice de confusion :")
print(cm)

# Rapport précision / rappel
report = classification_report(y_true, y_pred)
print("Rapport de classification :")
print(report)

In [None]:
import itertools
import matplotlib.pyplot as plt

# Affichage graphique de la matrice de confusion
plt.figure(figsize=(8, 6))
plt.imshow(cm, interpolation='nearest', aspect='auto')
plt.title('Matrice de confusion')
plt.colorbar()

# Etiquettes des axes (speaker IDs)
labels = sorted(set(y_true))
plt.xticks(range(len(labels)), labels, rotation=45)
plt.yticks(range(len(labels)), labels)

# Annotations
thresh = cm.max() / 2
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    plt.text(j, i, format(cm[i, j], 'd'),
             horizontalalignment='center',
             color='white' if cm[i, j] > thresh else 'black')

plt.ylabel('Vérité')
plt.xlabel('Prédiction')
plt.tight_layout()
plt.show()