<a href="https://colab.research.google.com/github/Aneeshshastri/SN_BOSE_MILAN_ASR/blob/main/Train_Quartznet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:

# Install the third-party packages this notebook imports; only librosa is version-pinned here.
!pip install transformers datasets accelerate audiomentations librosa==0.10.1


Collecting audiomentations
  Downloading audiomentations-0.43.1-py3-none-any.whl.metadata (11 kB)
Collecting librosa==0.10.1
  Downloading librosa-0.10.1-py3-none-any.whl.metadata (8.3 kB)
Collecting numpy-minmax<1,>=0.3.0 (from audiomentations)
  Downloading numpy_minmax-0.5.0-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.0 kB)
Collecting numpy-rms<1,>=0.4.2 (from audiomentations)
  Downloading numpy_rms-0.6.0-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.5 kB)
Collecting python-stretch<1,>=0.3.1 (from audiomentations)
  Downloading python_stretch-0.3.1-cp312-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.7 kB)
Collecting soxr>=0.3.2 (from librosa==0.10.1)
  Downloading soxr-0.5.0.post1-cp312-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.6 kB)
Downloading librosa-0.10.1-py3-none-any.whl (253 kB)
[2K   [90m━━━━━━━━━━━━━━━━━

In [2]:
from google.colab import files

print("Please upload the kaggle.json file you downloaded from your Kaggle account:")
# Bind the result instead of leaving `files.upload()` as the cell's last
# expression: the returned dict maps file names to the raw file bytes, and
# echoing it would write the API key into the saved notebook output.
uploaded_files = files.upload()
print("Uploaded:", ", ".join(uploaded_files))

Please upload the kaggle.json file you downloaded from your Kaggle account:


Saving kaggle.json to kaggle.json


{'kaggle.json': b'<REDACTED: Kaggle API credentials removed from the saved output - revoke and regenerate this token>'}

In [3]:
# Create the ~/.kaggle directory and copy the uploaded kaggle.json into it
# (assumes kaggle.json is in the current working directory after the upload cell).
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

# Restrict the token file to owner read/write only (mode 600)
!chmod 600 ~/.kaggle/kaggle.json

In [4]:
# Install the Kaggle command-line client used by the dataset download cell.
!pip install kaggle



In [5]:
# Create a directory for the dataset
!mkdir -p /content/dataset

# Download the dataset archive into the created directory
!kaggle datasets download -d aneeshshastri/cleaned-audio-milan -p /content/dataset/

# Extract the archive in place. This step fails with "cannot find or open" when
# the download above did not finish (e.g. it was interrupted), so re-run the
# download first in that case.
# NOTE(review): the loader later reads /content/dataset/LibriSpeech/train-clean-100/;
# confirm the archive actually unpacks to that layout.
!unzip /content/dataset/cleaned-audio-milan.zip -d /content/dataset/

Dataset URL: https://www.kaggle.com/datasets/aneeshshastri/cleaned-audio-milan
License(s): unknown
^C
unzip:  cannot find or open /content/dataset/cleaned-audio-milan.zip, /content/dataset/cleaned-audio-milan.zip.zip or /content/dataset/cleaned-audio-milan.zip.ZIP.


In [None]:
# Step 1: Install necessary libraries
# Step 2: Import all required modules
import os
import random
import librosa
import numpy as np
import tensorflow as tf
from datasets import load_dataset, Audio, Dataset
from transformers import AutoProcessor, AutoModelForCTC, TrainingArguments, Trainer
from dataclasses import dataclass
from typing import Dict, List, Union
from scipy.signal import butter, lfilter
from audiomentations import AddGaussianNoise

# Step 3: Include your full Augmenter class
class Augmenter:
    """Randomly applies waveform-level distortions to a 1-D audio signal.

    Each distortion has its own independent probability. On every call to
    `augment`, one random draw per distortion decides whether it is applied;
    the selected distortions are then applied in a fixed order: noise, reverb,
    segment shuffle, time stretch, missing gaps, frequency masking.

    Args:
        sr: Sample rate of the audio in Hz.
        noise_prob: Probability of adding Gaussian noise.
        noise_max_amp: Upper bound of the Gaussian noise amplitude.
        reverb_prob: Probability of adding a single delayed echo.
        reverb_delay: Echo delay in seconds.
        reverb_decay: Gain applied to the echo.
        shuffle_prob: Probability of shuffling the signal in three segments.
        time_stretch_prob: Probability of time-stretching the signal.
        time_stretch_range: (low, high) bounds of the random stretch rate.
        gaps_prob: Probability of zeroing out short regions.
        gaps_n: Number of regions zeroed out when gaps are applied.
        gaps_max_duration: Maximum length of one zeroed region, in seconds.
        freq_mask_prob: Probability of band-stop filtering the signal.
        freq_mask_n: Number of band-stop filters applied when masking.
    """

    def __init__(self, sr=16000,
                 noise_prob=0.3, noise_max_amp=0.01,
                 reverb_prob=0.3, reverb_delay=0.025, reverb_decay=0.2,
                 shuffle_prob=0.05, time_stretch_prob=0.3, time_stretch_range=(0.9, 1.1),
                 gaps_prob=0.1, gaps_n=4, gaps_max_duration=0.1,
                 freq_mask_prob=0.3, freq_mask_n=1):

        self.sr = sr
        # AddGaussianNoise takes no sample rate at construction time (the rate
        # is passed per call in `_add_noise`), so passing `sample_rate=` here
        # raised a TypeError. The lower amplitude bound is kept at the
        # library default of 0.001 unless the requested maximum is smaller.
        self.noise_aug = AddGaussianNoise(
            min_amplitude=min(0.001, noise_max_amp),
            max_amplitude=noise_max_amp,
            p=1.0,
        )
        self.noise_prob = noise_prob
        self.reverb_prob = reverb_prob
        self.reverb_delay = reverb_delay
        self.reverb_decay = reverb_decay
        self.shuffle_prob = shuffle_prob
        self.time_stretch_prob = time_stretch_prob
        self.time_stretch_range = time_stretch_range
        self.gaps_prob = gaps_prob
        self.gaps_n = gaps_n
        self.gaps_max_duration = gaps_max_duration
        self.freq_mask_prob = freq_mask_prob
        self.freq_mask_n = freq_mask_n

    def augment(self, audio):
        """Return a float32 copy of `audio` with a random subset of distortions applied.

        Args:
            audio: 1-D sequence or array of samples.

        Returns:
            A numpy array holding the distorted signal. Its length can differ
            from the input when time stretching is selected.
        """
        candidates = (
            (self.noise_prob, self._add_noise),
            (self.reverb_prob, self._add_reverb),
            (self.shuffle_prob, self._segment_shuffle),
            (self.time_stretch_prob, self._time_stretch),
            (self.gaps_prob, self._add_missing_gaps),
            (self.freq_mask_prob, self._add_frequency_mask),
        )
        # One random draw per distortion, all made before any processing starts.
        selected = [transform for prob, transform in candidates if random.random() < prob]

        # Make sure audio is a numpy array for processing
        audio = np.array(audio, dtype=np.float32)

        for transform in selected:
            audio = transform(audio)
        return audio

    def _add_noise(self, audio):
        """Add Gaussian noise using the audiomentations transform built in `__init__`."""
        return self.noise_aug(samples=audio, sample_rate=self.sr)

    def _add_reverb(self, audio):
        """Mix in one delayed, attenuated copy of the signal; the length is unchanged."""
        delay = int(self.reverb_delay * self.sr)
        # Left-pad the attenuated copy by `delay` samples, then trim the tail
        # so it lines up sample-for-sample with the original.
        reverb = np.pad(audio * self.reverb_decay, (delay, 0), 'constant')
        if len(reverb) > len(audio):
            reverb = reverb[:len(audio)]
        return audio + reverb

    def _segment_shuffle(self, audio):
        """Split the signal into three near-equal segments and reorder them randomly."""
        segments = np.array_split(audio, 3)
        random.shuffle(segments)
        return np.concatenate(segments)

    def _time_stretch(self, audio):
        """Time-stretch the signal by a rate drawn uniformly from `time_stretch_range`."""
        return librosa.effects.time_stretch(y=audio, rate=random.uniform(*self.time_stretch_range))

    def _add_missing_gaps(self, audio):
        """Zero out `gaps_n` randomly placed regions of at most `gaps_max_duration` seconds."""
        gap_audio = np.copy(audio)
        # The original lower bound of 0.1 s is kept, but clamped so that a
        # smaller `gaps_max_duration` is honoured instead of being exceeded.
        min_gap = min(0.1, self.gaps_max_duration)
        for _ in range(self.gaps_n):
            gap_duration = random.uniform(min_gap, self.gaps_max_duration)
            gap_samples = int(gap_duration * self.sr)
            # Skip gaps that would not fit inside the signal.
            if len(gap_audio) > gap_samples:
                start = random.randint(0, len(gap_audio) - gap_samples)
                gap_audio[start:start + gap_samples] = 0
        return gap_audio

    def _add_frequency_mask(self, audio):
        """Attenuate `freq_mask_n` random frequency bands with 4th-order band-stop filters.

        Each band starts between 500 and 5000 Hz and is 500 to 2000 Hz wide;
        bands that would reach the Nyquist frequency are skipped.
        """
        masked_audio = np.copy(audio)
        original_dtype = masked_audio.dtype
        nyquist = self.sr / 2
        for _ in range(self.freq_mask_n):
            l_freq = random.uniform(500, 5000)
            h_freq = l_freq + random.uniform(500, 2000)
            if h_freq >= nyquist: continue
            b, a = butter(N=4, Wn=[l_freq, h_freq], btype="bandstop", fs=self.sr)
            masked_audio = lfilter(b, a, masked_audio)
        # lfilter computes in float64; restore the caller's floating dtype so
        # this step does not silently change the sample type.
        if np.issubdtype(original_dtype, np.floating):
            masked_audio = masked_audio.astype(original_dtype, copy=False)
        return masked_audio

# Step 4: Your custom load_data function
Training_dirs = "/content/dataset/LibriSpeech/train-clean-100/"


def load_data(root_dir=None):
    """Collect audio file paths and their transcriptions from a speaker/chapter tree.

    The expected layout is `<root>/<speaker_id>/<chapter_id>/`, where each
    chapter directory holds a `<speaker_id>-<chapter_id>.trans.txt` file whose
    lines are `<file_id> <text>`, next to the matching `<file_id>.flac` files.

    Args:
        root_dir: Directory to scan. Defaults to the module-level
            `Training_dirs` when omitted.

    Returns:
        A `(file_paths, transcriptions)` tuple of two equally long lists.
        Entries are ordered by speaker, then chapter, then transcript line.
        Transcript lines that are blank, have no text, or have no matching
        `.flac` file are skipped.

    Raises:
        FileNotFoundError: If `root_dir` does not exist.
    """
    if root_dir is None:
        root_dir = Training_dirs

    file_paths = []
    transcriptions = []
    # os.listdir order is arbitrary; sorting makes the sample order reproducible.
    for speaker_id in sorted(os.listdir(root_dir)):
        speaker_path = os.path.join(root_dir, speaker_id)
        if not os.path.isdir(speaker_path):
            continue
        for chapter_id in sorted(os.listdir(speaker_path)):
            chapter_path = os.path.join(speaker_path, chapter_id)
            if not os.path.isdir(chapter_path):
                continue

            trans_path = os.path.join(chapter_path, f"{speaker_id}-{chapter_id}.trans.txt")
            if not os.path.exists(trans_path):
                continue

            with open(trans_path, 'r', encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(' ', 1)
                    # Blank lines and ids without text would otherwise raise
                    # IndexError on parts[1].
                    if len(parts) != 2 or not parts[1].strip():
                        continue
                    file_id, text = parts

                    audio_path = os.path.join(chapter_path, f"{file_id}.flac")
                    if os.path.exists(audio_path):
                        file_paths.append(audio_path)
                        transcriptions.append(text)
    return file_paths, transcriptions

# Step 5: Execute the pipeline
print("Loading data using custom function...")
file_paths, transcriptions = load_data()

# Column-oriented mapping: one list per dataset column
data_dict = dict(file_path=file_paths, transcription=transcriptions)

# Wrap the plain Python lists in a Hugging Face Dataset so `.map` can be used on them
hf_dataset = Dataset.from_dict(data_dict)

print(f"\nCreated a dataset with {len(hf_dataset)} samples.")

# Step 6: Instantiate Augmenter, Model, and Processor
# Per-distortion probabilities used for this training run
augmenter = Augmenter(
    noise_prob=0.2,
    reverb_prob=0.1,
    shuffle_prob=0.1,
    time_stretch_prob=0.2,
    gaps_prob=0.1,
    freq_mask_prob=0.2,
)

# NOTE(review): confirm this checkpoint id can be loaded through
# AutoProcessor / AutoModelForCTC before relying on it.
model_id = "nvidia/stt_en_quartznet_15x5_ctc"
processor = AutoProcessor.from_pretrained(model_id)
model = AutoModelForCTC.from_pretrained(model_id)

# Step 7: Create the Preprocessing Function with Augmentation
def prepare_dataset(batch):
    """Turn one dataset row into model inputs and label ids.

    Reads the audio at `batch["file_path"]` resampled to 16 kHz, runs it
    through the module-level `augmenter`, and stores the processor's features
    under `"input_values"` and the encoded transcription under `"labels"`.

    Args:
        batch: A single row with `"file_path"` and `"transcription"` keys.

    Returns:
        The same row object with `"input_values"` and `"labels"` added.
    """
    target_sr = 16000

    # Decode and resample the waveform; the returned rate is not needed.
    waveform, _ = librosa.load(batch["file_path"], sr=target_sr)

    # Randomly distort the waveform
    distorted = augmenter.augment(waveform)

    # Audio features and text ids both come from the same HF processor
    audio_features = processor(distorted, sampling_rate=target_sr)
    batch["input_values"] = audio_features.input_values[0]

    text_encoding = processor(text=batch["transcription"])
    batch["labels"] = text_encoding.input_ids
    return batch

# Apply prepare_dataset to every row and drop the original columns, leaving only
# the fields prepare_dataset adds.
# NOTE(review): augmentation runs inside this one-off map call, so each sample is
# presumably distorted once rather than re-drawn every epoch - confirm this is intended.
processed_ds = hf_dataset.map(prepare_dataset, remove_columns=hf_dataset.column_names)

# Step 8: Define Data Collator and Trainer
@dataclass
class DataCollatorCTCWithPadding:
    """Pads a list of CTC training examples into one PyTorch batch.

    Audio inputs and label ids have different lengths, so they are padded
    separately with the processor's `pad` method. Padded label positions are
    replaced by -100, the value the Hugging Face CTC examples use to mark
    label positions that should not contribute to the loss.

    Attributes are documented inline below.
    """

    # Processor whose `pad` method pads both the audio inputs and the labels.
    processor: "AutoProcessor"
    # Padding strategy forwarded unchanged to `processor.pad`.
    padding: Union[bool, str] = True

    def __call__(self, features: List[Dict[str, Union[List[int], np.ndarray]]]) -> Dict[str, "torch.Tensor"]:
        """Collate `features` into a padded batch.

        Args:
            features: Examples, each holding `"input_values"` and `"labels"`.

        Returns:
            The padded input batch returned by the processor, with a
            `"labels"` entry added.
        """
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        # Use the collator's own processor (not a module-level global) and ask
        # for PyTorch tensors: `masked_fill` / `ne` below are torch.Tensor
        # methods, and the Trainer this collator feeds is PyTorch-based.
        batch = self.processor.pad(input_features, padding=self.padding, return_tensors="pt")
        labels_batch = self.processor.pad(labels=label_features, padding=self.padding, return_tensors="pt")

        # Replace padded label positions (attention_mask != 1) with -100.
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)
        batch["labels"] = labels
        return batch

data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

# Training hyper-parameters.
# NOTE(review): output_dir is under /kaggle/working while the dataset cells
# use /content - confirm which environment this notebook targets.
training_args = TrainingArguments(
    output_dir="/kaggle/working/quartznet-finetuned-custom",
    num_train_epochs=15,
    per_device_train_batch_size=16,
    learning_rate=1e-4,
    fp16=True,
    save_total_limit=2,
    # You can add evaluation and logging steps here
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=processed_ds,
    data_collator=data_collator,
    tokenizer=processor.feature_extractor,
)

# Step 9: Start Fine-Tuning
print("\n--- Starting Model Fine-Tuning ---")
trainer.train()