## Environment Set Up

In [None]:
# Install necessary libraries
!pip install git+https://github.com/openai/whisper.git
!pip install jiwer datasets transformers evaluate torchaudio  accelerate
!pip install torch --upgrade --extra-index-url https://download.pytorch.org/whl/cu118

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Standard library imports
import os
import zipfile
import requests
import io
import re
import locale

# Data handling and processing
import pandas as pd
import numpy as np

# Audio processing
import torchaudio
import torchaudio.transforms as T

# Transformers library
from transformers import (
    WhisperForConditionalGeneration,
    WhisperProcessor,
    WhisperTokenizer,
    Seq2SeqTrainingArguments,
    Seq2SeqTrainer,
    EarlyStoppingCallback,
    TrainingArguments
)

# Datasets and utilities
from datasets import Dataset, Audio
from torch.utils.data import Dataset as TorchDataset, DataLoader
from tqdm.notebook import tqdm

# Evaluation and metrics
import evaluate
from evaluate import load
import jiwer
from jiwer import wer
from whisper.normalizers import EnglishTextNormalizer


# IPython display
from IPython.display import Audio, display, HTML

# PyTorch
import torch
import torch.nested

# Initialize processor
processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

## Class and Function Definitions

In [None]:
class AsantiTwiDataset(TorchDataset):
    def __init__(self, zip_url, csv_filename, audio_base_path, processor=None, val_ratio=None):
        self.processor = processor or WhisperProcessor.from_pretrained("openai/whisper-small")

        # Download and extract the dataset
        response = requests.get(zip_url, stream=True)
        response.raise_for_status()
        with zipfile.ZipFile(io.BytesIO(response.content), 'r') as zip_ref:
            zip_ref.extractall('.')

        # Clean the CSV file
        cleaned_csv_filename = f"cleaned_{os.path.basename(csv_filename)}"
        self._clean_csv(csv_filename, cleaned_csv_filename)

        # Load the cleaned CSV
        self.df = pd.read_csv(cleaned_csv_filename)
        self.test_df = df = pd.read_csv(cleaned_csv_filename)

        # Rename columns
        column_mapping = {
            "Audio Filepath": "path",
            "Transcription": "sentence",
        }
        self.df.rename(columns=lambda x: column_mapping.get(x.strip(), x.strip()), inplace=True)

        # Split into train and validation if val_ratio is provided
        if isinstance(val_ratio, float) and 0 < val_ratio < 1:
            val_size = int(len(self.df) * val_ratio)
            self.df = self.df.sample(frac=1, random_state=42)  # Shuffle
            self.val_df = self.df.head(val_size)
            self.train_df = self.df.tail(len(self.df) - val_size)
        else:
            self.train_df = self.df
            self.val_df = None

        self.audio_base_path = audio_base_path

    def _clean_csv(self, input_path, output_path):
        with open(input_path, "r") as infile:
            lines = infile.readlines()

        # Replace tabs with commas and clean paths
        clean_lines = [
            line.replace("\t", ",")
                .replace("lacuna-audios-train/asanti-twi/audios/","")
                .replace("lacuna-audios-test/asanti-twi/audios/","")
                .replace("ɛberɛ nni hɔ/  mmerɛ nni hɔ","Nnipa yɛ bad")
                .replace("Nnipa yɛ bad (informal) / Nnipa nnyɛ (formal)","ɛberɛ nni hɔ")
                .replace("A SLRKMb","ASLRKMb")
                .replace("Mepaa’kyɛw (spoken. Written: Mepawokyew)","Mepaa’kyɛw")
            for line in lines
        ]

        # Filter rows with the correct number of fields
        expected_fields = clean_lines[0].count(",") + 1
        valid_lines = [line for line in clean_lines if line.count(",") + 1 == expected_fields]

        # Write cleaned content to a new file
        with open(output_path, "w") as outfile:
            outfile.writelines(valid_lines)

    def __len__(self):
        return len(self.train_df)

    def play_audio(self, idx):
        """Plays the audio at the given index."""
        row = self.test_df.iloc[idx]
        audio_path = os.path.join(self.audio_base_path, row['path'])

        # Load the audio file
        waveform, sample_rate = torchaudio.load(audio_path)

        # Display the audio using IPython.display.Audio
        display(Audio(waveform.numpy(), rate=sample_rate))

    def __getitem__(self, idx):
        row = self.train_df.iloc[idx]
        audio_path = os.path.join(self.audio_base_path, row['path'])

        # Load audio
        try:
            waveform, sr = torchaudio.load(audio_path, format="ogg")
        except Exception as e:
            # print(f"Error loading {audio_path}: ")
            # Recursively try next item
            return self.__getitem__((idx + 1) % len(self))

        # Resample if needed
        if sr != 16000:
            resampler = T.Resample(orig_freq=sr, new_freq=16000)
            waveform = resampler(waveform)

        waveform = waveform.squeeze(0)  # Remove channel dimension

        # Process audio
        input_features = self.processor(
            waveform,
            sampling_rate=16000,
            return_tensors="pt"
        ).input_features[0]

        # Prepare labels
        text = row['sentence']
        labels = self.processor.tokenizer(
            text,
            return_tensors="pt"
        ).input_ids[0]

        return input_features, labels

