In [132]:
%matplotlib inline

### Imports

In [133]:
import csv
import os
from pathlib import Path


import torch
from torch import Tensor
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from typing import Dict, List, Tuple, Union


from torch.utils.data import Dataset,DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms

import torchaudio

#mps_device = torch.device("mps")

### Import dataset

In [134]:
def load_commonvoice_item(
    line: List[str], header: List[str], path: str, folder_audio: str, ext_audio: str
) -> Tuple[Tensor, int, Dict[str, str]]:
    # Each line as the following data:
    # client_id, path, sentence, up_votes, down_votes, age, gender, accent

    if header[1] != "path":
        raise ValueError(f"expect `header[1]` to be 'path', but got {header[1]}")
    fileid = line[1]
    filename = os.path.join(path, folder_audio, fileid)
    if not filename.endswith(ext_audio):
        filename += ext_audio
    waveform, sample_rate = torchaudio.load(filename)

    dic = dict(zip(header, line))

    return waveform, sample_rate, dic


class COMMONVOICE(Dataset):
    """*CommonVoice* :cite:`ardila2020common` dataset.

    Args:
        root (str or Path): Path to the directory where the dataset is located.
             (Where the ``tsv`` file is present.)
        tsv (str, optional):
            The name of the tsv file used to construct the metadata, such as
            ``"train.tsv"``, ``"test.tsv"``, ``"dev.tsv"``, ``"invalidated.tsv"``,
            ``"validated.tsv"`` and ``"other.tsv"``. (default: ``"train.tsv"``)
    """

    _ext_txt = ".txt"
    _ext_audio = ".mp3"
    _folder_audio = "clips"

    def __init__(self, root: Union[str, Path], tsv: str = "train.tsv") -> None:

        # Get string representation of 'root' in case Path object is passed
        self._path = os.fspath(root)
        self._tsv = os.path.join(self._path, tsv)

        with open(self._tsv, "r") as tsv_:
            walker = csv.reader(tsv_, delimiter="\t")
            self._header = next(walker)
            self._walker = list(walker)

    def __getitem__(self, n: int) -> Tuple[Tensor, int, Dict[str, str]]:
        """Load the n-th sample from the dataset.

        Args:
            n (int): The index of the sample to be loaded

        Returns:
            Tuple of the following items;

            Tensor:
                Waveform
            int:
                Sample rate
            Dict[str, str]:
                Dictionary containing the following items from the corresponding TSV file;

                * ``"client_id"``
                * ``"path"``
                * ``"sentence"``
                * ``"up_votes"``
                * ``"down_votes"``
                * ``"age"``
                * ``"gender"``
                * ``"accent"``
        """
        line = self._walker[n]
        return load_commonvoice_item(line, self._header, self._path, self._folder_audio, self._ext_audio)


    def __len__(self) -> int:
        return len(self._walker)


In [135]:
commonvoice = COMMONVOICE(root=os.getcwd()+"/data/cv-corpus-7.0-singleword/fr", tsv="train.tsv")

In [136]:
import librosa

def extract_features(audio_path):
    audio, sample_rate = librosa.load(audio_path, sr=None)
    mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=13)
    return mfccs

In [137]:
print(extract_features("./data/cv-corpus-7.0-singleword/fr/clips/common_voice_fr_21894151.mp3"))

[[-6.8731464e+02 -6.8731464e+02 -6.8731464e+02 ... -5.5644818e+02
  -5.9824695e+02 -6.8721765e+02]
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00 ...  1.1173247e+02
   8.6702301e+01  1.3684773e-01]
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00 ...  3.7776123e+01
   3.3123108e+01  1.3596201e-01]
 ...
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00 ...  1.8497684e+01
   1.8942513e+01  1.0864532e-01]
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00 ...  9.8468342e+00
   1.4657414e+01  1.0294381e-01]
 [ 0.0000000e+00  0.0000000e+00  0.0000000e+00 ...  6.0690675e+00
   8.5757990e+00  9.6813396e-02]]


In [138]:
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import csv
import os

