In [None]:
# Colab-only helper: mount the user's Google Drive under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install jiwer torch torchaudio torchvision pydub torch-audiomentations==0.11.0 noisereduce
!pip install -U pip setuptools wheel
!pip install -U spacy
!python -m spacy download en_core_web_sm
!pip install pydub
!pip install datasets transformers
!pip install --upgrade accelerate
!pip install pyspellchecker

In [None]:
!gdown --folder https://drive.google.com/drive/folders/1-1VwT5HCp_oYiM4fJc3dCo-xT1uUe6TP?usp=sharing

In [None]:
!mkdir "/content/Datasets"

!unzip "/content/Advanced/ASR/Train.zip" -d "/content/Datasets"
!unzip "/content/Advanced/ASR/Test_Advanced.zip" -d "/content/Datasets"

In [None]:
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
import torch
from torch.utils.data import Dataset, DataLoader
import torchaudio
import pandas as pd
from jiwer import wer
import numpy as np
from tqdm import trange, tqdm
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
import random
from IPython.display import Audio
from torch_audiomentations import Compose, Gain, PitchShift, AddColoredNoise, PolarityInversion, PeakNormalization, HighPassFilter, LowPassFilter, Shift
import noisereduce as nr
import os 
from spellchecker import SpellChecker

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

ROOT = "/content/Datasets" # change this if needed - tmp location

In [None]:
# Probability with which each individual augmentation in the chain is applied.
probability = 0.5

# Random augmentation chain for 16 kHz audio.
# `output_type="tensor"` is stated on every transform and on the Compose itself, so the
# chain's return type is explicit; audio_augmentation() calls `.reshape` directly on the
# result and therefore needs a plain tensor.
transform = Compose(
    transforms=[
        HighPassFilter(
            min_cutoff_freq=100,
            max_cutoff_freq=500,
            p=probability,
            sample_rate=16000,
            output_type="tensor",
        ),
        LowPassFilter(
            min_cutoff_freq=2000,
            max_cutoff_freq=3000,
            p=probability,
            sample_rate=16000,
            output_type="tensor",
        ),
        Gain(
            min_gain_in_db=-15.0,
            max_gain_in_db=+10.0,
            p=probability,
            output_type="tensor",
        ),
        Shift(
            min_shift=-0.1,
            max_shift=0.1,
            p=probability,
            rollover=False,
            output_type="tensor",
        ),
        PitchShift(
            min_transpose_semitones=-4.0,
            max_transpose_semitones=4.0,
            p=probability,
            sample_rate=16000,
            output_type="tensor",
        ),
        AddColoredNoise(
            min_snr_in_db=16,
            max_snr_in_db=30,
            p=probability,
            output_type="tensor",
        ),
    ],
    output_type="tensor",
)
def audio_augmentation(audio):
    """Run one waveform through the random `transform` chain and re-normalise it.

    Args:
        audio: waveform tensor. Anything with fewer than three dimensions is
            viewed as (1, 1, samples) before it is handed to `transform`.

    Returns:
        Tensor of shape (1, samples): the augmented audio with its mean removed
        and divided by its standard deviation. A constant (e.g. silent) clip
        comes back as zeros instead of NaN.
    """
    if audio.dim() < 3:
        audio = audio.reshape(1, 1, -1)

    audio = transform(audio, sample_rate = 16000).reshape(1, -1)

    # A constant signal has std == 0; clamping keeps the division finite and
    # leaves every non-degenerate clip unchanged.
    std = audio.std().clamp_min(1e-7)
    return (audio - audio.mean()) / std
# Collate function for DataLoader
def collate_fn_test(batch):
    """Pad the `input_values` of a list of test items into one batch.

    Only the `input_values` entry of each item is forwarded to `processor.pad`;
    the length of the longest item is passed along as `max_length`.
    """
    waveforms = [item["input_values"] for item in batch]
    longest = max([wave.shape[0] for wave in waveforms])

    return processor.pad(
        [{"input_values": wave} for wave in waveforms],
        padding=True,
        max_length=longest,
        return_tensors="pt",
    )