In [None]:
def prepare_model_and_dataset():
    # Load Whisper model and processor
    processor = WhisperProcessor.from_pretrained("openai/whisper-tiny", language="en", task="transcribe")
    model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny").to("cuda" if torch.cuda.is_available() else "cpu")

    # Dataset URLs
    train_zip_url = "https://fisd-dataset.s3.amazonaws.com/fisd-asanti-twi-90p.zip"
    train_csv_filename = "fisd-asanti-twi-90p/data.csv"
    train_audio_base_path = "fisd-asanti-twi-90p/audios"

    test_zip_url = "https://fisd-dataset.s3.amazonaws.com/fisd-asanti-twi-10p.zip"
    test_csv_filename = "fisd-asanti-twi-10p/data.csv"
    test_audio_base_path = "fisd-asanti-twi-10p/audios"

    # Create datasets
    train_dataset = AsantiTwiDataset(
        zip_url=train_zip_url,
        csv_filename=train_csv_filename,
        audio_base_path=train_audio_base_path,
        processor=processor,
        val_ratio=0.3
    )

    test_dataset = AsantiTwiDataset(
        zip_url=test_zip_url,
        csv_filename=test_csv_filename,
        audio_base_path=test_audio_base_path,
        processor=processor
    )
    return model, processor, train_dataset, test_dataset



In [None]:
def prepare_training_args(output_dir='./whisper-asanti-twi2'):
    training_args = Seq2SeqTrainingArguments(
        output_dir=output_dir,
        num_train_epochs=10,
        per_device_train_batch_size=4,
        per_device_eval_batch_size=4,

        # Learning rate scheduling and regularization
        learning_rate=1e-4,  # Small learning rate to prevent rapid overfitting
        weight_decay=0.01,   # L2 regularization to penalize large weights
        lr_scheduler_type="linear",  # Gradual learning rate reduction
        warmup_steps=500,    # Gradual learning rate increase initially

        # Model saving and evaluation
        load_best_model_at_end=True,
        metric_for_best_model="wer",  # Use Word Error Rate for model selection

        gradient_accumulation_steps=2,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        predict_with_generate=True,
        generation_max_length=225,
        fp16=True,  # Mixed precision training to help generalization
        report_to=[]
    )
    return training_args

In [None]:
def compute_metrics(pred):
    # pred contains predictions and label_ids
    decoded_preds = processor.batch_decode(pred.predictions, skip_special_tokens=True)
    decoded_labels = processor.batch_decode(pred.label_ids, skip_special_tokens=True)

    # Normalize text
    normalizer = EnglishTextNormalizer()
    decoded_preds_clean = [normalizer(text) for text in decoded_preds]
    decoded_labels_clean = [normalizer(text) for text in decoded_labels]

    # Compute WER
    wer = jiwer.wer(decoded_labels_clean, decoded_preds_clean)

    # Compute CER
    cer = jiwer.cer(decoded_labels_clean, decoded_preds_clean)

    # Print the results (optional)
    print(f"WER: {wer * 100:.2f} %")
    print(f"CER: {cer * 100:.2f} %")

    return {"wer": wer, "cer": cer}

