In [None]:
from datasets import load_dataset

# Fetch every split of the challenge corpus from the Hugging Face Hub
# (served from the local cache on later runs).
dataset = load_dataset(path="DigitalUmuganda/ASR_Fellowship_Challenge_Dataset")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Resolving data files:   0%|          | 0/55 [00:00<?, ?it/s]

Downloading data:   0%|          | 0/28 [00:00<?, ?files/s]

train_tarred/sharded_manifests_with_imag(…):   0%|          | 0.00/2.07G [00:00<?, ?B/s]

train_tarred/sharded_manifests_with_imag(…):   0%|          | 0.00/2.07G [00:00<?, ?B/s]

In [None]:
# Bind each of the three splits of `dataset` to its own name.
train, test, validation = (
    dataset[split_name] for split_name in ("train", "test", "validation")
)


In [None]:
# Inspect the schema through `dataset` (loaded in the first cell). `ds` is
# only assigned in the training cell further down, so referencing it here
# raises NameError on a fresh kernel.
print(dataset["train"].column_names)  # list of column names
print(dataset["train"][0])  # first sample, to see which fields it contains

In [None]:
# Show the feature type of each column. Uses `dataset` from the first cell
# because `ds` is not defined until the training cell further down.
dataset["train"].features


In [None]:
pip install transformers datasets evaluate jiwer accelerate soundfile librosa huggingface_hub

In [None]:
import inspect
import os

import evaluate
import numpy as np
import torch
import torch.nn as nn
from datasets import load_dataset, Audio
from transformers import (
    Wav2Vec2ForCTC,
    Wav2Vec2Processor,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding
)

# -------------------------
# Config
# -------------------------
# Every tunable read by the rest of this cell is defined here.
# NOTE(review): confirm that this checkpoint ships a tokenizer/vocabulary that
# covers the dataset's transcripts before relying on the resulting WER.
model_name = "facebook/wav2vec2-base"   # Hub id passed to the processor and model from_pretrained calls
adapter_dim = 64                        # bottleneck size for adapter
sample_rate = 16_000                    # Hz; the audio column is cast to this rate and the processor is given it
batch_size = 8                          # per-device batch size for both training and evaluation
learning_rate = 3e-4                    # learning rate handed to TrainingArguments
num_train_epochs = 6                    # number of epochs handed to TrainingArguments
output_dir = "./wav2vec2_adapters"      # Trainer output directory; adapters_only.pth is written here as well

# -------------------------
# Load dataset
# -------------------------
ds = load_dataset("DigitalUmuganda/ASR_Fellowship_Challenge_Dataset")

print(ds)                          # available splits
train_columns = ds["train"].column_names
print(train_columns)               # columns of the training split

# Column names the preprocessing relies on; change them if the dataset differs.
AUDIO_COL = "audio"           # column that will hold the decoded audio
TEXT_COL = "text"             # column holding the transcript

# Some dataset layouts keep the audio reference under 'audio_filepath'.
# In that case copy it into the audio column so it can be cast below.
if "audio_filepath" in train_columns:
    def remap_audio(example):
        """Return the extra column to merge into the row: audio <- audio_filepath."""
        return {"audio": example["audio_filepath"]}
    ds = ds.map(remap_audio)

# Type the column as Audio so samples are decoded and resampled to `sample_rate` on access.
ds = ds.cast_column(AUDIO_COL, Audio(sampling_rate=sample_rate))

# -------------------------
# Load processor (feature extractor + tokenizer)
# -------------------------
processor = Wav2Vec2Processor.from_pretrained(model_name)

# NOTE(review): if the tokenizer's vocabulary does not cover the characters in
# the transcripts, a custom vocabulary/tokenizer is needed before training.


def prepare_batch(batch):
    """Convert one dataset row into the fields the CTC model trains on.

    Reads the decoded waveform from ``batch[AUDIO_COL]["array"]`` and the
    transcript from ``batch[TEXT_COL]``.

    Adds two keys to ``batch`` and returns it:
      * ``input_values`` - the feature extractor's output for this waveform.
      * ``labels`` - the token ids of the transcript.

    The feature extractor and tokenizer are called directly rather than through
    the deprecated ``processor.as_target_processor()`` context, and no tensors
    are created here: padding and tensor conversion happen in the collator.
    """
    waveform = batch[AUDIO_COL]["array"]
    extracted = processor.feature_extractor(waveform, sampling_rate=sample_rate)
    # The extractor returns a batch of one; keep the single sequence.
    batch["input_values"] = extracted.input_values[0]
    batch["labels"] = processor.tokenizer(batch[TEXT_COL]).input_ids
    return batch

