In [None]:
# Install the third-party packages for this notebook into the runtime.
# NOTE(review): versions are unpinned and `transformers` (imported below) is not
# listed — presumably it is preinstalled in the Colab image; confirm before
# running anywhere else.
!pip install datasets torchaudio librosa soundfile jiwer gradio torchcodec

In [None]:
# Mount Google Drive at /content/drive; the checkpoint, dataset and output
# paths used later in this notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from datasets import Dataset
from transformers import (
    Wav2Vec2ForCTC,
    Wav2Vec2Processor,
    TrainingArguments,
    Trainer,
    Wav2Vec2CTCTokenizer, # Use Wav2Vec2CTCTokenizer for direct vocab_file loading
    Wav2Vec2FeatureExtractor # Import Wav2Vec2FeatureExtractor
)
import torch

# -------------------------------
# PATHS — UPDATE THESE
# -------------------------------
# Directory of the previously trained checkpoint that is fine-tuned further here.
BASE_MODEL = "/content/drive/MyDrive/Ibibio_Voice/wav2vec2/checkpoint-3960"   # your CV-trained model
# Folder holding the audio clips; the dataset loader joins CSV `path` values onto it.
CUSTOM_DATASET_PATH = "/content/drive/MyDrive/Ibibio_Voice/Data/ibb/clips"  # if saved, otherwise load your Dataset("...")
# Vocabulary file stored next to the checkpoint; the tokenizer below is built from it.
VOCAB_PATH = "/content/drive/MyDrive/Ibibio_Voice/wav2vec2/checkpoint-3960/vocab.json"

# --------------------------------
# LOAD MODEL + PROCESSOR
# --------------------------------
# Load the feature extractor from the base model path
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(BASE_MODEL)

# Load the tokenizer using the explicit vocab file via Wav2Vec2CTCTokenizer
# NOTE(review): in transformers' Wav2Vec2CTCTokenizer, do_lower_case=True is
# reported to UPPER-case the text before vocabulary lookup (and lower-case the
# decoded output) — confirm that the casing of vocab.json matches the
# lower-cased labels produced later in this notebook.
tokenizer = Wav2Vec2CTCTokenizer(
    vocab_file=VOCAB_PATH, # Pass the vocab.json path directly
    do_lower_case=True,
    unk_token="[UNK]", # Add unk_token if not specified
    pad_token="[PAD]", # Add pad_token if not specified
    word_delimiter_token="|" # Assuming common voice setup
)

# Combine them into a Wav2Vec2Processor
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

# Load the CTC model from the same checkpoint, sizing the output layer from the
# tokenizer and using its pad id as the model's pad token.
# NOTE(review): if len(tokenizer) differs from the checkpoint's head size,
# from_pretrained presumably needs ignore_mismatched_sizes=True — confirm.
model = Wav2Vec2ForCTC.from_pretrained(
    BASE_MODEL,
    vocab_size=len(processor.tokenizer),
    pad_token_id=processor.tokenizer.pad_token_id,
    ctc_loss_reduction="mean",
)

print("Loaded CV-trained model ‚úì")

In [None]:
import os, sys, math, random, json, shutil, time
from pathlib import Path
import numpy as np, pandas as pd
import torch
# Report the torch build and whether a CUDA device is visible.
print('torch', torch.__version__, 'cuda available:', torch.cuda.is_available())
# Root working folder on Drive; created if missing.
# NOTE(review): this root ('ibibio_asr') differs from the 'Ibibio_Voice' folder
# used by the model/dataset paths elsewhere in the notebook — confirm intended.
DRIVE_ROOT = '/content/drive/MyDrive/ibibio_asr'
os.makedirs(DRIVE_ROOT, exist_ok=True)

# Sub-folders derived from DRIVE_ROOT; only OUTPUT_DIR is created here.
LOCAL_TSV_DIR = os.path.join(DRIVE_ROOT, 'common_voice_tsvs')
COMMON_VOICE_DIR = os.path.join(DRIVE_ROOT, 'common_voice_23_0_ibb')
OUTPUT_DIR = os.path.join(DRIVE_ROOT, 'wav2vec2_xlsr_optionA')
os.makedirs(OUTPUT_DIR, exist_ok=True)

PRETRAINED_MODEL = 'facebook/wav2vec2-large-xlsr-53'
SAMPLE_RATE = 16000
# Inclusive bounds applied to each clip's `duration` by the dataset loader
# (presumably seconds, as returned by librosa — confirm).
MIN_AUDIO = 0.5
MAX_AUDIO = 30.0