In [None]:
def data_collator(batch):
    required_length = 3000  # Fixed length required by Whisper

    # Pad or truncate input features
    input_features = torch.stack([
        torch.nn.functional.pad(
            item["input_features"],
            (0, required_length - item["input_features"].shape[-1]),  # Pad to the right
            mode="constant",
            value=0  # Padding value
        )[:, :required_length]  # Truncate if longer than 3000
        for item in batch
    ])

    # Pad labels to the maximum sequence length in the batch
    labels = torch.nn.utils.rnn.pad_sequence(
        [item["labels"] for item in batch],
        batch_first=True,
        padding_value=processor.tokenizer.pad_token_id
    )
    labels[labels == processor.tokenizer.pad_token_id] = -100  # Mask padding tokens for loss calculation

    return {
        "input_features": input_features,  # Padded to required length
        "labels": labels
    }


## Train Function

In [None]:
def train_whisper_model():
    # Using a smaller Whisper model to reduce complexity
    model, processor, train_dataset, test_dataset = prepare_model_and_dataset()

    # Freeze initial layers of the model
    for param in model.base_model.parameters():
        param.requires_grad = False  # Freeze initial layers

    # Unfreeze last few layers for fine-tuning
    for param in model.base_model.encoder.layers[-2:].parameters():
        param.requires_grad = True

    # Rest of the training function remains the same
    training_args = prepare_training_args()

    # Custom data collator for batching
    def collate_fn(batch):
         # Filter out None values
        batch = [b for b in batch if b is not None]
        if len(batch) == 0:
            # Handle the case where all items in the batch are None
            return {}  # or return some default value

        input_features = torch.stack([x[0] for x in batch])
        labels = torch.nn.utils.rnn.pad_sequence(
            [x[1] for x in batch],
            batch_first=True,
            padding_value=processor.tokenizer.pad_token_id
        )
        return {
            "input_features": input_features,
            "labels": labels
        }

    trainer = Seq2SeqTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        data_collator=collate_fn,
        compute_metrics=compute_metrics,
        processing_class=processor
    )

    # Add early stopping callback
    trainer.add_callback(EarlyStoppingCallback(early_stopping_patience=3))

    # Train
    trainer.train()

    # Save model
    trainer.save_model('./whisper-asanti-twi-model')

    return model, processor

In [None]:
# Execute training
trained_model, trained_processor = train_whisper_model()
print("Model training completed!")

# Save the model and tokenizer to your Google Drive
model_save_path = '/content/drive/MyDrive/trained_whisper_model'  # Path for model
tokenizer_save_path = '/content/drive/MyDrive/trained_whisper_tokenizer'  # Path for tokenizer

trained_model.save_pretrained(model_save_path)  # Save model
trained_processor.save_pretrained(tokenizer_save_path)  # Save tokenizer

print(f"Model saved to: {model_save_path}")
print(f"Tokenizer saved to: {tokenizer_save_path}")

## Evaluation

In [None]:
# Save the model and tokenizer to your Google Drive
model_save_path = '/content/drive/MyDrive/trained_whisper_model'  # Path for model
tokenizer_save_path = '/content/drive/MyDrive/trained_whisper_tokenizer'  # Path for tokenizer

# Load the model and tokenizer
model = WhisperForConditionalGeneration.from_pretrained(model_save_path)
processor = WhisperProcessor.from_pretrained(tokenizer_save_path)


In [None]:
zip_url = "https://fisd-dataset.s3.amazonaws.com/fisd-asanti-twi-10p.zip"
csv_filename = "fisd-asanti-twi-10p/data.csv"  # Assuming your CSV file is named 'metadata.csv'
audio_base_path = "fisd-asanti-twi-10p/audios"  # Assuming your audio files are in 'audio_files' folder within the zip


test_dataset = AsantiTwiDataset(zip_url=zip_url, csv_filename=csv_filename, audio_base_path=audio_base_path,processor=processor)

In [None]:
test_dataset.play_audio(11)
# Create a DataLoader for batch processing
loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=32,
    collate_fn=data_collator
)

