# Import Libraries

In [None]:
import os
import torch
import json

import numpy as np
import pandas as pd
import torchaudio

from tqdm import tqdm
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC, Trainer, TrainingArguments, AutoProcessor, Wav2Vec2Tokenizer, Wav2Vec2CTCTokenizer
from transformers import Wav2Vec2FeatureExtractor
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from datasets import load_dataset, Dataset, DatasetDict, Audio
from jiwer import wer

In [None]:
# Check if CUDA (GPU support) is available
print("CUDA Available:", torch.cuda.is_available())

# Print the name of the GPU
if torch.cuda.is_available():
    print("Device Name:", torch.cuda.get_device_name(0))
    print("Current Device:", torch.cuda.current_device())
    print("Total GPUs:", torch.cuda.device_count())
else:
    print("Running on CPU")
    
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Data Preparation

In [None]:
# Load CSV
df = pd.read_csv(r'C:\Users\Featherine\Downloads\HTX xData Assignment\common_voice\cv-valid-train.csv')

# Take first n row (testing)
df = df.iloc[:1000]

AUDIO_FOLDER = r'C:\Users\Featherine\Downloads\HTX xData Assignment\common_voice\cv-valid-train'

df['file'] = df['filename']
df['audio'] = df['file'].apply(lambda x: os.path.join(AUDIO_FOLDER, x))

# Create Hugging Face Dataset
dataset = Dataset.from_pandas(df)

dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))

dataset = dataset.remove_columns([
    "filename", "accent", "age", "down_votes", "gender", "up_votes", "duration"])

dataset = dataset.train_test_split(test_size=0.3)
dataset

In [None]:
dataset['train'][0]

In [None]:
def extract_characters(batch):
  texts = " ".join(batch["text"])
  vocab = list(set(texts))
  return {"vocab": [vocab], "texts": [texts]}

vocabs = dataset.map(extract_characters, batched=True, batch_size=-1, 
                   keep_in_memory=True, remove_columns= dataset.column_names["train"])

vocab_list = list(set(vocabs["train"]["vocab"][0]) | set(vocabs["test"]["vocab"][0]))
vocab_dict = {v: k for k, v in enumerate(vocab_list)}
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]

vocab_dict["[UNK]"] = len(vocab_dict) # add "unknown" token 
vocab_dict["[PAD]"] = len(vocab_dict) # add a padding token that corresponds to CTC's "blank token"

with open('vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)

# Preprocessing

In [None]:
model_name = "facebook/wav2vec2-base-960h"

# create Wav2Vec2 tokenizer
tokenizer = Wav2Vec2CTCTokenizer("vocab.json", unk_token="[UNK]",
                                  pad_token="[PAD]", word_delimiter_token="|")

# create Wav2Vec2 feature extractor
feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, 
                                             padding_value=0.0, do_normalize=True, return_attention_mask=False)
# create a processor pipeline 
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)
# processor = Wav2Vec2Processor.from_pretrained(model_name)

# extract the numerical representation from the dataset
def extract_array_samplingrate(batch):
    batch["speech"] = batch['audio']['array'].tolist()
    batch["sampling_rate"] = batch['audio']['sampling_rate']
    batch["target_text"] = batch["text"]
    return batch

dataset = dataset.map(extract_array_samplingrate, 
                      remove_columns=dataset.column_names["train"])

# process the dataset with processor pipeline that created above
def process_dataset(batch):  
    batch["input_values"] = processor(batch["speech"], 
                            sampling_rate=batch["sampling_rate"][0]).input_values

    with processor.as_target_processor():
        batch["labels"] = processor(batch["target_text"]).input_ids
    return batch

data_processed = dataset.map(process_dataset, 
                    remove_columns=dataset.column_names["train"], batch_size=8, 
                    batched=True)

In [None]:
train_dataset = data_processed['train']
test_dataset = data_processed['test']

In [None]:
train_dataset[110]

In [None]:
text = processor.tokenizer.decode(train_dataset['labels'][4], skip_special_tokens=True)
print(text)

# Training

In [None]:
model = Wav2Vec2ForCTC.from_pretrained(model_name)
# Update model configuration for new vocabulary size
model.config.pad_token_id = tokenizer.pad_token_id
model.config.vocab_size = len(tokenizer)

In [None]:
training_args = TrainingArguments(
  output_dir="./wav2vec2-base-960h-cv",
  group_by_length=True,
  # per_device_train_batch_size=16,
  per_device_train_batch_size=1,
  evaluation_strategy="steps",
  num_train_epochs=3,
  fp16=True,
  save_steps=500,
  eval_steps=500,
  logging_steps=500,
  learning_rate=1e-4,
  warmup_steps=500,
  save_total_limit=2,
)

In [None]:
def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)

    pred_str = processor.batch_decode(pred_ids)
    label_ids = pred.label_ids
    # Replace -100 in labels as the pad token id to decode properly
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    wer_score = wer(label_str, pred_str)
    return {"wer": wer_score}

In [None]:
@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

trainer = Trainer(
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=processor.feature_extractor,
    data_collator=data_collator,
)

In [None]:
trainer.train()
trainer.save_model("./wav2vec2-base-960h-cv")

In [None]:
metrics = trainer.evaluate()
print(f"WER: {metrics['eval_wer']}")

# Evaluate on Test Set

In [None]:
SAVE_PATH = r'C:\Users\Featherine\Downloads\HTX xData Assignment\asr-train\cv-valid-dev-saved.csv'
AUDIO_FOLDER = r'C:\Users\Featherine\Downloads\HTX xData Assignment\common_voice\cv-valid-dev'

# Load the processor
# feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=False)
# tokenizer = Wav2Vec2CTCTokenizer("./vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")
# processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)


model_name = "./wav2vec2-base-960h-cv"
tokenizer = Wav2Vec2Tokenizer.from_pretrained(model_name)
model = Wav2Vec2ForCTC.from_pretrained(model_name)
model.eval()

# Load test CSV
test_df = pd.read_csv(r'C:\Users\Featherine\Downloads\HTX xData Assignment\common_voice\cv-valid-dev.csv')

# Transcribe each audio file
transcriptions = []
for path in tqdm(test_df['filename']):
    audio_path = os.path.join(AUDIO_FOLDER, path)
    
    waveform, sample_rate = torchaudio.load(audio_path)
    # Resample if necessary
    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)
    input_values = tokenizer(waveform.squeeze().numpy(), return_tensors="pt").input_values

    # Perform inference
    logits = model(input_values).logits
    predicted_ids = logits.argmax(dim=-1)
    transcription = tokenizer.decode(predicted_ids[0])
    # print(transcription)
    transcriptions.append(transcription)

# Add transcriptions to DataFrame
test_df['generated_text'] = transcriptions

test_df.to_csv(SAVE_PATH, index=False)
print(f"Transcription complete. Updated CSV saved to {SAVE_PATH}.")

In [None]:
input_values

# Get results

In [22]:
import pandas as pd
from jiwer import wer

SAVE_PATH = r'C:\Users\Featherine\Downloads\HTX xData Assignment\asr-train\cv-valid-dev-saved.csv'
test_df = pd.read_csv(SAVE_PATH)

# Calculate WER
text = [str(x) for x in test_df['text'].tolist()]
text_gen = [str(x).lower() for x in test_df['generated_text'].tolist()]
wer_score = wer(text, text_gen)
print(f"Test WER: {wer_score}")

Test WER: 0.1081200813050503
