In [1]:
!pip install kaggle==1.5.12



In [2]:
# Copy the Kaggle API token from the working directory into ~/.kaggle and
# restrict it to owner read/write.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json  # Set permissions

In [3]:
!mkdir datasets

mkdir: cannot create directory ‘datasets’: File exists


In [None]:
!kaggle datasets download -d mozillaorg/common-voice -p /content/datasets --force

Downloading common-voice.zip to /content/datasets
 31% 3.71G/12.0G [03:00<06:15, 23.9MB/s]

In [None]:
# Change the working directory; the archive, CSV and audio paths used in the
# following cells are all relative to it.
%cd datasets

In [None]:
!unzip common-voice.zip

In [None]:
import pandas as pd

In [None]:
# Load the metadata table for the dev split and preview its first rows.
dev_df = pd.read_csv('cv-valid-dev.csv')
dev_df.head()

In [None]:
dev_df.info()

In [None]:
!pip install pydub

In [None]:
from IPython.display import Audio, display
import os

def display_audio(audio_path):
  """Render an inline audio player for the file at ``audio_path``."""
  display(Audio(audio_path))

audio_directory = 'cv-valid-dev/cv-valid-dev'
# os.listdir returns entries in arbitrary order; sorting makes the preview show
# the same clips on every run.
audio_files = sorted(os.listdir(audio_directory))

# Preview the first five clips.
for audio_file in audio_files[:5]:
  audio_path = os.path.join(audio_directory, audio_file)
  display_audio(audio_path)


In [None]:
dev_df['down_votes'].value_counts()

In [None]:
# First ten rows that received no down-votes.
dev_df.loc[dev_df['down_votes'].eq(0)].head(10)

In [None]:
audio_path = os.path.join(audio_directory, "sample-000000.mp3")
display_audio(audio_path)

 => The difference between up-votes and down-votes does not seem to be related to the audio quality.

**Preprocessing steps**



In [None]:
# Drop the contiguous run of columns from 'up_votes' through 'duration'
# (label slices in .loc are inclusive on both ends).
# Guarded so that re-running the cell after the columns are gone is a no-op
# instead of a KeyError.
if 'up_votes' in dev_df.columns:
    dev_df = dev_df.drop(columns=dev_df.loc[:, 'up_votes':'duration'].columns)

In [None]:
dev_df.head()

In [None]:
import torch
import torchaudio
import warnings
import matplotlib.pyplot as plt
warnings.filterwarnings('ignore')

In [None]:
# Prefer the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# 80/20 random split of the rows. A seeded generator makes the split identical
# on every run. Note that random_split returns torch.utils.data.Subset objects,
# not DataFrames: rows are reached through `.dataset` and `.indices`.
split_generator = torch.Generator().manual_seed(42)
train_df, test_df = torch.utils.data.random_split(
    dev_df, [0.8, 0.2], generator=split_generator
)

In [None]:
def preprocess_for_rnnt_torchaudio(file_path, target_sr=16000, max_duration=5):
    """Turn one audio file into fixed-length log-mel features for RNN-T.

    Pipeline: load -> move to ``device`` -> downmix to mono -> resample ->
    VAD trimming -> peak normalization -> pad/crop to ``max_duration``
    seconds -> 80-bin log-mel spectrogram.

    Args:
        file_path: Path of the audio file, read with ``torchaudio.load``.
        target_sr: Sample rate in Hz that the audio is resampled to.
        max_duration: Clip length in seconds. Longer audio is cropped,
            shorter audio is zero-padded on the right.

    Returns:
        Tensor of shape ``(T, 80)`` on ``device`` with the log-mel features,
        where ``T`` is the number of spectrogram frames.
    """

    # 1. Decode the file, then move the samples to the module-level `device`.
    waveform, sr = torchaudio.load(file_path)  # (channels, samples)
    waveform = waveform.to(device)

    # 2. Downmix to mono so the result is always (T, 80), also for
    #    multi-channel files.
    if waveform.size(0) > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # 3. Resample if needed
    if sr != target_sr:
        waveform = torchaudio.functional.resample(waveform, sr, target_sr)

    # 4. Trim silence (VAD)
    waveform = torchaudio.functional.vad(waveform, sample_rate=target_sr, trigger_level=20)

    # 5. Peak normalization: scale so the largest absolute sample is 1.
    #    (torch.nn.functional.normalize would divide by the L2 norm instead,
    #    making the level depend on the clip length.) The clamp keeps an
    #    all-zero clip at zero; the numel check covers an empty VAD result.
    if waveform.numel() > 0:
        waveform = waveform / waveform.abs().max().clamp_min(1e-8)

    # 6. Fixed-length padding/cropping
    max_samples = target_sr * max_duration
    if waveform.size(-1) > max_samples:
        waveform = waveform[..., :max_samples]
    else:
        pad_amount = max_samples - waveform.size(-1)
        waveform = torch.nn.functional.pad(waveform, (0, pad_amount))

    # 7. Extract log-mel features on `device`
    mel_transform = torchaudio.transforms.MelSpectrogram(
        sample_rate=target_sr,
        n_fft=400,
        hop_length=160,
        n_mels=80,
    ).to(device)

    mel_spec = mel_transform(waveform)
    log_mel = torch.log(mel_spec + 1e-6)  # (1, 80, T); epsilon avoids log(0)

    return log_mel.squeeze(0).transpose(0, 1)  # (T, 80)

In [None]:
# Base directory that the 'filename' values are joined onto when loading audio.
audio_directory = 'cv-valid-dev'
# 'filename' of every row in the training subset, in subset order.
audio_files = train_df.dataset.loc[train_df.indices, 'filename'].tolist()

In [None]:
# One log-mel feature tensor per training audio file, in the order of audio_files.
features = [
    preprocess_for_rnnt_torchaudio(os.path.join(audio_directory, relative_path))
    for relative_path in audio_files
]

In [None]:
# Transcripts as a plain Python list.
# NOTE(review): this takes every row of dev_df, whereas audio_files/features are
# built from the train_df subset only — confirm the two are aligned before pairing them.
texts = dev_df["text"].tolist()

In [None]:
texts[:5]

In [None]:
import re

In [None]:
def clean_text(text):
    """Normalize a transcript to space-separated ASCII letters and digits.

    Case is preserved. Every run of characters outside ``A-Z``, ``a-z`` and
    ``0-9`` (punctuation, apostrophes, whitespace, non-ASCII characters) is
    replaced by a single space, and leading/trailing spaces are removed.

    Args:
        text: Raw transcript string.

    Returns:
        The cleaned string; empty when ``text`` has no ASCII alphanumerics.
    """
    # Collapsing each non-alphanumeric run to one space already does what the
    # separate "space out punctuation" and "squeeze spaces" passes achieved.
    text = re.sub("[^A-Za-z0-9]+", " ", text)

    # Strip last: leading/trailing punctuation has just been turned into a
    # space, so stripping before the substitution would leave it behind.
    return text.strip()

In [None]:
# Clean every transcript that is a string; entries of any other type are skipped.
cleaned_texts = [clean_text(raw_text) for raw_text in texts if isinstance(raw_text, str)]

In [None]:
cleaned_texts[:5]

In [None]:
from tokenizers import Tokenizer, models, pre_tokenizers, trainers

# BPE model; pieces that cannot be encoded map to "<unk>".
tokenizer = Tokenizer(models.BPE(unk_token="<unk>"))

# Split with the Whitespace pre-tokenizer first, then isolate apostrophes as
# their own pieces.
tokenizer.pre_tokenizer = pre_tokenizers.Sequence([
    pre_tokenizers.Whitespace(),
    pre_tokenizers.Split("'", behavior="isolated")  # Special handling for apostrophes
])

# Trainer configuration
trainer = trainers.BpeTrainer(
    special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"],
    vocab_size=3900,
    min_frequency=3,  # pairs seen fewer than 3 times are not merged
    initial_alphabet=pre_tokenizers.ByteLevel.alphabet()  # seed the vocabulary with the byte-level alphabet
)

# Train on cleaned text. Non-string rows are skipped, because clean_text
# calls str methods on its argument and would fail on them.
tokenizer.train_from_iterator(
    [clean_text(t) for t in texts if isinstance(t, str)],
    trainer,
)

# Persist the trained tokenizer so that Tokenizer.from_file("cv_tokenizer.json")
# works on a fresh kernel.
tokenizer.save("cv_tokenizer.json")

In [None]:
# Load tokenizer
tokenizer = Tokenizer.from_file("cv_tokenizer.json")

# Test encoding. Apply the same preprocessing as the training text: clean_text
# keeps the original case, so the sample is not lower-cased here.
sample_text = "Dr O'Neill 's patient said We 'll check again tomorrow"
encoding = tokenizer.encode(clean_text(sample_text))
print("Tokens:", encoding.tokens)
print("IDs:", encoding.ids)

In [None]:
class Encoder(torch.nn.Module):
    """Stacked bidirectional LSTM followed by a linear projection.

    Maps inputs of shape (B, T, input_dim) to outputs of shape
    (B, T, hidden_dim).
    """

    def __init__(self, input_dim=80, hidden_dim=256, num_layers=3):
        super().__init__()
        lstm_config = dict(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            bidirectional=True,
            batch_first=True,
        )
        self.lstm = torch.nn.LSTM(**lstm_config)
        # Forward and backward states are concatenated, hence 2 * hidden_dim inputs.
        self.linear = torch.nn.Linear(2 * hidden_dim, hidden_dim)

    def forward(self, x):
        """Encode ``x`` (B, T, input_dim) into (B, T, hidden_dim)."""
        lstm_out, _state = self.lstm(x)  # (B, T, 2*H)
        return self.linear(lstm_out)     # (B, T, H)

In [None]:
class Decoder(torch.nn.Module):
    """Token embedding followed by a single-layer unidirectional LSTM.

    Maps token ids of shape (B, U) to hidden states of shape (B, U, hidden_dim).
    """

    def __init__(self, vocab_size, embed_dim=128, hidden_dim=256):
        super().__init__()
        self.embed = torch.nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim)
        self.lstm = torch.nn.LSTM(input_size=embed_dim, hidden_size=hidden_dim, batch_first=True)

    def forward(self, y):
        """Return the LSTM outputs (B, U, H) for the token ids ``y`` (B, U)."""
        embedded = self.embed(y)          # (B, U, E)
        hidden, _state = self.lstm(embedded)
        return hidden                     # (B, U, H)

In [None]:
class JointNetwork(torch.nn.Module):
    """Combine encoder and decoder states into per-(t, u) vocabulary logits."""

    def __init__(self, hidden_dim, vocab_size):
        super().__init__()
        self.linear = torch.nn.Linear(hidden_dim, vocab_size)

    def forward(self, h_enc, h_dec):
        """Return logits (B, T, U, V) from ``h_enc`` (B, T, H) and ``h_dec`` (B, U, H)."""
        # Insert singleton axes so the sum broadcasts over every (t, u) pair.
        enc_grid = h_enc[:, :, None, :]   # (B, T, 1, H)
        dec_grid = h_dec[:, None, :, :]   # (B, 1, U, H)
        fused = (enc_grid + dec_grid).tanh()  # (B, T, U, H)
        return self.linear(fused)         # (B, T, U, V)

In [None]:
class RNNTransducer(torch.nn.Module):
    """RNN-Transducer built from an encoder, a decoder and a joint network.

    Args:
        vocab_size: Number of output tokens (size of the last logits axis).
        encoder_dim: Hidden size of the encoder output.
        decoder_dim: Hidden size of the decoder output. It must equal
            ``encoder_dim`` because the joint network adds the two states
            element-wise before projecting from ``encoder_dim``.

    Raises:
        ValueError: If ``encoder_dim`` and ``decoder_dim`` differ.
    """

    def __init__(self, vocab_size, encoder_dim=256, decoder_dim=256):
        super().__init__()
        # Fail at construction with a clear message instead of a shape error
        # deep inside the joint network's forward pass.
        if encoder_dim != decoder_dim:
            raise ValueError(
                f"encoder_dim ({encoder_dim}) and decoder_dim ({decoder_dim}) "
                "must match: the joint network sums encoder and decoder states"
            )
        self.encoder = Encoder(hidden_dim=encoder_dim)
        self.decoder = Decoder(vocab_size, hidden_dim=decoder_dim)
        self.joint = JointNetwork(encoder_dim, vocab_size)

    def forward(self, x, y):
        """Return logits (B, T, U, V) for features ``x`` and token ids ``y``."""
        h_enc = self.encoder(x)  # (B, T, H)
        h_dec = self.decoder(y)  # (B, U, H)
        logits = self.joint(h_enc, h_dec)  # (B, T, U, V)
        return logits

In [None]:
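# NOTE: disabled draft. `extract_features`, `text_to_tokens`, `char_to_idx` and
# `data` are not defined in the cells shown above, and a custom collate function
# is still needed before this can be enabled.
# from torch.utils.data import Dataset, DataLoader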

# class AudioDataset(Dataset):
#     def __init__(self, data, char_to_idx):
#         self.data = data
#         self.char_to_idx = char_to_idx

#     def __len__(self):
#         return len(self.data)

#     def __getitem__(self, idx):
#         wav_path, text = self.data[idx]
#         features = extract_features(wav_path)
#         tokens = text_to_tokens(text, self.char_to_idx)
#         return features, torch.LongTensor(tokens)

# # Create DataLoader
# dataset = AudioDataset(data, char_to_idx)
# dataloader = DataLoader(dataset, batch_size=2, collate_fn=lambda x: x)  # Custom collate needed