In [None]:
hypotheses = []
references = []

# Iterate through the dataset individually
for i in tqdm(range(len(test_dataset))):
    input_features, labels = test_dataset[i]  # single data point

    # Prepare the input for the model
    mels = input_features.unsqueeze(0).to(DEVICE)  # Add batch dimension and move to device

    model.to(DEVICE)

    # Generate predictions
    results = model.generate(mels)

    # Decode and store results
    hypotheses.extend(processor.batch_decode(results, skip_special_tokens=True))
    references.extend(processor.batch_decode(labels.unsqueeze(0), skip_special_tokens=True))

data = pd.DataFrame(dict(hypothesis=hypotheses, reference=references))
data

In [None]:
# Normalize text
normalizer = EnglishTextNormalizer()
data["hypothesis_clean"] = [normalizer(text) for text in data["hypothesis"]]
data["reference_clean"] = [normalizer(text) for text in data["reference"]]


#=========== Method 1 ===================
print ("============ Method 1 ====================")
# Compute WER
wer = jiwer.wer(list(data["reference_clean"]), list(data["hypothesis_clean"]))

# Compute CER
cer = jiwer.cer(list(data["reference_clean"]), list(data["hypothesis_clean"]))

# Print the results
print(f"WER: {wer * 100:.2f} %")
print(f"CER: {cer * 100:.2f} %")

#============ Method 2 ====================
print ("============ Method 2 ====================")
# Compute individual WER and CER
data["wer"] = data.apply(lambda row: jiwer.wer(row["reference_clean"], row["hypothesis_clean"]), axis=1)
data["cer"] = data.apply(lambda row: jiwer.cer(row["reference_clean"], row["hypothesis_clean"]), axis=1)

# Compute overall WER and CER
overall_wer = data["wer"].mean()
overall_cer = data["cer"].mean()

# Print the results
print(f"Overall WER: {overall_wer * 100:.2f} %")
print(f"Overall CER: {overall_cer * 100:.2f} %")

# Display the DataFrame with individual and overall scores
display(data)

In [None]:
import matplotlib.pyplot as plt


# Before removing outliers
overall_wer_before = data["wer"].mean()
overall_cer_before = data["cer"].mean()

# Remove outliers with WER or CER above 0.3 (30%) or 0.5 (50%)
filtered_data = data[(data["wer"] <= 0.5) & (data["cer"] <= 0.3)]

# After removing outliers
overall_wer_after = filtered_data["wer"].mean()
overall_cer_after = filtered_data["cer"].mean()

# Print the results
print(f"Overall WER (Before outlier removal): {overall_wer_before * 100:.2f} %")
print(f"Overall CER (Before outlier removal): {overall_cer_before * 100:.2f} %")
print(f"Overall WER (After outlier removal): {overall_wer_after * 100:.2f} %")
print(f"Overall CER (After outlier removal): {overall_cer_after * 100:.2f} %")

# Display the DataFrame with individual and overall scores
display(filtered_data)


# Plot WER and CER for each data point along with the means (Before outlier removal)
plt.figure(figsize=(12, 6))
plt.plot(data["wer"], label="WER", marker="o", linestyle="-")
plt.plot(data["cer"], label="CER", marker="x", linestyle="--")
plt.axhline(overall_wer_before, color="r", linestyle="--", label="Mean WER")
plt.axhline(overall_cer_before, color="b", linestyle="--", label="Mean CER")
plt.title("WER and CER Distribution (Before Outlier Removal)")
plt.xlabel("Data Point Index")
plt.ylabel("Error Rate")
plt.legend()
plt.grid(True)  # Add grid for better visualization

# Plot WER and CER for each data point along with the means (After outlier removal)
plt.figure(figsize=(12, 6))
plt.plot(filtered_data["wer"], label="WER", marker="o", linestyle="-")
plt.plot(filtered_data["cer"], label="CER", marker="x", linestyle="--")
plt.axhline(overall_wer_after, color="r", linestyle="--", label="Mean WER")
plt.axhline(overall_cer_after, color="b", linestyle="--", label="Mean CER")
plt.title("WER and CER Distribution (After Outlier Removal)")
plt.xlabel("Data Point Index")
plt.ylabel("Error Rate")
plt.legend()
plt.grid(True)  # Add grid for better visualization