# Pretraining vs finetuning params (defaults small for Colab)
# NOTE(review): none of the four values below are read by the visible cells;
# the TrainingArguments cell hard-codes its own epochs and batch sizes.
PRETRAIN_EPOCHS = 1   # set low for Colab testing; raise for serious pretraining
FINETUNE_EPOCHS = 3
PRETRAIN_BATCH = 8
FINETUNE_BATCH = 4

print('Configuration set. OUTPUT_DIR=', OUTPUT_DIR)


In [None]:
import os

# Sanity check: confirm the manifest CSV is reachable on the mounted Drive
# before the loading cells try to read it.
CV_PATH = "/content/drive/MyDrive/Ibibio_Voice/Data/ibb/"
print("CSV exists:", os.path.exists(os.path.join(CV_PATH, "ibbsdd.csv")))
print("Path checked:", os.path.join(CV_PATH, "ibbsdd.csv"))


In [None]:
import pandas as pd

# Preview the manifest: read it as UTF-8 (tolerating a leading BOM via
# "utf-8-sig") and show its column names and first rows.
df = pd.read_csv("/content/drive/MyDrive/Ibibio_Voice/Data/ibb/ibbsdd.csv", encoding="utf-8-sig")
print(df.columns)
print(df.head())


In [None]:
from datasets import load_dataset, DatasetDict, Dataset, Audio
import pandas as pd
import os
import librosa
import soundfile as sf

# Folder containing the manifest CSV(s); read by the dataset loader defined below.
CV_PATH = '/content/drive/MyDrive/Ibibio_Voice/Data/ibb/'

def get_duration_fixed(example):
    """Return the duration of the audio file referenced by ``example['path']``.

    ``example['path']`` may be a plain path string, a dict carrying a
    ``'path'`` key, or an object exposing a ``.path`` attribute.

    Args:
        example: mapping with a ``'path'`` entry in one of the forms above.

    Returns:
        float: the value reported by ``librosa.get_duration``. ``0.0`` is
        returned (after printing a message) when the file does not exist or
        when any error occurs, so callers can drop such rows with a duration
        filter instead of crashing.
    """
    try:
        # Resolve the actual file path from the supported structures.
        ref = example['path']
        if hasattr(ref, 'path'):
            file_path = ref.path
        elif isinstance(ref, dict) and 'path' in ref:
            file_path = ref['path']
        else:
            file_path = ref

        if not (file_path and os.path.exists(file_path)):
            print(f"File not found: {file_path}")
            return 0.0

        try:
            # `path=` is the current keyword; `filename=` is deprecated in
            # recent librosa releases.
            duration = librosa.get_duration(path=file_path)
        except TypeError:
            # Older librosa releases only accept the legacy keyword.
            duration = librosa.get_duration(filename=file_path)
        return float(duration)
    except Exception as e:
        # Deliberate best effort: report and signal "unusable" with 0.0.
        print(f"Error getting duration: {e}")
        return 0.0

def load_all_datasets_with_duration_filtering():
    """Load the ``ibbsdd`` manifest and keep only clips within the duration bounds.

    For each split name (currently only ``'ibbsdd'``) this reads
    ``<CV_PATH>/<split>.csv``, drops rows without a ``path``, prefixes every
    path with ``CUSTOM_DATASET_PATH``, adds a ``duration`` column computed by
    ``get_duration_fixed`` and keeps rows where
    ``MIN_AUDIO <= duration <= MAX_AUDIO``.

    Returns:
        dict: split name -> ``datasets.Dataset``. A split whose CSV is missing
        is left out of the dict (a warning is printed); a split that is empty
        after cleaning maps to an empty dataset with ``path``, ``sentence``
        and ``duration`` columns.
    """
    main_splits = {}
    for split_name in ['ibbsdd']:
        csv_path = os.path.join(CV_PATH, f"{split_name}.csv")
        if not os.path.exists(csv_path):
            # Make the omission visible; callers would otherwise only see a
            # missing key much later.
            print(f"Warning: manifest not found, skipping {split_name}: {csv_path}")
            continue

        # Read as UTF-8 (with optional BOM), matching how the same file is
        # previewed elsewhere in this notebook; decoding it as latin1 would
        # garble accented letters and a BOM-prefixed first column name.
        df = pd.read_csv(csv_path, sep=',', encoding='utf-8-sig', on_bad_lines='skip', engine='python')

        # Ensure 'path' is a string column and drop rows where it was NaN.
        df['path'] = df['path'].astype(str)
        # Copy + fresh index so the assignment below acts on an independent
        # frame and no leftover pandas index column reaches the Dataset.
        df = df[df['path'] != 'nan'].reset_index(drop=True).copy()

        if df.empty:
            print(f"Warning: Dataset for {split_name} is empty after cleaning 'path' column.")
            main_splits[split_name] = Dataset.from_pandas(pd.DataFrame(columns=['path', 'sentence', 'duration']))
            continue

        # Build the full path to each audio file.
        df["path"] = df["path"].apply(lambda p: os.path.join(CUSTOM_DATASET_PATH, p))

        dataset = Dataset.from_pandas(df)

        # Add the duration column, then keep clips inside the configured bounds.
        dataset = dataset.map(lambda x: {'duration': get_duration_fixed(x)})
        dataset = dataset.filter(lambda x: MIN_AUDIO <= x['duration'] <= MAX_AUDIO)

        main_splits[split_name] = dataset
        print(f"Loaded {split_name}: {len(dataset)} samples (after duration filtering)")

    return main_splits