class TILDataset(Dataset):
    """Dataset over a pickled DataFrame of pre-processed audio.

    Args:
        path: pickle file containing a DataFrame with an ``input_values`` column
            and, for the ``"train"`` / ``"val"`` subsets, a ``labels`` column.
        subset: one of ``"train"``, ``"val"`` or ``"test"``.
        augmentation: when True, ``"train"`` items are passed through
            ``audio_augmentation`` before being returned. It has no effect on
            the other subsets.

    Items are dicts with a flattened ``input_values`` tensor and, except for
    ``"test"``, a ``labels`` tensor; both are moved to the global ``device``.
    """

    def __init__(self, path, subset, augmentation = False):
        assert subset in ["train", "val", "test"]

        # NOTE(review): unpickling can execute arbitrary code - only load
        # pickle files that were produced by this notebook.
        frame = pd.read_pickle(path).reset_index(drop=True)
        if subset != "test":
            # These columns are not needed for training; tolerate pickles
            # that were saved without them.
            frame = frame.drop(columns = ['path', 'annotation'], errors = "ignore")

        self.dataset = frame
        self.subset = subset
        self.augmentation = augmentation

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        # Positional access: raises IndexError past the end, as the sequence
        # protocol expects.
        audio = self.dataset['input_values'].iloc[idx].to(device)

        if self.subset == "test":
            return {"input_values": audio.reshape(-1)}

        # Augment only training items, and only when it was requested.
        if self.subset == "train" and self.augmentation:
            audio = audio_augmentation(audio)

        # as_tensor: stored labels that are already tensors are used as-is.
        label = torch.as_tensor(self.dataset['labels'].iloc[idx]).to(device)

        return {"input_values": audio.reshape(-1),
                "labels": label}

# **Saving Train and Validation Audio Tensors into a Pickle File**
The processed audio is cached so that it can be loaded quickly during fine-tuning.

In [None]:
# Checkpoint identifier; the processor is loaded from the same checkpoint and is
# used below to tokenise transcripts and to turn waveforms into `input_values`.
model_name = "facebook/wav2vec2-large-960h-lv60-self" 
processor = Wav2Vec2Processor.from_pretrained(model_name)

In [None]:
# Training manifest; later cells read its `path` and `annotation` columns.
train_csv = pd.read_csv("/content/Advanced/ASR/Train.csv")
# Display the first rows as a quick sanity check.
train_csv.head()

In [None]:
# Tokenise every transcript into label ids for CTC training.
# The tensors are collected in an object array and assigned as a whole column:
# chained assignment (`df[col][i] = ...`) writes through a temporary Series and is
# not guaranteed to reach the DataFrame. The tokenizer is called directly, which is
# what `processor(...)` dispatched to inside the deprecated `as_target_processor()`.
label_column = np.empty(len(train_csv), dtype=object)
for i, annotation in enumerate(tqdm(train_csv['annotation'])):
    label_column[i] = torch.tensor(processor.tokenizer(annotation).input_ids)
train_csv['labels'] = label_column

In [None]:
TRAIN_FOLDER = "/content/Datasets/Train"
SAMPLE_RATE = 16000  # rate passed to `transform` and to the processor
N_TRAIN = 3000       # rows kept for training; the remainder becomes the validation set
SPLIT_SEED = 42      # fixed seed so the train/validation split is reproducible

# NOTE(review): `transform` is applied here, before the split, so the rows that end up
# in the validation set are randomly augmented as well - confirm this is intended.
input_column = np.empty(len(train_csv), dtype=object)
for i, rel_path in enumerate(tqdm(train_csv['path'])):
    waveform, file_rate = torchaudio.load(TRAIN_FOLDER + f"/{rel_path}")
    if file_rate != SAMPLE_RATE:
        # Everything downstream assumes SAMPLE_RATE; resample instead of silently
        # treating other rates as if they were already correct.
        waveform = torchaudio.functional.resample(waveform, file_rate, SAMPLE_RATE)
    waveform = waveform.reshape(1, 1, -1)
    # Pass the sample rate explicitly, as audio_augmentation() does.
    augmented = transform(waveform, sample_rate=SAMPLE_RATE)
    input_column[i] = processor(
        augmented,
        return_tensors="pt",
        padding='do_not_pad',
        sampling_rate=SAMPLE_RATE,
    ).input_values.reshape(-1)

# Assign the whole column at once; chained assignment (`df[col][i] = ...`) is not
# guaranteed to write through to the DataFrame.
train_csv['input_values'] = input_column

train_csv = train_csv.sample(frac=1, random_state=SPLIT_SEED).reset_index(drop=True)

train_csv.iloc[:N_TRAIN].reset_index(drop=True).to_pickle(ROOT + "/Train_Set.pkl")
train_csv.iloc[N_TRAIN:].reset_index(drop=True).to_pickle(ROOT + "/Val_Set.pkl")

# **Fine-tuning a Pretrained ASR Encoder Model from Facebook**

In [None]:
# Entry point of the fine-tuning part: (re)define the checkpoint name and processor so
# this section does not depend on the variables of the pickle-building cells above.
model_name = "facebook/wav2vec2-large-960h-lv60-self"
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC

processor = Wav2Vec2Processor.from_pretrained(model_name)

# The model is not loaded here: the training cell further down builds it with
# Wav2Vec2ForCTC.from_pretrained(...) right before creating the Trainer, so a copy
# loaded in this cell would be replaced without ever being used.