plt.tight_layout()
plt.show()



##Evaluation on Custom Dataset

In [None]:
class AsantiTwiDataset(torch.utils.data.Dataset):
    def __init__(self, tsv_path, audio_base_path, processor=None, device="cpu"):
        self.device = device
        self.audio_base_path = audio_base_path
        self.processor = processor

        # Load the TSV file
        self.df = pd.read_csv(tsv_path, sep='\t')

        # Map columns if needed
        column_mapping = {
            "Audio Filepath": "path",
            "Transcription": "sentence",
        }
        self.df.rename(columns=lambda x: column_mapping.get(x.strip(), x.strip()), inplace=True)
        self.df['path'] = self.df['path'].str.replace("lacuna-audios-test/asanti-twi/audios/", "")
        # Verify required columns
        if 'path' not in self.df.columns or 'sentence' not in self.df.columns:
            raise ValueError("TSV file must contain 'path' and 'sentence' columns."
                             f"\nAvailable columns: {self.df.columns.tolist()}") # Print available columns for debugging

    def __len__(self):
        return len(self.df)

    def play_audio(self, idx):
        """Plays the audio associated with the given index."""
        audio_path = os.path.join(self.audio_base_path, self.df.iloc[idx]['path'])

        # Check if the file exists and is accessible
        if not os.path.exists(audio_path):
            print(f"Warning: Audio file not found: {audio_path}. Skipping...")
            return  # Exit the function if file not found

        try:
            # Load audio using torchaudio, automatically detecting format
            waveform, sr = torchaudio.load(audio_path)
        except RuntimeError as e:
            print(f"Warning: Error loading audio file {audio_path}: {e}. Skipping...")
            return  # Exit the function if loading fails

        # Display audio using IPython.display.Audio
        display(Audio(waveform.numpy(), rate=sr))

    def __getitem__(self, idx):
        audio_path = os.path.join(self.audio_base_path, self.df.iloc[idx]['path'])

        # Check if the file exists and is accessible
        if not os.path.exists(audio_path):
            print(f"Warning: Audio file not found: {audio_path}. Skipping...")
            # Skip this file and move to the next one
            idx = (idx + 1) % len(self.df)  # Wrap around to the beginning if at the end
            return self.__getitem__(idx)  # Recursive call to get the next item

        try:
            # Load audio using torchaudio, automatically detecting format
            waveform, sr = torchaudio.load(audio_path)
        except RuntimeError as e:
            print(f"Warning: Error loading audio file {audio_path}: Runtime Error. Skipping...")
            # Skip this file and move to the next one
            idx = (idx + 1) % len(self.df)  # Wrap around to the beginning if at the end
            return self.__getitem__(idx)  # Recursive call to get the next item

        # Resample if the sampling rate is not 16 kHz
        if sr != 16000:
            resampler = T.Resample(orig_freq=sr, new_freq=16000)
            waveform = resampler(waveform)

        waveform = waveform.squeeze(0)  # Remove channel dimension

         # Process audio and text
        inputs = self.processor(
            waveform, sampling_rate=16000, return_tensors="pt", padding=True
        )
        text = self.df.iloc[idx]['sentence']
        labels = self.processor.tokenizer(
            text, return_tensors="pt", padding=True
        ).input_ids

        # Correctly return the input features (as a PyTorch tensor) and labels
        return inputs.input_features[0], labels[0]

In [None]:
# Save the model and tokenizer to your Google Drive
model_save_path = '/content/drive/MyDrive/trained_whisper_model'  # Path for model
tokenizer_save_path = '/content/drive/MyDrive/trained_whisper_tokenizer'  # Path for tokenizer

# Load the model and tokenizer
model = WhisperForConditionalGeneration.from_pretrained(model_save_path)
processor = WhisperProcessor.from_pretrained(tokenizer_save_path)