In [None]:
# Load all data with duration filtering.
# The result is a dict of split name -> Dataset; the bare `main_splits`
# expression on the last line displays it as the cell output.
print("=== Loading datasets with duration filtering ===")
main_splits = load_all_datasets_with_duration_filtering()
main_splits

In [None]:
# Pick the single loaded split. `.get` yields None when the split is absent,
# in which case the later cells that call methods on `dataset` will fail.
dataset = main_splits.get('ibbsdd')

In [None]:
from dataclasses import dataclass
from typing import Dict, List, Union, Optional
import torch

@dataclass
class DataCollatorCTCWithPadding:
    """Collate CTC training examples into one padded batch of tensors.

    Each incoming feature is a dict with ``"input_values"`` (audio features)
    and ``"labels"`` (list of token ids). Features whose inputs or labels are
    missing, empty, or contain ``None`` are dropped before padding.

    Inputs are padded with the processor's feature extractor and labels with
    its tokenizer. The two ``pad`` methods are called directly instead of via
    the deprecated ``processor.as_target_processor()`` context manager.
    """

    # Object exposing `.feature_extractor.pad(...)` and `.tokenizer.pad(...)`.
    # The annotation is a forward-reference string, so the name is only needed
    # by type checkers, not when the class body is executed.
    processor: "Wav2Vec2Processor"
    # Padding strategy forwarded unchanged to both pad calls.
    padding: Union[bool, str] = "longest"

    @staticmethod
    def _is_valid(feature: Dict) -> bool:
        """Return True when the feature has inputs and a non-empty list of non-None labels."""
        labels = feature.get("labels")
        return (
            feature.get("input_values") is not None
            and labels is not None
            and isinstance(labels, list)
            and len(labels) > 0
            and all(t is not None for t in labels)
        )

    def __call__(self, features: List[Dict]) -> Dict[str, torch.Tensor]:
        """Pad ``features`` and return the batch with a ``"labels"`` tensor.

        Raises:
            ValueError: if no feature in the batch is valid.
        """
        clean_features = [f for f in features if self._is_valid(f)]
        if not clean_features:
            raise ValueError("All features in batch had invalid labels or inputs.")

        # Inputs and labels have different lengths and need separate padding.
        input_features = [{"input_values": f["input_values"]} for f in clean_features]
        label_features = [{"input_ids": f["labels"]} for f in clean_features]

        batch = self.processor.feature_extractor.pad(
            input_features,
            padding=self.padding,
            return_tensors="pt",
        )
        labels_batch = self.processor.tokenizer.pad(
            label_features,
            padding=self.padding,
            return_tensors="pt",
        )

        # Replace padded label positions with -100 so the loss ignores them.
        labels = labels_batch["input_ids"].masked_fill(
            labels_batch["attention_mask"].ne(1), -100
        )

        batch["labels"] = labels
        return batch

# Collator instance handed to the Trainer; uses the default "longest" padding.
data_collator = DataCollatorCTCWithPadding(processor=processor)