# Run the per-row preprocessing (not batched) and drop every column of the
# training split other than the two model fields.
model_columns = ["input_values", "labels"]
columns_to_drop = [name for name in ds["train"].column_names if name not in model_columns]
ds = ds.map(prepare_batch, remove_columns=columns_to_drop)

# Have each split hand back PyTorch tensors for the model fields.
for split_name in ("train", "validation", "test"):
    ds[split_name].set_format(type="torch", columns=list(model_columns))

# -------------------------
# Load pretrained model
# -------------------------
# Use the tokenizer's pad id when a tokenizer is available, otherwise fall back to 0.
if processor.tokenizer:
    ctc_pad_token_id = processor.tokenizer.pad_token_id
else:
    ctc_pad_token_id = 0

model = Wav2Vec2ForCTC.from_pretrained(
    model_name,
    gradient_checkpointing=False,
    ctc_loss_reduction="mean",
    pad_token_id=ctc_pad_token_id,
)

# -------------------------
# Adapter module definition
# -------------------------
class Adapter(nn.Module):
    """Bottleneck adapter: linear down-projection, ReLU, linear up-projection.

    Args:
        hidden_size: size of the last dimension of the input and of the output.
        adapter_dim: size of the bottleneck between the two projections.

    The up-projection's weight and bias start at zero, so a freshly built
    adapter outputs zeros. ``forward`` returns only the adapter branch; adding
    it back onto the layer output is left to the caller.
    """

    def __init__(self, hidden_size, adapter_dim=64):
        super().__init__()
        self.down = nn.Linear(hidden_size, adapter_dim)
        self.act = nn.ReLU()
        self.up = nn.Linear(adapter_dim, hidden_size)
        # Zero the whole up-projection so the adapter starts out contributing nothing.
        for tensor in (self.up.weight, self.up.bias):
            nn.init.zeros_(tensor)

    def forward(self, x):
        """Map ``x`` of shape (..., hidden_size) to an output of the same shape."""
        return self.up(self.act(self.down(x)))

# -------------------------
# Insert adapter modules into model encoder layers
# -------------------------
# The transformer layers are reached through model.wav2vec2.encoder.layers.
encoder = model.wav2vec2.encoder

# Take the adapter width from the model config. An earlier attribute probe on
# the first layer was dropped: its result was overwritten by this line anyway,
# and it could raise on layers that lack the probed attributes.
hidden_size = model.config.hidden_size

for layer in encoder.layers:
    # Assigning a module as an attribute makes it a submodule of the layer, so
    # it appears in model.parameters(). `.to()` puts it on the model's device.
    layer.adapter = Adapter(hidden_size, adapter_dim=adapter_dim).to(model.device)

# -------------------------
# Freeze base model, leave adapters trainable
# -------------------------
# Turn gradients off for every parameter of the model (adapters included) ...
model.requires_grad_(False)

# ... then turn them back on for the adapter weights only.
for layer in encoder.layers:
    layer.adapter.requires_grad_(True)

# Optionally make CTC head trainable (uncomment if desired)
# model.lm_head.requires_grad_(True)

# Report how many scalar weights remain trainable.
trainable_params = [weight for weight in model.parameters() if weight.requires_grad]
print("Trainable params:", sum(weight.numel() for weight in trainable_params))

# -------------------------
# Route every encoder layer's output through its adapter
# -------------------------
# Assigning `encoder.__call__` on the instance does nothing: `encoder(...)`
# resolves `__call__` on the class, so the adapters would never run and the
# loss would have no trainable parameter in its graph. Forward hooks on the
# individual layers add the adapter output instead, and leave the encoder's
# own forward pass and its return value untouched.

import types  # no longer used by this section; kept in case other cells rely on it


def _add_adapter_residual(layer, inputs, output):
    """Forward hook returning the layer output plus ``layer.adapter`` of it.

    When the layer returns a tuple, only its first element is adapted and any
    remaining elements are passed through unchanged. A bare tensor output is
    adapted directly.
    """
    if isinstance(output, tuple):
        hidden_states = output[0]
        adapted = hidden_states + layer.adapter(hidden_states)
        return (adapted,) + tuple(output[1:])
    return output + layer.adapter(output)


for layer in encoder.layers:
    # Re-running this cell must not stack a second hook on the same layer,
    # which would apply the adapter twice.
    stale_handle = getattr(layer, "_adapter_hook_handle", None)
    if stale_handle is not None:
        stale_handle.remove()
    layer._adapter_hook_handle = layer.register_forward_hook(_add_adapter_residual)