class CommonVoiceDataset(Dataset):
    def __init__(self, tsv_file, clips_folder, transform=None):
        self.clips_folder = clips_folder
        self.transform = transform
        self.data = []
        with open(tsv_file, newline='', encoding='utf-8') as file:
            reader = csv.DictReader(file, delimiter='\t')
            for row in reader:
                self.data.append(row)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data[idx]
        audio_path = os.path.join(self.clips_folder, row['path'])
        waveform, sample_rate = torchaudio.load(audio_path)
        
        # Apply transformations if any
        if self.transform:
            waveform = self.transform(waveform)
        
        # Here, you could also include feature extraction steps if necessary
        label = row['sentence']  # Or however you plan to use the label

        return waveform, sample_rate, label


# Example usage
tsv_file_train = f"{os.getcwd()}/data/cv-corpus-7.0-singleword/fr/train.tsv"
tsv_file_test = f"{os.getcwd()}/data/cv-corpus-7.0-singleword/fr/test.tsv"

clips_folder = f"{os.getcwd()}/data/cv-corpus-7.0-singleword/fr/clips/"

dataset = CommonVoiceDataset(tsv_file=tsv_file_train, clips_folder=clips_folder)

data_loader = DataLoader(dataset, batch_size=4, shuffle=True)

for waveform, sample_rate, label in data_loader:
    print(waveform.shape, sample_rate, label)


RuntimeError: Couldn't find appropriate backend to handle uri /Users/robinbochu/Documents/School/TelecomSaintEtienne/TSE3/Apprentissage automatique/projet_big_data/data/cv-corpus-7.0-singleword/fr/clips/common_voice_fr_22018669.mp3 and format None.

### Mettre nos dataset dans des dataloaders

### Entrainement

### Test

In [None]:
import os
import torch
from torch.utils.data import Dataset
import librosa

class CommonVoiceDataset(Dataset):
    def __init__(self, tsv_path, audio_dir):
        self.audio_dir = audio_dir
        
        # Read the TSV file and store the data
        self.data = []
        with open(tsv_path, 'r', encoding='utf-8') as f:
            next(f)  # Skip the header
            for line in f:
                line_content = line.strip().split('\t')
                self.data.append({
                    "client_id": line_content[0],
                    "path": line_content[1],
                    "sentence": line_content[2],
                })

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        audio_path = os.path.join(self.audio_dir, item['path'])
        waveform, sample_rate = librosa.load(audio_path, sr=None)
        mfcc = librosa.feature.mfcc(y=waveform, sr=sample_rate, n_mfcc=40)
        mfcc_tensor = torch.from_numpy(mfcc).float()
        # Ensure 3D tensor for CNN (channel, MFCC features, time)
        mfcc_tensor = mfcc_tensor.unsqueeze(0)
        
        label = item['sentence']  # Here, convert to appropriate label format as needed
        
        return mfcc_tensor, label

# Specify the TSV file path and audio directory path
train_tsv_file_path = './data/cv-corpus-7.0-singleword/fr/train.tsv'
test_tsv_file_path = './data/cv-corpus-7.0-singleword/fr/test.tsv'

audio_dir_path = './data/cv-corpus-7.0-singleword/fr/clips'

# Create the dataset
train_dataset = CommonVoiceDataset(train_tsv_file_path, audio_dir_path)
test_dataset = CommonVoiceDataset(test_tsv_file_path, audio_dir_path)


# Example: Access the first item in the dataset
waveform_tensor, sample_rate, label = train_dataset[0]
print(waveform_tensor.shape, sample_rate, label)

print(train_dataset[0][0])

torch.Size([145152]) 48000 Firefox
tensor([ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ..., -5.0598e-06,
        -1.0700e-05, -2.6028e-05])


In [None]:
batch_size = 32
shuffle = True

train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle)
test_data_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=shuffle)


In [139]:
from torch.nn.utils.rnn import pad_sequence