In [None]:
# Hyper-parameters for the stage-2 fine-tuning run. Checkpoints and logs go to
# output_dir on Drive.
training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/Ibibio_Voice/ibb_stage2_custom",
    group_by_length=True,
    per_device_train_batch_size=2,  # small per-step batch ...
    per_device_eval_batch_size=2,
    gradient_accumulation_steps=8,  # ... accumulated to 2 x 8 = 16 examples per optimizer step per device
    remove_unused_columns=False,
    # Both saving and evaluation are scheduled per epoch.
    save_strategy="epoch",
    eval_strategy="epoch",
    num_train_epochs=30,
    fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
    # NOTE(review): save_steps / eval_steps presumably have no effect while the
    # corresponding strategies are "epoch" — confirm and remove if so.
    save_steps=500,
    logging_strategy="steps",
    logging_steps=50,               # training loss is logged every 50 steps
    eval_steps=200,

    learning_rate=1e-4,
    save_total_limit=2,

    report_to=["tensorboard"],

    # NOTE(review): the original comment tied this to GPU fragmentation; it sets
    # the number of DataLoader worker processes — confirm the intent.
    dataloader_num_workers=2,
)
# Enable gradient checkpointing on the model (presumably to lower activation
# memory at the cost of extra compute — confirm).
model.gradient_checkpointing_enable()

In [None]:
import numpy as np
from jiwer import wer, cer