In [None]:
### Train/Validation Audio processed by Wav2Vec2Processor was stored in a single pickle file so that it is easier to load.
# Training items are augmented on the fly, so the flag is set accordingly;
# validation items are returned exactly as stored.
train_ds = TILDataset(ROOT + "/Train_Set.pkl", subset = "train", augmentation = True)
val_ds = TILDataset(ROOT + "/Val_Set.pkl", subset = "val", augmentation = False)

print(len(train_ds), len(val_ds))

In [None]:
from transformers import Wav2Vec2ForCTC, Trainer, TrainingArguments, EarlyStoppingCallback, TrainerCallback

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for processing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence is provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'`: No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the ``input_values`` to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
        pad_to_multiple_of_labels (:obj:`int`, `optional`):
            Same as :obj:`pad_to_multiple_of`, applied to the ``labels``.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad one batch and return it with a ``labels`` entry in which padding is -100."""
        # split inputs and labels since they have to be of different lengths and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        # Pad the labels with the tokenizer directly. This is the call the deprecated
        # `as_target_processor()` context manager redirected `processor.pad` to, without
        # entering that context manager for every batch.
        labels_batch = self.processor.tokenizer.pad(
            label_features,
            padding=self.padding,
            max_length=self.max_length_labels,
            pad_to_multiple_of=self.pad_to_multiple_of_labels,
            return_tensors="pt",
        )

        # replace padding with -100 to ignore loss correctly
        batch["labels"] = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        return batch

class MyCallback(TrainerCallback):
    "A callback that empties the CUDA memory cache at the end of every epoch"

    def on_epoch_end(self, args, state, control, **kwargs):
        # Release the cached GPU memory blocks PyTorch is holding but not using.
        torch.cuda.empty_cache()

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

# Recent `datasets` releases no longer ship `load_metric`, and the install cell does
# not pin a version. Use it when it is available; otherwise fall back to jiwer's `wer`
# (already imported at the top of the notebook) behind the same `.compute` interface.
try:
    from datasets import load_metric

    wer_metric = load_metric("wer")
except ImportError:

    class _JiwerWerMetric:
        """Minimal stand-in exposing the `.compute(predictions=..., references=...)` call used below."""

        def compute(self, predictions, references):
            """Return the word error rate of `predictions` measured against `references`."""
            # jiwer expects the references first, then the hypotheses.
            return wer(references, predictions)

    wer_metric = _JiwerWerMetric()

def compute_metrics(pred):
    """Compute the word error rate for one evaluation pass.

    Args:
        pred: object with `predictions` (logits; the arg-max over the last axis
            gives the predicted token ids) and `label_ids` (reference token ids
            in which -100 marks padded positions).

    Returns:
        ``{"wer": value}`` where the value comes from `wer_metric.compute`.
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)

    # Swap the -100 placeholders for the pad token on a copy, so the caller's
    # `label_ids` array is left untouched.
    label_ids = np.where(
        pred.label_ids == -100,
        processor.tokenizer.pad_token_id,
        pred.label_ids,
    )

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    # Named `wer_score` so it does not shadow the `wer` function imported from jiwer.
    wer_score = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer_score}

import inspect  # stdlib; used only in this cell to pick keyword names by installed version

# Build a fresh model every time this cell runs, so a re-run never continues from
# weights that an earlier training attempt already modified.
model = Wav2Vec2ForCTC.from_pretrained(
    model_name,
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
).to(device)

# Exclude the feature encoder from training.
model.freeze_feature_encoder()

# transformers renamed `evaluation_strategy` to `eval_strategy` and the Trainer's
# `tokenizer` argument to `processing_class`. The install cell does not pin a version,
# so use whichever keyword the installed release accepts.
_eval_strategy_kw = (
    "eval_strategy"
    if "eval_strategy" in inspect.signature(TrainingArguments.__init__).parameters
    else "evaluation_strategy"
)
_processor_kw = (
    "processing_class"
    if "processing_class" in inspect.signature(Trainer.__init__).parameters
    else "tokenizer"
)

training_args = TrainingArguments(
  output_dir=ROOT + f"/{model_name}/Checkpoint",
  group_by_length=False,
  per_device_train_batch_size=16,
  save_strategy="steps",
  num_train_epochs=30,
  fp16=torch.cuda.is_available(),  # mixed precision only when a CUDA device is present
  gradient_checkpointing=True,
  save_steps=1000,  # a multiple of eval_steps, as needed for load_best_model_at_end
  eval_steps=500,
  logging_steps=500,
  learning_rate=1e-4,
  weight_decay=0.005,
  warmup_steps=100,
  save_total_limit=1,
  load_best_model_at_end = True,
  optim = "adamw_torch",
  dataloader_pin_memory = False,  # the datasets already return tensors on `device`
  **{_eval_strategy_kw: "steps"},
)

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    callbacks = [MyCallback],
    **{_processor_kw: processor.feature_extractor},
)

In [None]:
# Run fine-tuning with the Trainer configured in the previous cell.
trainer.train()