In [None]:
!pip install librosa matplotlib mutagen pandas torch

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from sklearn.model_selection import train_test_split

# EDA

In [7]:
import os
import librosa

# Verify that every training clip uses the expected sample rate.
# librosa.get_samplerate reads the rate from the file header, so no audio
# has to be decoded just to perform this check.
train_folder = 'train'
expected_sr = 16000
for f in os.listdir(train_folder):
    if f.endswith('.wav'):
        path = os.path.join(train_folder, f)
        sr = librosa.get_samplerate(path)
        if sr != expected_sr:
            print(f"{f} has sample rate {sr}")

# Nothing gets printed, so all files are 16000 Hz

In [None]:
# Read one example recording at its native sample rate
audio_path = 'Test set/3599.wav'
y, sr = librosa.load(audio_path, sr=None)

# Draw the waveform on an explicitly created Axes
fig, ax = plt.subplots(figsize=(10, 4))
librosa.display.waveshow(y, sr=sr, ax=ax)
ax.set_title('Waveform of 3599.wav')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Amplitude')
plt.show()

In [None]:
import os
import librosa

# Summarise the training folder: clip count, mean duration and filename-prefix groups.
train_folder = 'train'
train_wav_files = [f for f in os.listdir(train_folder) if f.endswith('.wav')]
print(f"Number of .wav files in train folder: {len(train_wav_files)}")

total_duration = 0
for f in train_wav_files:
    y, sr = librosa.load(os.path.join(train_folder, f), sr=None)
    duration = librosa.get_duration(y=y, sr=sr)
    total_duration += duration

# Guard the mean: an empty (or mistyped) folder would otherwise raise ZeroDivisionError.
if train_wav_files:
    avg_duration = total_duration / len(train_wav_files)
    print(f"Average duration: {avg_duration:.2f} seconds")
else:
    avg_duration = float('nan')
    print("Average duration: n/a (no .wav files found)")

# NOTE: the text before '_' is the accent digit plus the gender letter
# (e.g. '2m_9039.wav' -> '2m'), i.e. an accent/gender group code and not a
# speaker identity. The variable names are kept for compatibility with other
# cells, but the printed labels now describe what is actually counted.
speaker_ids = {filename.split('_')[0] for filename in train_wav_files}
male_speakers = [s for s in speaker_ids if s.endswith('m')]
female_speakers = [s for s in speaker_ids if s.endswith('f')]

print(f"Total unique accent/gender groups: {len(speaker_ids)}")
print(f"Male groups: {len(male_speakers)}")
print(f"Female groups: {len(female_speakers)}")
print("\nMale group codes:", sorted(male_speakers))
print("Female group codes:", sorted(female_speakers))


In [None]:
# Duration (in seconds) of every training clip, in the order of train_wav_files
durations = [
    librosa.get_duration(y=samples, sr=rate)
    for samples, rate in (
        librosa.load(os.path.join(train_folder, wav_name), sr=None)
        for wav_name in train_wav_files
    )
]

# Spread of the durations
min_duration = min(durations)
max_duration = max(durations)
std_duration = np.std(durations)

print(f"Minimum duration: {min_duration:.2f} seconds")
print(f"Maximum duration: {max_duration:.2f} seconds")
print(f"Standard deviation: {std_duration:.2f} seconds")

# Histogram of clip lengths on an explicit Axes
fig, ax = plt.subplots(figsize=(10, 4))
ax.hist(durations, bins=30)
ax.set_title('Distribution of Audio Durations')
ax.set_xlabel('Duration (seconds)')
ax.set_ylabel('Count')
plt.show()


### Interpreting
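The distribution is skewed to the right (the longer tail is on that side). Most audio files are between 3 and 7 seconds long, and the most common durations are around 4.5–6 seconds. A few outliers are longer than 10 seconds, but they are rare. Open question: what should be done with these outliers?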

In [None]:
import wave  # standard-library reader for WAV containers

# Print the header fields of one test recording, one per line
with wave.open('Test set/3599.wav', 'rb') as wav:  # read-binary mode
    header_fields = (
        ("Channels", wav.getnchannels()),       # number of audio channels
        ("Sample width", wav.getsampwidth()),   # bytes per sample (2 bytes = 16-bit audio)
        ("Frame rate", wav.getframerate()),     # sampling frequency
        ("Frames", wav.getnframes()),           # total number of frames
        ("Parameters", wav.getparams()),        # all header parameters together
    )
    for label, value in header_fields:
        print(f"{label}: {value}")



# The accent is encoded in the first character of the file name with a single number from 1 to 5. The gender is encoded as a single letter ('m' or 'f') corresponding to the second character of the file name.

In [None]:
from mutagen.wave import WAVE

# Open the example recording with mutagen and show its embedded metadata tags
audio = WAVE('Test set/3599.wav')
embedded_tags = audio.tags
print(embedded_tags)  # shows None when the file carries no tags

In [None]:
# One manifest record per training file; labels are parsed from the file name:
# first character = accent (1-5, stored zero-based as 0-4), second = gender.
dataset_manifest = [
    {
        'file_path': wav_name,
        'accent': int(stem[0]) - 1,
        'gender': stem[1],
    }
    for wav_name in train_wav_files
    for stem in [os.path.basename(wav_name).partition('.')[0]]
]

# Tabular view of the manifest
df_manifest = pd.DataFrame(dataset_manifest)

# Preview followed by summary statistics
accent_counts = df_manifest['accent'].value_counts().sort_index() if False else None
print("Dataset Manifest Preview:")
print(df_manifest.head())
print("\nDataset Statistics:")
print(f"Total files: {len(df_manifest)}")
print("\nAccent distribution:")
accent_counts = df_manifest['accent'].value_counts().sort_index()
print(accent_counts)
print("\nGender distribution:")
gender_counts = df_manifest['gender'].value_counts()
print(gender_counts)


In [None]:
# NOTE(review): %who lists the names currently defined in the kernel namespace;
# this looks like a leftover debugging aid — confirm whether it should stay.
%who

# Preprocessing

## Approach A

In [1]:
import os
import torch
import torchaudio
import pandas as pd

def load_and_preprocess_audios_from_folder(folder_path, target_sr=16000):
    """
    Load, resample and peak-normalize every .wav file in a folder.

    The accent and gender labels are parsed from the file name: the first
    character is the accent digit and the second character is the gender letter.

    Args:
        folder_path (str): Path to the folder containing .wav files.
        target_sr (int | None): Sampling rate every waveform is brought to.
            Files whose native rate differs are resampled. Pass None to keep
            each file's native rate.

    Returns:
        pd.DataFrame: One row per .wav file with columns
        ['file_path', 'waveform', 'accent', 'gender']. 'waveform' holds the
        tensor returned by torchaudio.load after resampling/normalization,
        'accent' is the integer from the file name (not shifted), and
        'gender' is the single letter from the file name. The frame is empty
        when the folder contains no .wav files.

    Raises:
        ValueError: If a .wav file name does not start with a digit.
    """
    data = []
    for fname in os.listdir(folder_path):
        if not fname.endswith('.wav'):
            continue
        file_path = os.path.join(folder_path, fname)
        # Load audio
        waveform, sr = torchaudio.load(file_path)
        # Honour target_sr: previously the argument was accepted but never applied.
        if target_sr is not None and sr != target_sr:
            waveform = torchaudio.functional.resample(waveform, sr, target_sr)
        # Peak-normalize amplitude. Skip empty or all-zero clips, where the
        # division would raise or fill the waveform with NaN.
        if waveform.numel() > 0:
            peak = waveform.abs().max()
            if peak > 0:
                waveform = waveform / peak
        # Extract accent and gender
        accent = int(fname[0])  # 1-5
        gender = fname[1]       # 'm' or 'f'
        data.append({
            'file_path': file_path,
            'waveform': waveform,
            'accent': accent,
            'gender': gender
        })
    return pd.DataFrame(data)

# Load the whole training folder into a DataFrame and preview it.
# NOTE(review): this is an absolute, machine-specific path — consider a configurable data directory.
df = load_and_preprocess_audios_from_folder("/Users/larsheijnen/DL/Train")
print(df.head())

# Shape of the first waveform tensor
first_row = df.iloc[0]
print(first_row['waveform'].shape)


                                 file_path  \
0  /Users/larsheijnen/DL/Train/2m_9039.wav   
1  /Users/larsheijnen/DL/Train/4f_1887.wav   
2  /Users/larsheijnen/DL/Train/4f_9571.wav   
3  /Users/larsheijnen/DL/Train/1m_3736.wav   
4  /Users/larsheijnen/DL/Train/1m_3078.wav   

                                            waveform  accent gender  
0  [[tensor(-0.0001), tensor(-0.0001), tensor(-5....       2      m  
1  [[tensor(0.), tensor(4.4749e-05), tensor(0.), ...       4      f  
2  [[tensor(-0.0001), tensor(-0.0002), tensor(-0....       4      f  
3  [[tensor(-0.0003), tensor(-0.0003), tensor(-0....       1      m  
4  [[tensor(-0.0008), tensor(-0.0009), tensor(-0....       1      m  
torch.Size([1, 41400])


In [2]:
import os
import torch
import torchaudio
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

# Derive zero-based class labels: accent 1-5 becomes accent_label 0-4
if 'accent' not in df.columns:
    print("Warning: 'accent' column not found in DataFrame. Accent prediction will not work.")
    # Placeholder label so the model structure can still be exercised
    df['accent_label'] = 0
else:
    df['accent_label'] = df['accent'].apply(lambda accent_code: int(accent_code) - 1)

In [3]:
# --- 1. Device Configuration ---
# Pick the fastest available backend: CUDA (NVIDIA GPU), then MPS, then CPU.
# Previously CUDA was never considered, so NVIDIA machines fell back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# The MPS backend is for macOS machines with M-series chips

Using device: mps


In [4]:
# --- 2. PyTorch Dataset ---
class AccentDatasetRNN(Dataset):
    """Dataset yielding (waveform, label) pairs for accent classification.

    Args:
        dataframe (pd.DataFrame): Must provide a 'waveform' column holding
            tensors and an 'accent' column holding the accent number 1-5.

    Each item is a tuple ``(waveform, label)`` where ``waveform`` is a 1-D
    tensor of audio samples and ``label`` is a 0-dim ``torch.long`` tensor
    holding the accent shifted to 0-4, as CrossEntropyLoss expects.
    """

    def __init__(self, dataframe):
        self.df = dataframe

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # Fetch the row once instead of building it separately for each column.
        row = self.df.iloc[idx]
        waveform = row['waveform']
        # Reduce [channels, length] to [length]. Mono clips are squeezed exactly
        # as before; multi-channel clips are averaged, because squeeze(0) would
        # leave them 2-D and the collate function would then treat the channel
        # axis as the time axis.
        if waveform.dim() > 1:
            if waveform.size(0) == 1:
                waveform = waveform.squeeze(0)
            else:
                waveform = waveform.mean(dim=0)
        # Convert accent 1-5 to 0-4 for CrossEntropyLoss
        label = torch.tensor(int(row['accent']) - 1, dtype=torch.long)
        return waveform, label

In [5]:
# --- 3. Custom Collate Function ---
def collate_fn_rnn(batch):
    """Merge (waveform, label) samples into one zero-padded, time-major batch.

    Args:
        batch: list of tuples (waveform_tensor, label_tensor).

    Returns:
        tuple: (padded_waveforms, lengths, labels) where padded_waveforms has
        shape (max_seq_len_in_batch, batch_size, 1), lengths holds each
        sequence's original length, and labels is the stacked label tensor.
    """
    sequences = []
    targets = []
    for sample in batch:
        sequences.append(sample[0])
        targets.append(sample[1])
    labels = torch.stack(targets)

    # Original (unpadded) length of every sequence, needed later for packing
    lengths = torch.tensor(list(map(len, sequences)))

    # Time-major zero padding: result is (max_seq_len_in_batch, batch_size)
    padded = pad_sequence(sequences, batch_first=False, padding_value=0.0)
    # Append a trailing feature axis of size 1 for the RNN input
    padded_waveforms = padded[..., None]

    return padded_waveforms, lengths, labels

In [6]:
# --- 4. RNN Model Definition ---
class AudioRNN(nn.Module):
    """Recurrent classifier over padded, time-major audio sequences.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
        num_layers: Number of stacked recurrent layers.
        num_classes: Number of output logits.
        rnn_type: 'LSTM' or 'GRU' (case-insensitive).
        dropout: Dropout probability, used between recurrent layers (only
            when num_layers > 1) and before the final linear layer.

    Raises:
        ValueError: If rnn_type is neither 'LSTM' nor 'GRU'.
    """

    def __init__(self, input_size=1, hidden_size=128, num_layers=2, num_classes=5, rnn_type='LSTM', dropout=0.3):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn_type = rnn_type.upper()

        recurrent_classes = {'LSTM': nn.LSTM, 'GRU': nn.GRU}
        if self.rnn_type not in recurrent_classes:
            raise ValueError("Unsupported RNN type. Choose 'LSTM' or 'GRU'.")

        # Inter-layer dropout is only passed on when the RNN is stacked
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.rnn = recurrent_classes[self.rnn_type](
            input_size,
            hidden_size,
            num_layers,
            batch_first=False,
            dropout=inter_layer_dropout,
        )

        self.fc = nn.Linear(hidden_size, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, lengths):
        """Return class logits of shape (batch_size, num_classes).

        Args:
            x: Padded input of shape (max_seq_len, batch_size, input_size).
            lengths: Tensor of shape (batch_size) with the true sequence lengths.
        """
        # Pack the batch; enforce_sorted=False lets the sequences arrive in any length order.
        packed_input = pack_padded_sequence(
            x, lengths.cpu(), batch_first=False, enforce_sorted=False
        )

        # The second return value is (hn, cn) for an LSTM and hn alone for a GRU
        _, final_state = self.rnn(packed_input)
        hn = final_state[0] if self.rnn_type == 'LSTM' else final_state

        # hn is (num_layers, batch_size, hidden_size); keep the last layer only
        last_hidden = hn[-1]
        return self.fc(self.dropout(last_hidden))

In [None]:
# --- 5. Training Setup ---
# This cell defines the hyperparameters, builds the dataset/loader, creates the
# model, loss and optimizer, and then runs the full training loop.
# Hyperparameters
input_size = 1  # Mono audio
hidden_size = 128 # Can be tuned
num_rnn_layers = 2 # Can be tuned
num_classes = 5 # 5 accent classes
learning_rate = 0.001
batch_size = 16 # Adjust based on your 8GB RAM. Start small.
num_epochs = 10 # Start with a few epochs

# Instantiate dataset and dataloader
# Ensure 'df' is your DataFrame with 'waveform' and 'accent' columns
# At the top level of a cell, locals() is the notebook namespace, so this
# checks whether an earlier cell has defined `df`.
if 'df' not in locals():
    print("DataFrame 'df' not found. Please load your data.")
    # df = load_and_preprocess_audios_from_folder("/Users/larsheijnen/DL/Train") # Placeholder
else:
    # The whole DataFrame is used for training; no validation split is made in this cell.
    train_dataset = AccentDatasetRNN(df)
    train_loader = DataLoader(dataset=train_dataset, 
                              batch_size=batch_size, 
                              shuffle=True, 
                              collate_fn=collate_fn_rnn,
                              pin_memory=False, # pin_memory is more relevant for CUDA
                              num_workers=0 # 0 = batches are loaded in the main process
                             )

    # Instantiate model, loss, and optimizer
    model = AudioRNN(input_size=input_size, 
                     hidden_size=hidden_size, 
                     num_layers=num_rnn_layers, 
                     num_classes=num_classes,
                     rnn_type='GRU' # GRU is often a bit faster and lighter than LSTM
                    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    print(f"Starting training on {device} with batch size {batch_size}...")

    # --- Training Loop ---
    # NOTE(review): with input_size = 1 every RNN time step is a single raw
    # audio sample, so each sequence is as long as its waveform — presumably
    # slow to train; confirm the runtime is acceptable.
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for i, (waveforms, lengths, labels) in enumerate(train_loader):
            waveforms = waveforms.to(device) 
            # lengths are already tensors, should be on CPU for pack_padded_sequence
            labels = labels.to(device)

            # Forward pass
            outputs = model(waveforms, lengths)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Accumulate the scalar batch loss for the epoch average below
            total_loss += loss.item()

            if (i + 1) % 10 == 0: # Print every 10 batches
                print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')
        
        # Mean of the per-batch losses over this epoch
        avg_loss = total_loss / len(train_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}] completed. Average Training Loss: {avg_loss:.4f}')

    print('Finished Training')

Starting training on mps with batch size 16...