def compute_metrics(pred):
    """Compute word and character error rates for an evaluation pass.

    Args:
        pred: object with ``predictions`` (logits, arg-maxed over the last
            axis) and ``label_ids`` (label ids where ``-100`` marks padding).

    Returns:
        dict: ``{"wer": ..., "cer": ...}`` computed by jiwer with the decoded
        labels as reference and the decoded predictions as hypothesis.
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)
    pred_str = processor.batch_decode(pred_ids)

    # Swap the -100 padding marker for the pad token id so decoding works.
    # np.where builds a new array, so the caller's label_ids is not mutated.
    label_ids = np.where(
        pred.label_ids == -100, processor.tokenizer.pad_token_id, pred.label_ids
    )
    # Labels are already final token sequences: do not collapse repeats.
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    return {
        "wer": wer(label_str, pred_str),
        "cer": cer(label_str, pred_str)
    }


In [None]:
from datasets import Audio

# Re-type the "path" column as Audio at 16 kHz; prepare_dataset later reads
# batch["path"]["array"] and batch["path"]["sampling_rate"] from it.
# NOTE(review): 16000 duplicates the SAMPLE_RATE constant defined earlier.
dataset = dataset.cast_column("path", Audio(sampling_rate=16000))


In [None]:
import json
import re

# Ibibio digraphs that must be treated as single symbols: the vocabulary
# builder below tries these two-letter sequences before single characters.
DIGRAPHS = ["kp", "gb", "ny", "nw"]

# Normalize text
def normalize_text(s):
    """Normalize a transcript before vocabulary extraction.

    Steps: lower-case, map the typographic apostrophe to ``'``, drop every
    character outside the allowed alphabet, collapse whitespace runs to one
    space, and trim both ends.

    The apostrophe is kept in the allowed set so this function agrees with the
    label normalizer used when encoding transcripts; otherwise the apostrophe
    replacement would be dead code and labels could contain a symbol missing
    from the vocabulary. Trimming happens last so that spaces exposed by
    removing leading/trailing punctuation do not survive.

    Args:
        s: raw transcript string.

    Returns:
        str: the normalized transcript (possibly empty).
    """
    s = s.lower()
    s = s.replace("’", "'")

    # Latin letters plus the Ibibio vowels/consonants with diacritics,
    # the space and the apostrophe.
    allowed = "abcdefghijklmnopqrstuvwxyzáéíóúàèìòùọụñʌ '"
    s = ''.join(ch for ch in s if ch in allowed)

    # collapse spaces, then trim
    s = re.sub(r"\s+", " ", s)
    return s.strip()

# Step 1: normalize every transcript
train_text = [normalize_text(t) for t in dataset["sentence"]]


def _split_graphemes(sentence, digraphs):
    """Split ``sentence`` into graphemes, trying each digraph before a single character."""
    units = []
    pos = 0
    while pos < len(sentence):
        hit = next((d for d in digraphs if sentence.startswith(d, pos)), None)
        unit = hit if hit is not None else sentence[pos]
        units.append(unit)
        pos += len(unit)
    return units


# Step 2: grapheme-tokenize (digraphs are detected first)
graphemes = set()
for sentence in train_text:
    graphemes.update(_split_graphemes(sentence, DIGRAPHS))

# Step 3: sort graphemes (a space, when present, sorts first)
graphemes = sorted(graphemes)

# Step 4: build vocab
vocab = {g: i for i, g in enumerate(graphemes)}

# "|" is the word delimiter configured for the tokenizer in this notebook; it
# stands in for the space, so it takes over the space's id rather than leaving
# an unreachable " " entry next to a separate "|" entry. Ids stay contiguous.
if " " in vocab:
    vocab["|"] = vocab.pop(" ")
else:
    vocab["|"] = len(vocab)

# Special tokens: unknown symbol and padding.
vocab["[UNK]"] = len(vocab)
vocab["[PAD]"] = len(vocab)

# Save (written to the current working directory)
with open("vocab.json", "w", encoding="utf-8") as f:
    json.dump(vocab, f, ensure_ascii=False, indent=2)

print("FINAL VOCAB:", vocab)
print("SIZE:", len(vocab))


In [None]:
def normalize_text_for_labels(s: str) -> str:
    """Normalize a transcript before it is encoded into label ids.

    Steps: lower-case, map the typographic apostrophe to ``'``, drop every
    character outside the allowed alphabet, collapse whitespace runs to one
    space, and trim both ends. Only normalization happens here; splitting
    into vocabulary symbols is left to the tokenizer.

    Args:
        s: raw transcript string.

    Returns:
        str: the normalized transcript; empty when nothing allowed remains.
    """
    s = s.lower()
    s = s.replace("’", "'")

    # Latin letters plus the Ibibio vowels/consonants with diacritics,
    # the space and the apostrophe.
    allowed = "abcdefghijklmnopqrstuvwxyzáéíóúàèìòùọụñʌ '"
    s = ''.join(ch for ch in s if ch in allowed)

    # collapse spaces
    s = re.sub(r"\s+", " ", s)
    return s.strip()

def prepare_dataset(batch):
    """Turn one dataset row into model inputs and CTC label ids.

    Reads the decoded audio from ``batch["path"]`` (``"array"`` and
    ``"sampling_rate"``), stores the processor's first ``input_values`` entry,
    replaces ``batch["sentence"]`` with its normalized form and stores the
    tokenized transcript in ``batch["labels"]``.

    A transcript that normalizes to the empty string gets ``labels = None`` so
    the row can be recognised and dropped downstream.

    Returns:
        The same ``batch`` mapping, updated in place.
    """
    # Audio -> input_values
    waveform = batch["path"]
    encoded_audio = processor(
        waveform["array"],
        sampling_rate=waveform["sampling_rate"],
    )
    batch["input_values"] = encoded_audio.input_values[0]

    # Transcript -> normalized text
    cleaned = normalize_text_for_labels(batch["sentence"])
    batch["sentence"] = cleaned

    # Normalized text -> label ids (None marks an empty transcript)
    batch["labels"] = processor(text=cleaned).input_ids if cleaned != "" else None
    return batch


In [None]:
# Convert every row to model inputs / label ids, keeping only the columns the
# collator and the metrics need.
processed = dataset.map(
    prepare_dataset,
    remove_columns=[c for c in dataset.column_names if c not in ["sentence", "input_values", "labels"]],  # remove unused cols
)

# prepare_dataset marks empty transcripts with labels=None. Drop those rows
# here so a batch made up only of such rows can never reach the collator
# (which raises when every feature in a batch is invalid). Only the "labels"
# column is passed to the predicate, so the audio features are not loaded.
processed = processed.filter(
    lambda labels: labels is not None and len(labels) > 0,
    input_columns=["labels"],
)


In [None]:
# Hold out 10% of the rows for evaluation; the fixed seed keeps the split
# reproducible.
# NOTE(review): `processed` is rebound from the mapped dataset to the split
# result, so re-running this cell on its own presumably fails — confirm.
processed = processed.train_test_split(test_size=0.1, seed=42)
train_ds = processed["train"]
eval_ds = processed["test"]


In [None]:
import os
import json
import matplotlib.pyplot as plt
from transformers import TrainerCallback

class SavePlotsCallback(TrainerCallback):
    """
    Saves training graphs (loss, eval loss, WER, CER) to PNG files
    inside the training output directory when training ends.

    Each curve is drawn against the position of the value in the Trainer's
    log history; a curve with no logged values is skipped.
    """

    @staticmethod
    def _save_curve(values, title, xlabel, ylabel, out_path):
        """Plot ``values`` against their index and write the figure to ``out_path``."""
        if not values:
            return
        fig, ax = plt.subplots()
        ax.plot(values)
        ax.set_title(title)
        ax.set_xlabel(xlabel)
        ax.set_ylabel(ylabel)
        ax.grid(True)
        fig.savefig(out_path)
        # Close explicitly so figures do not pile up in memory.
        plt.close(fig)

    def on_train_end(self, args, state, control, **kwargs):
        """Collect metrics from ``state.log_history`` and save one PNG per metric.

        Args:
            args: training arguments; ``args.output_dir`` receives the PNGs.
            state: trainer state; ``state.log_history`` is the list of logged dicts.
            control, **kwargs: unused.
        """
        output_dir = args.output_dir
        os.makedirs(output_dir, exist_ok=True)

        log_history = state.log_history

        def series(key):
            """Values logged under ``key``, in log order."""
            return [entry[key] for entry in log_history if key in entry]

        # Training-loss entries are those carrying both "loss" and "epoch".
        train_loss = [
            entry["loss"]
            for entry in log_history
            if "loss" in entry and "epoch" in entry
        ]

        curves = [
            (train_loss, "Training Loss Curve", "Logging Step", "Loss", "training_loss.png"),
            (series("eval_loss"), "Validation Loss Curve", "Evaluation Step", "Loss", "validation_loss.png"),
            (series("eval_wer"), "Word Error Rate (WER)", "Evaluation Step", "WER", "wer.png"),
            (series("eval_cer"), "Character Error Rate (CER)", "Evaluation Step", "CER", "cer.png"),
        ]
        for values, title, xlabel, ylabel, filename in curves:
            self._save_curve(values, title, xlabel, ylabel, os.path.join(output_dir, filename))

        print(f"\nüìä Saved training plots into: {output_dir}\n")


In [None]:
# Build the Trainer from the objects prepared above.
# `processing_class` replaces the deprecated `tokenizer` keyword, which newer
# transformers releases warn about.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_ds,
    eval_dataset=eval_ds,
    processing_class=processor.feature_extractor,
    compute_metrics=compute_metrics,
    callbacks=[SavePlotsCallback()],
)

trainer.train()
trainer.save_model("/content/drive/MyDrive/Ibibio_Voice/ibb_stage2_custom/")
# Only the feature extractor is attached to the Trainer, so save the full
# processor (feature extractor + tokenizer/vocab) next to the model; the
# saved folder can then be reloaded on its own for inference.
processor.save_pretrained("/content/drive/MyDrive/Ibibio_Voice/ibb_stage2_custom/")

print("‚úì Stage 2 training complete!")


  trainer = Trainer(
  torch._C._get_cudnn_allow_tf32(),


Epoch,Training Loss,Validation Loss,Wer,Cer
1,No log,1.662066,1.011976,0.807524
2,No log,0.605665,0.772455,0.3
3,2.559200,0.463349,0.592814,0.123824
4,2.559200,0.438448,0.520958,0.107524
5,0.498800,0.366206,0.45509,0.096865
6,0.498800,0.350299,0.45509,0.092476
7,0.363900,0.312249,0.407186,0.098119
8,0.363900,0.289065,0.347305,0.07931
9,0.344200,0.274879,0.323353,0.07931
10,0.344200,0.281011,0.359281,0.078683




Epoch,Training Loss,Validation Loss,Wer,Cer
1,No log,1.662066,1.011976,0.807524
2,No log,0.605665,0.772455,0.3
3,2.559200,0.463349,0.592814,0.123824
4,2.559200,0.438448,0.520958,0.107524
5,0.498800,0.366206,0.45509,0.096865
6,0.498800,0.350299,0.45509,0.092476
7,0.363900,0.312249,0.407186,0.098119
8,0.363900,0.289065,0.347305,0.07931
9,0.344200,0.274879,0.323353,0.07931
10,0.344200,0.281011,0.359281,0.078683


In [None]:
import torch

def transcribe(audio_dict):
    """Greedy-decode one preprocessed example with the global model.

    Args:
        audio_dict: mapping whose ``"input_values"`` entry holds the
            sequence of audio feature values for a single clip.

    Returns:
        The string produced by ``processor.decode`` for the arg-max token ids.
    """
    # Switch to inference mode so training-only behaviour is turned off.
    model.eval()

    # Build a (1, length) float32 batch on the same device as the model;
    # after GPU training a CPU tensor would trigger a device mismatch.
    input_values = torch.as_tensor(audio_dict["input_values"], dtype=torch.float32)
    input_values = input_values.unsqueeze(0).to(model.device)

    with torch.no_grad():
        logits = model(input_values).logits

    pred_ids = torch.argmax(logits, dim=-1)
    return processor.decode(pred_ids[0])