In [None]:
# Paths to data on Google Drive
#tsv_path = '/content/drive/MyDrive/mixed-audios/mixed_dataset_labels.tsv'
tsv_path = '/content/drive/MyDrive/mixed-audios/mixed_dataset_labels_30.tsv'

audio_base_path = '/content/drive/MyDrive/mixed-audios'

# Create the dataset instance
test_dataset = AsantiTwiDataset(tsv_path=tsv_path, audio_base_path=audio_base_path, processor=processor)

In [None]:
test_dataset.df

In [None]:
test_dataset.play_audio(11) #playing a random example
# DataLoader for batch processing
loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=7,
    collate_fn=data_collator
)

In [None]:
hypotheses = []
references = []
required_length = 3000

# Iterate through the dataset individually
for i in tqdm(range(len(test_dataset))):
    input_features, labels = test_dataset[i]  # Get a single data point

    # Pad or truncate input features
    input_features = torch.nn.functional.pad(
        input_features,
        (0, required_length - input_features.shape[-1]),  # Pad to the right
        mode="constant",
        value=0  # Padding value
    )[:, :required_length]  # Truncate if longer than 3000


    # Prepare the input for the model
    mels = input_features.unsqueeze(0).to(DEVICE)  # Add batch dimension and move to device

    model.to(DEVICE)

    # Generate predictions
    results = model.generate(mels)

    # Decode and store results
    hypotheses.extend(processor.batch_decode(results, skip_special_tokens=True))
    references.extend(processor.batch_decode(labels.unsqueeze(0), skip_special_tokens=True))

data = pd.DataFrame(dict(hypothesis=hypotheses, reference=references))
data

In [None]:
# Normalize text
normalizer = EnglishTextNormalizer()
data["hypothesis_clean"] = [normalizer(text) for text in data["hypothesis"]]
data["reference_clean"] = [normalizer(text) for text in data["reference"]]


#=========== Method 1 ===================
print ("============ Method 1 ====================")
# Compute WER
wer = jiwer.wer(list(data["reference_clean"]), list(data["hypothesis_clean"]))

# Compute CER
cer = jiwer.cer(list(data["reference_clean"]), list(data["hypothesis_clean"]))

# Print the results
print(f"WER: {wer * 100:.2f} %")
print(f"CER: {cer * 100:.2f} %")

#============ Method 2 ====================
print ("============ Method 2 ====================")
# Compute individual WER and CER
data["wer"] = data.apply(lambda row: jiwer.wer(row["reference_clean"], row["hypothesis_clean"]), axis=1)
data["cer"] = data.apply(lambda row: jiwer.cer(row["reference_clean"], row["hypothesis_clean"]), axis=1)

# Compute overall WER and CER
overall_wer = data["wer"].mean()
overall_cer = data["cer"].mean()

# Print the results
print(f"Overall WER: {overall_wer * 100:.2f} %")
print(f"Overall CER: {overall_cer * 100:.2f} %")

# Display the DataFrame with individual and overall scores
display(data)

## Uploading model to hugging face

In [None]:
!pip install huggingface_hub==0.16.4

In [None]:
from huggingface_hub import notebook_login, HfApi
from huggingface_hub.repocard import metadata_eval_result, metadata_save

In [None]:
notebook_login() # token - 

In [None]:
api = HfApi()
repo_id = "CiBeDL/twi_trained_whisper"
api.create_repo(repo_id=repo_id, exist_ok=True)  # repository name

In [None]:
# Paths where the model and tokenizer were saved
model_save_path = '/content/drive/MyDrive/trained_whisper_model'
tokenizer_save_path = '/content/drive/MyDrive/trained_whisper_tokenizer'

# Load the model and tokenizer
model = WhisperForConditionalGeneration.from_pretrained(model_save_path)
processor = WhisperProcessor.from_pretrained(tokenizer_save_path)

model.save_pretrained(repo_id)
processor.save_pretrained(repo_id)

In [None]:
api.upload_folder(
    folder_path="/content/drive/MyDrive/trained_whisper_model",
    path_in_repo="",
    repo_id=repo_id
)

api.upload_folder(
    folder_path="/content/drive/MyDrive/trained_whisper_tokenizer",
    path_in_repo="",
    repo_id=repo_id
)