class AudioCNN(nn.Module):
    def __init__(self, num_classes):
        super(AudioCNN, self).__init__()
        # Define your CNN architecture here
        # Example:
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(3, 3), padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(3, 3), padding=1)
        self.fc1 = nn.Linear(32 * (number_of_mfcc_features // 4) * (time_steps // 4), 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)  # Flatten the tensor for the fully connected layer
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [171]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import librosa
from torch.nn.utils.rnn import pad_sequence
import numpy as np

class CommonVoiceDataset(Dataset):
    def __init__(self, tsv_path, audio_dir, label_to_index):
        self.audio_dir = audio_dir
        self.label_to_index = label_to_index  # Add this line
        self.data = []
        with open(tsv_path, 'r', encoding='utf-8') as f:
            next(f)  # Skip header
            for line in f:
                line_content = line.strip().split('\t')
                self.data.append((line_content[1], self.label_to_index[line_content[2]]))  # Convert label to index here

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        audio_path, label = self.data[idx]
        audio_path_full = os.path.join(self.audio_dir, audio_path)
        waveform, sample_rate = librosa.load(audio_path_full, sr=None)
        mfcc = librosa.feature.mfcc(y=waveform, sr=sample_rate, n_mfcc=40)
        mfcc_tensor = torch.from_numpy(mfcc).float()
        mfcc_tensor = mfcc_tensor.unsqueeze(0)  # Add channel dimension
        return mfcc_tensor, label

def collate_fn(batch):
    # Unpack the batch
    mfccs, labels = zip(*batch)
    
    # Label encoding: Convert string labels to integers
    # For demonstration, let's assume your labels are already integer-encoded
    # If you have string labels, you should convert them to integers using a mapping dictionary
    # label_to_index = {'label1': 0, 'label2': 1, ...}
    # labels = [label_to_index[label] for label in labels]
    
    # Find the longest sequence (time dimension)
    max_length = max(mfcc.size(2) for mfcc in mfccs)
    # Find the max number of features (MFCC dimension)
    max_features = max(mfcc.size(1) for mfcc in mfccs)
    
    # Pad each MFCC tensor to the max length and features
    mfccs_padded = torch.zeros((len(mfccs), 1, max_features, max_length))
    for i, mfcc in enumerate(mfccs):
        # Determine padding sizes
        pad_feature_dim = max_features - mfcc.size(1)
        pad_time_dim = max_length - mfcc.size(2)
        # Pad the MFCC tensor and assign to the padded batch
        mfccs_padded[i] = nn.functional.pad(mfcc, (0, pad_time_dim, 0, pad_feature_dim))
    
    # Convert labels to tensor
    labels = torch.tensor(labels, dtype=torch.long)
    
    return mfccs_padded, labels

class AudioCNN(nn.Module):
    def __init__(self, num_classes=14):
        super(AudioCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(3, 3), padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(3, 3), padding=1)
        
        # Dynamically determine the flattened size after conv/pool layers
        self._to_linear = None
        self.conv_output_size((1, 40, 173))  # Adjust the shape based on your input data
        
        self.fc1 = nn.Linear(self._to_linear, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def conv_output_size(self, shape):
        with torch.no_grad():
            # Initialize a mock input based on the expected input shape
            mock_input = torch.rand(1, *shape)  # Shape is (channels, MFCC features, time)
            output = self.conv1(mock_input)
            output = self.pool(output)
            output = self.conv2(output)
            output = self.pool(output)
            # Compute the total number of features for the first linear layer
            self._to_linear = int(np.prod(output.size()[1:]))
    
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, self._to_linear)  # Flatten for FC
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x



# Assuming you have the following paths set up
tsv_path = './data/cv-corpus-7.0-singleword/fr/train.tsv'
audio_dir = './data/cv-corpus-7.0-singleword/fr/clips'

train_tsv_file_path = './data/cv-corpus-7.0-singleword/fr/train.tsv'
test_tsv_file_path = './data/cv-corpus-7.0-singleword/fr/test.tsv'

audio_dir_path = './data/cv-corpus-7.0-singleword/fr/clips'

# Example label to index mapping
label_to_index = {'Firefox': 0, 'oui': 1, 'non': 2, 'un': 3, 'deux': 4, 'trois': 5,'quatre': 6,'cinq': 7,'six': 8,'sept': 9,'huit': 10,'neuf':11,'Hey':12,'zéro':13}

# Update dataset initialization
dataset = CommonVoiceDataset(tsv_path, audio_dir, label_to_index)

# Instantiate dataset and data loader
data_loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)

# Define the model, loss function, and optimizer
num_classes = 14 # Replace with your actual number of classes
model = AudioCNN(num_classes)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(1):  # Change this to the actual number of epochs
    for mfccs, labels in data_loader:
        optimizer.zero_grad()   # Zero the gradients
        outputs = model(mfccs)  # Forward pass
        loss = criterion(outputs, labels)  # Compute the loss
        loss.backward()  # Backpropagation
        optimizer.step()  # Update weights

        # Print loss
        print(f'Epoch {epoch}, Loss: {loss.item()}')

ValueError: Expected input batch_size (8) to match target batch_size (4).