# Data Preparation

In [1]:
# Set Seeds for Reproducibility
import random
import numpy as np
import torch
import os

seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

In [2]:
pip install datasets



In [3]:
# import libraries
from datasets import load_dataset, Dataset
import datasets
import json
import re

In [4]:
# Load the Dataset and Save to Disk
train_streaming_ds = load_dataset(
    "mozilla-foundation/common_voice_11_0", "en",
    split="train",
    streaming=True,
    trust_remote_code=True
)
train_streaming_ds = train_streaming_ds.cast_column("audio", datasets.Audio(sampling_rate=16_000))

test_streaming_ds = load_dataset(
    "mozilla-foundation/common_voice_11_0", "en",
    split="test",
    streaming=True,
    trust_remote_code=True
)
test_streaming_ds = test_streaming_ds.cast_column("audio", datasets.Audio(sampling_rate=16_000))

def take_n_samples(streaming_dataset, n):
    """Extract n samples from a streaming dataset."""
    samples = []
    for i, sample in enumerate(streaming_dataset):
        samples.append(sample)
        if i >= n - 1:
            break
    return samples

train_samples = take_n_samples(train_streaming_ds, 1500)
test_samples = take_n_samples(test_streaming_ds, 300)

# Convert streaming samples to regular Datasets for further processing
train_ds = Dataset.from_list(train_samples)
test_ds   = Dataset.from_list(test_samples)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Reading metadata...: 948736it [00:26, 35279.78it/s]
Reading metadata...: 16354it [00:00, 37927.62it/s]


In [5]:
# Save datasets to disk for persistence
os.makedirs("data", exist_ok=True)
train_ds.save_to_disk("/content/drive/MyDrive/data/train_ds")
test_ds.save_to_disk("/content/drive/MyDrive/data/test_ds")
print("Datasets saved to disk in the 'data' directory.")

Saving the dataset (0/3 shards):   0%|          | 0/1500 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/300 [00:00<?, ? examples/s]

Datasets saved to disk in the 'data' directory.


In [6]:
# Load datasets back from disk (simulate working with imported data)
train_ds = datasets.load_from_disk("/content/drive/MyDrive/data/train_ds")
test_ds   = datasets.load_from_disk("/content/drive/MyDrive/data/test_ds")
print("Datasets re-loaded from disk.")

Datasets re-loaded from disk.


In [7]:
# PREPROCESSING: CLEANING TEXT AND BUILDING VOCABULARY

# Remove Unnecessary Columns
columns_to_exclude = [
    "client_id", "path", "age", "gender", "accent", "locale", "segment"
]

train_ds = train_ds.remove_columns(columns_to_exclude)
test_ds = test_ds.remove_columns(columns_to_exclude)

# Define Special Characters Pattern
special_chars_pattern = r'[\,\?\.\!\-\;\:\"\“\%\‘\”\’]'

def clean_transcript(batch):

    batch["text"] = re.sub(special_chars_pattern, '', batch["sentence"]).lower().strip()
    return batch

# Apply the clean_transcript function to both datasets
train_ds = train_ds.map(clean_transcript)
test_ds = test_ds.map(clean_transcript)

# Aggregate Unique Characters
def extract_all_chars(batch):

    combined_text = " ".join(batch["text"])
    unique_chars = list(set(combined_text))
    # Repeat the vocabulary and combined text for each sample in the batch
    return {"vocab": [unique_chars] * len(batch["text"]), "all_text": [combined_text] * len(batch["text"])}

# Compute vocabulary information by applying aggregate_unique_chars in batched mode
vocab_data = train_ds.map(extract_all_chars, batched=True, batch_size=-1)

# Build the Vocabulary Dictionary
vocab_list = list(set(vocab_data["vocab"][0]))

# Create a mapping from each character to a unique integer index
vocab_dict = {char: idx for idx, char in enumerate(vocab_list)}

# Map the space character ' ' to the CTC blank token represented as '|'
vocab_dict["|"] = vocab_dict[" "]
# Remove the original mapping for the space character
del vocab_dict[" "]

# Add special tokens for unknown and padding
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)

# Save the vocabulary dictionary as a JSON file to the specified path
with open('/content/drive/MyDrive/vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)

print("Vocabulary dictionary has been successfully saved.")


Map:   0%|          | 0/1500 [00:00<?, ? examples/s]

Map:   0%|          | 0/300 [00:00<?, ? examples/s]

Map:   0%|          | 0/1500 [00:00<?, ? examples/s]

Vocabulary dictionary has been successfully saved.


In [8]:
from transformers import (
    Wav2Vec2CTCTokenizer,
    Wav2Vec2Processor,
    Wav2Vec2FeatureExtractor
)

In [9]:
#Initialize the tokenizer using the custom vocabulary
tokenizer = Wav2Vec2CTCTokenizer(
    '/content/drive/MyDrive/vocab.json',
    unk_token="[UNK]",
    pad_token="[PAD]",
    word_delimiter_token="|"
)

# Initialize the feature extractor for raw audio
feature_extractor = Wav2Vec2FeatureExtractor(
    feature_size=1,
    sampling_rate=16_000,
    padding_value=0.0,
    do_normalize=True,
    return_attention_mask=True
)

# Combine tokenizer and feature extractor into a processor
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)