# -------------------------
# Data collator
# -------------------------
# Pads audio inputs and CTC labels per batch.
class DataCollatorCTCWithPadding:
    """Collate preprocessed rows into one padded batch for CTC training.

    Args:
        processor: object exposing ``feature_extractor`` and ``tokenizer``,
            each with a ``pad`` method.
        padding: padding strategy forwarded to both ``pad`` calls.

    Calling the collator with a list of rows (each holding ``input_values``
    and ``labels``) returns the padded feature-extractor batch with an added
    ``labels`` entry in which padded positions are set to -100.
    """

    def __init__(self, processor, padding=True):
        self.processor = processor
        self.padding = padding

    def __call__(self, features):
        audio_features = {"input_values": [row["input_values"] for row in features]}
        label_features = {"input_ids": [row["labels"] for row in features]}

        # Audio and labels need different padders: the feature extractor pads
        # waveforms, the tokenizer pads token ids.
        batch = self.processor.feature_extractor.pad(
            audio_features, padding=self.padding, return_tensors="pt"
        )
        padded_labels = self.processor.tokenizer.pad(
            label_features, padding=self.padding, return_tensors="pt"
        )

        # Mask by attention mask, not by value, so a genuine label that equals
        # the pad id is kept and only padded positions become -100.
        is_padding = padded_labels["attention_mask"].ne(1)
        batch["labels"] = padded_labels["input_ids"].masked_fill(is_padding, -100)
        return batch

data_collator = DataCollatorCTCWithPadding(processor=processor)

# -------------------------
# Compute WER metric
# -------------------------
wer_metric = evaluate.load("wer")

def compute_metrics(pred):
    """Compute word error rate for one evaluation pass.

    Args:
        pred: object with ``predictions`` (logits; the argmax over the last
            axis is taken as the predicted token id) and ``label_ids``
            (reference ids in which -100 marks ignored positions).

    Returns:
        ``{"wer": <value returned by wer_metric.compute>}``.
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)
    pred_str = processor.batch_decode(pred_ids)

    # -100 is not a valid token id, so swap it for the pad id before decoding.
    # np.where builds a new array, leaving pred.label_ids itself unmodified.
    label_ids = np.where(
        pred.label_ids == -100, processor.tokenizer.pad_token_id, pred.label_ids
    )
    # group_tokens=False: decode the references without merging repeated ids.
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)
    return {"wer": wer}

# -------------------------
# Training args and Trainer
# -------------------------
# The install cell does not pin transformers, and two keyword names used here
# changed between releases. Pick whichever name the installed version declares.

# TrainingArguments: `evaluation_strategy` was renamed to `eval_strategy`.
eval_strategy_key = (
    "eval_strategy"
    if "eval_strategy" in TrainingArguments.__dataclass_fields__
    else "evaluation_strategy"
)

training_args = TrainingArguments(
    output_dir=output_dir,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=2,
    save_strategy="epoch",
    learning_rate=learning_rate,
    num_train_epochs=num_train_epochs,
    fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
    logging_steps=50,
    push_to_hub=False,
    report_to=[],
    **{eval_strategy_key: "epoch"},
)

# Trainer: `tokenizer` was renamed to `processing_class`.
processing_key = (
    "processing_class"
    if "processing_class" in inspect.signature(Trainer.__init__).parameters
    else "tokenizer"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=ds["train"],
    eval_dataset=ds["validation"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    **{processing_key: processor.feature_extractor},
)

# -------------------------
# Train
# -------------------------
trainer.train()

# -------------------------
# Save adapter weights only
# -------------------------
# One state dict per encoder layer, keyed "layer_<index>_adapter".
adapter_state = {
    f"layer_{i}_adapter": layer.adapter.state_dict()
    for i, layer in enumerate(encoder.layers)
}

# Create the target directory first so this step does not depend on the
# Trainer having already written a checkpoint there.
os.makedirs(output_dir, exist_ok=True)
torch.save(adapter_state, os.path.join(output_dir, "adapters_only.pth"))

print("Adapters saved.")


In [None]:
# Baseline for comparison: load the pretrained checkpoint again, without any
# adapters attached.
base = Wav2Vec2ForCTC.from_pretrained(model_name)
# TODO: this cell only loads the model. Evaluate `base` with a Trainer that
# reuses `compute_metrics` above to obtain the baseline WER.