In [10]:
def process_sample(batch):
    # Process the audio data
    # Extract the raw audio array and define the sampling rate
    audio_array = batch["audio"]["array"]
    sampling_rate = 16_000  # Desired sampling rate
    # Process the audio using the processor (returns PyTorch tensors)
    processed_audio = processor(audio_array, sampling_rate=sampling_rate, return_tensors="pt")
    # Assign the processed audio to 'input_values'
    batch["input_values"] = processed_audio.input_values[0]

    with processor.as_target_processor():
        batch["labels"] = processor(batch["text"]).input_ids
    return batch

train_ds = train_ds.map(process_sample)
test_ds = test_ds.map(process_sample)


# Keep only the columns required for training: "input_values" and "labels"
cols_to_keep = ["input_values", "labels"]
cols_to_remove = [col for col in train_ds.column_names if col not in cols_to_keep]
train_ds = train_ds.remove_columns(cols_to_remove)
test_ds = test_ds.remove_columns(cols_to_remove)

# -------------------------
# Set Dataset Format to PyTorch Tensors
# -------------------------
train_ds.set_format("torch")
test_ds.set_format("torch")



Map:   0%|          | 0/1500 [00:00<?, ? examples/s]



Map:   0%|          | 0/300 [00:00<?, ? examples/s]

# Model Configuration

In [11]:
from transformers import (
    Wav2Vec2ForCTC,
    TrainingArguments,
    Trainer
)

In [12]:
def data_collator(features):
    input_values = [{"input_values": feature["input_values"]} for feature in features]
    labels = [feature["labels"] for feature in features]

    # Pad input_values separately
    input_values_batch = processor.feature_extractor.pad(
        input_values,
        padding=True,
        return_tensors="pt"
    )

    # Pad labels separately
    labels_batch = processor.tokenizer.pad(
        {"input_ids": labels},  # Wrap labels in a dictionary with "input_ids" key
        padding=True,
        return_tensors="pt"
    )

    # Combine padded inputs and labels
    batch = {
        "input_values": input_values_batch["input_values"],
        "labels": labels_batch["input_ids"],
        "attention_mask": input_values_batch["attention_mask"]

    }

    batch["labels"][batch["labels"] == processor.tokenizer.pad_token_id] = -100
    return batch

In [13]:
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-base-960h",
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
)

training_args = TrainingArguments(
    output_dir="./wav2vec2-finetuned",
    eval_strategy="epoch",
    logging_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    fp16=True,
    gradient_checkpointing=True,
    save_steps=200,
    eval_steps=200,
    logging_steps=200,
    learning_rate=5e-4,
    report_to=[],
)




Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


# Model Training and Evaluation

In [14]:
pip install evaluate jiwer



In [15]:
import evaluate

In [16]:
# Define the Evaluation Metric Function
wer_metric = evaluate.load("wer")

def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)
    pred_str = processor.batch_decode(pred_ids)

    label_ids = pred.label_ids
    # Replace -100 with pad token id before decoding
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}




train_val_split = train_ds.train_test_split(test_size=0.1, seed=42)
train_ds_for_training = train_val_split["train"]   # 90% used for training
train_ds_for_val = train_val_split["test"]         # 10% used for evaluation during training


trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_ds_for_training,
    eval_dataset=train_ds_for_val,
    tokenizer=processor.feature_extractor,
)

# Train the model
trainer.train()





  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Wer
1,6.5373,5.558956,0.977926
2,6.1406,5.629985,0.977926
3,5.9272,5.478723,0.977926
4,5.7677,5.580047,0.977926
5,5.7514,5.542346,0.977926


TrainOutput(global_step=845, training_loss=6.024850048539201, metrics={'train_runtime': 469.2331, 'train_samples_per_second': 14.385, 'train_steps_per_second': 1.801, 'total_flos': 5.3178765269151744e+17, 'train_loss': 6.024850048539201, 'epoch': 5.0})

In [19]:
# Final Evaluation on the Held-Out val_ds
##########################################

# After training, evaluate on the held-out final validation dataset
final_results = trainer.evaluate(eval_dataset=test_ds)
print("Final evaluation on 300 Test samples: WER = ", final_results['eval_wer'])

Final evaluation on 300 Test samples: WER =  0.9804964539007093
