In [1]:
!pip install datasets



In [2]:
!pip install evaluate



In [5]:
"""
Enhanced Buckwalter Transliteration for Arabic
- Includes standard Buckwalter mapping
- Adds extended Buckwalter characters
- Includes XML-safe variant mappings
- Handles special cases and diacritics properly
"""

# Standard Buckwalter → Arabic mapping
BUCKWALTER_TO_ARABIC = {
    "'": "\u0621",  # hamza-on-the-line
    "|": "\u0622",  # madda-on-alif
    ">": "\u0623",  # hamza-on-alif
    "&": "\u0624",  # hamza-on-waw
    "<": "\u0625",  # hamza-under-alif
    "}": "\u0626",  # hamza-on-ya
    "A": "\u0627",  # alif
    "b": "\u0628",  # ba
    "p": "\u0629",  # ta marbuta
    "t": "\u062A",  # ta
    "v": "\u062B",  # tha
    "j": "\u062C",  # jim
    "H": "\u062D",  # ha (voiceless pharyngeal fricative)
    "x": "\u062E",  # kha
    "d": "\u062F",  # dal
    "*": "\u0630",  # dhal
    "r": "\u0631",  # ra
    "z": "\u0632",  # zay
    "s": "\u0633",  # sin
    "$": "\u0634",  # shin
    "S": "\u0635",  # sad
    "D": "\u0636",  # dad
    "T": "\u0637",  # ta (emphatic)
    "Z": "\u0638",  # za (emphatic)
    "E": "\u0639",  # ayn
    "g": "\u063A",  # ghayn
    "_": "\u0640",  # tatweel (kashida)
    "f": "\u0641",  # fa
    "q": "\u0642",  # qaf
    "k": "\u0643",  # kaf
    "l": "\u0644",  # lam
    "m": "\u0645",  # mim
    "n": "\u0646",  # nun
    "h": "\u0647",  # ha
    "w": "\u0648",  # waw
    "Y": "\u0649",  # alif maqṣūra
    "y": "\u064A",  # ya
    "F": "\u064B",  # fathatān
    "N": "\u064C",  # ḍammatān
    "K": "\u064D",  # kasratān
    "a": "\u064E",  # fatha
    "u": "\u064F",  # ḍamma
    "i": "\u0650",  # kasra
    "~": "\u0651",  # shadda
    "o": "\u0652",  # sukun
    "`": "\u0670",  # dagger alif (superscript alif)
    "{": "\u0671",  # alif wasla (hamzat wasl)

    # Extended Buckwalter symbols
    "P": "\u067E",  # peh (Persian)
    "J": "\u0686",  # tcheh (Persian)
    "V": "\u06A4",  # veh (Persian)
    "G": "\u06AF",  # gaf (Persian)
    "R": "\u0695",  # reh with small v below (Kurdish)
    "O": "\u06C1",  # heh goal (Urdu)
    "W": "\u06CF",  # waw with dot above (Kurdish)

    # Additional diacritics
    "^": "\u0653",  # maddah above
    "#": "\u0654",  # hamza above
    "`": "\u0670",  # superscript alif
    "\"": "\u06DF",  # small high rounded zero
    "[": "\u06DC",  # small high seen
    ";": "\u061B",  # Arabic semicolon
    ",": "\u060C",  # Arabic comma
    ".": "\u06D4",  # Arabic full stop
    "?": "\u061F",  # Arabic question mark

    # XML-safe alternatives (for the XML-safe version of Buckwalter)
    "O": "\u0647",  # Alternative mapping for heh (XML-safe)
    "I": "\u0649",  # Alternative mapping for alif maqṣūra (XML-safe)
    "W": "\u0624",  # Alternative mapping for hamza-on-waw (XML-safe)
    "Q": "\u0626",  # Alternative mapping for hamza-on-ya (XML-safe)

    # Additional punctuation and symbols
    "-": "\u0640",  # tatweel (kashida)
    "%": "\u066A",  # Arabic percent sign
    "0": "\u0660",  # Arabic-Indic digit 0
    "1": "\u0661",  # Arabic-Indic digit 1
    "2": "\u0662",  # Arabic-Indic digit 2
    "3": "\u0663",  # Arabic-Indic digit 3
    "4": "\u0664",  # Arabic-Indic digit 4
    "5": "\u0665",  # Arabic-Indic digit 5
    "6": "\u0666",  # Arabic-Indic digit 6
    "7": "\u0667",  # Arabic-Indic digit 7
    "8": "\u0668",  # Arabic-Indic digit 8
    "9": "\u0669",  # Arabic-Indic digit 9
}

def buckwalter_to_arabic(text, preserve_latin=False, preserve_digits=True):
    """
    Convert a Buckwalter-transliterated string to Arabic script.

    Parameters:
        text (str): Input text in Buckwalter transliteration.
        preserve_latin (bool): If True, preserve Latin characters that don't have mappings.
                             If False, keep them unchanged (default False).
        preserve_digits (bool): If True, keep Western (ASCII) digits as is.
                              If False, convert to Arabic-Indic digits (default True).

    Returns:
        str: The corresponding Arabic-script string.
    """
    if not text:
        return ""

    result = []
    i = 0
    while i < len(text):
        ch = text[i]

        # Handle special sequences
        if i < len(text) - 1 and ch + text[i+1] == "lA":  # lam-alif ligature
            result.append("\u0644\u0627")
            i += 2
            continue
        elif i < len(text) - 1 and ch + text[i+1] == "lM":  # lam-alif with madda
            result.append("\u0644\u0622")
            i += 2
            continue

        # Handle regular digits if preserve_digits is True
        if preserve_digits and ch.isdigit():
            result.append(ch)
            i += 1
            continue

        # Handle spaces and punctuation that should be preserved
        if ch.isspace() or ch in '!@#$%^&*()_+-=[]{}|;:",.<>?/':
            if ch not in BUCKWALTER_TO_ARABIC:  # Only preserve if not in the mapping
                result.append(ch)
                i += 1
                continue

        # Standard character mapping
        arabic_char = BUCKWALTER_TO_ARABIC.get(ch)

        # If not found in the mapping
        if arabic_char is None:
            if preserve_latin or not ch.isalpha():
                result.append(ch)  # Keep Latin characters and non-alphabetic characters unchanged
            else:
                # Skip characters without mapping if not preserving Latin
                pass
        else:
            result.append(arabic_char)

        i += 1

    return "".join(result)


def arabic_to_buckwalter(text):
    """
    Convert Arabic script to Buckwalter transliteration.

    Parameters:
        text (str): Input text in Arabic script.

    Returns:
        str: The corresponding Buckwalter transliteration.
    """
    # Create a reverse mapping dictionary
    ARABIC_TO_BUCKWALTER = {v: k for k, v in BUCKWALTER_TO_ARABIC.items()}

    result = []
    for ch in text:
        # Map each character, or leave it unchanged if no mapping exists
        result.append(ARABIC_TO_BUCKWALTER.get(ch, ch))
    return "".join(result)


# Example usage
if __name__ == "__main__":
    # Test the function with some examples
    buck_text = "Al-salAmu Ealaykum"
    arabic_text = buckwalter_to_arabic(buck_text)
    print(f"Original: {buck_text}")
    print(f"Arabic: {arabic_text}")

    # Test round-trip conversion
    back_to_buck = arabic_to_buckwalter(arabic_text)
    print(f"Back to Buckwalter: {back_to_buck}")

Original: Al-salAmu Ealaykum
Arabic: الـسَلامُ عَلَيكُم
Back to Buckwalter: Al-salAmu Ealaykum


# preparing tha dataset

all of the datasets have samiliar distribution as the records are 5-20 seconds and for each split 200 records which satfies the minimum amount of data

In [6]:
from datasets import load_dataset, concatenate_datasets, Audio, Dataset
import random

# 1️⃣ Load raw “train” splits
egy_raw = load_dataset("MightyStudent/Egyptian-ASR-MGB-3", split="train")
cl_raw  = load_dataset("MBZUAI/ClArTTS", split="train")
msa_raw = load_dataset("halabi2016/arabic_speech_corpus", split="train")

# 2️⃣ Keep only 'audio' and 'text'
def keep_columns(ds, text_col):
    ds = ds.rename_column(text_col, "text")
    return ds.remove_columns([c for c in ds.column_names if c not in {"audio", "text"}])

egy = keep_columns(egy_raw, "sentence")
msa = msa_raw.remove_columns([c for c in msa_raw.column_names if c not in {"audio", "text"}])

# Subsample 1000 from ClArTTS up front
cl_raw_subset = cl_raw.shuffle(seed=0).select(range(1000))

def wrap_classic(ex):
    return {
        "text":  ex["text"],
        "audio": {"array": ex["audio"], "sampling_rate": ex["sampling_rate"]}
    }

cl = cl_raw_subset.map(wrap_classic, remove_columns=[c for c in cl_raw_subset.column_names if c not in {"audio", "text"}])

# 3️⃣ Cast all audio to 16kHz
egy = egy.cast_column("audio", Audio(sampling_rate=16000))
cl  = cl.cast_column("audio", Audio(sampling_rate=16000))
msa = msa.cast_column("audio", Audio(sampling_rate=16000))

# 4️⃣ Select examples until each dataset has at least 30 minutes (~1800 sec)
def select_until_duration(dataset, min_seconds):
    total, selected = 0.0, []
    for ex in dataset:
        array = ex["audio"]["array"]
        sr = ex["audio"]["sampling_rate"]
        duration = len(array) / sr
        ex["duration"] = duration
        total += duration
        selected.append(ex)
        if total >= min_seconds:
            break
    return selected

target_duration = 1800  # 30 minutes in seconds

# Shuffle before selection
egy = egy.shuffle(seed=1)
msa = msa.shuffle(seed=1)
cl  = cl.shuffle(seed=1)

# Select just enough from each
egy_sel = select_until_duration(egy, target_duration)
msa_sel = select_until_duration(msa, target_duration)
cl_sel  = select_until_duration(cl,  target_duration)

# Convert back to datasets
egy_bal = Dataset.from_list(egy_sel)
msa_bal = Dataset.from_list(msa_sel)
cl_bal  = Dataset.from_list(cl_sel)



msa_bal = msa_bal.map(lambda ex, idx: {"text": buckwalter_to_arabic(ex["text"])} , with_indices=True)

# 6️⃣ Combine all datasets
combined = concatenate_datasets([egy_bal, msa_bal, cl_bal])
print(combined)

# 7️⃣ Print durations for each
def compute_duration_stats(dataset, name=""):
    durations = dataset["duration"]
    total_duration = sum(durations)
    avg_duration = total_duration / len(durations)
    print(f"{name} — Total: {total_duration:.2f}s, Avg/sample: {avg_duration:.2f}s")

compute_duration_stats(egy_bal, "Egyptian Arabic")
compute_duration_stats(msa_bal, "MSA")
compute_duration_stats(cl_bal,  "Classical Arabic")

# 8️⃣ Optionally save the combined dataset
combined.save_to_disk("combined_dataset_balanced")


Resolving data files:   0%|          | 0/26 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/21 [00:00<?, ?it/s]

Map:   0%|          | 0/234 [00:00<?, ? examples/s]

Dataset({
    features: ['audio', 'text', 'duration'],
    num_rows: 733
})
Egyptian Arabic — Total: 1818.62s, Avg/sample: 25.61s
MSA — Total: 1808.48s, Avg/sample: 7.73s
Classical Arabic — Total: 1801.59s, Avg/sample: 4.21s


Saving the dataset (0/2 shards):   0%|          | 0/733 [00:00<?, ? examples/s]

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
!cp -r combined_dataset_balanced /content/drive/MyDrive/

In [7]:
from datasets import load_from_disk

combined = load_from_disk("combined_dataset_balanced")


In [9]:
!pip install jiwer



In [3]:
import torch
from transformers import pipeline, AutoProcessor, AutoModelForSpeechSeq2Seq
from jiwer import wer
import librosa
import evaluate

In [4]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"

## wav2vec2-large-xlsr-53-arabic model

In [12]:
# Initialize the ASR pipeline
pipe_wav2vec2 = pipeline("automatic-speech-recognition", model="jonatasgrosman/wav2vec2-large-xlsr-53-arabic")

Device set to use cuda:0


In [16]:
def resample_audio(audio_array, orig_sr, target_sr=16000):
    if orig_sr != target_sr:
        return librosa.resample(audio_array, orig_sr=orig_sr, target_sr=target_sr)
    return audio_array

import numpy as np

# Ensure that each audio is a NumPy array, as the pipeline expects ndarray
audios = [np.array(ex["audio"]["array"]) for ex in combined]

# Transcribe in batch
transcriptions = pipe_wav2vec2(audios, batch_size=16)
predicted_texts = [t["text"] for t in transcriptions]


# Extract ground truth texts
ground_truth = combined["text"]


In [17]:
cer = evaluate.load("cer")
cer_score_wav2vec2 = cer.compute(predictions=predicted_texts, references=ground_truth)
print(f"CER: {cer_score_wav2vec2}")

CER: 0.23297813993575178


## wav2vec2-large-xlsr-53-arabic-egyptian

In [18]:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe_wav2vec2_egy = pipeline("automatic-speech-recognition", model="arbml/wav2vec2-large-xlsr-53-arabic-egyptian")

Device set to use cuda:0


In [None]:
def resample_audio(audio_array, orig_sr, target_sr=16000):
    if orig_sr != target_sr:
        return librosa.resample(audio_array, orig_sr=orig_sr, target_sr=target_sr)
    return audio_array

import numpy as np

# Ensure that each audio is a NumPy array, as the pipeline expects ndarray
audios = [np.array(ex["audio"]["array"]) for ex in combined]

# Transcribe in batch
transcriptions = pipe_wav2vec2_egy(audios, batch_size=16)
predicted_texts = [t["text"] for t in transcriptions]


# Extract ground truth texts
ground_truth = combined["text"]


In [21]:
cer = evaluate.load("cer")
cer_score_wav2vec2_egy = cer.compute(predictions=predicted_texts, references=ground_truth)
print(f"CER: {cer_score_wav2vec2_egy}")

CER: 0.4308025803755648


## Whisper large

In [5]:
model_id = "openai/whisper-large"
model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id).to(device)
processor = AutoProcessor.from_pretrained(model_id)

pipe_whisper = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    chunk_length_s=30,            # split into 30 s chunks before encoding
    stride_length_s=(5, 5),       # 5 s overlap on each side
    batch_size=8,                 # tune to your GPU/CPU
    generate_kwargs={"language":"arabic","task":"transcribe"},
    device=device
)


Device set to use cuda:0


In [None]:
def resample_audio(audio_array, orig_sr, target_sr=16000):
    if orig_sr != target_sr:
        return librosa.resample(audio_array, orig_sr=orig_sr, target_sr=target_sr)
    return audio_array

import numpy as np

# Ensure that each audio is a NumPy array, as the pipeline expects ndarray
audios = [np.array(ex["audio"]["array"]) for ex in combined]

# 6️⃣ Transcribe in one batch
results = pipe_whisper(audios)    # now all batches have consistent keys

predicted_texts = [r["text"] for r in results]
ground_truth   = combined["text"]

# 7️⃣ Compute WER
error_rate_whisper = wer(ground_truth, predicted_texts)
print(f"Whisper (large) Word Error Rate: {error_rate_whisper:.3f}")

In [9]:
cer = evaluate.load("cer")
cer_score_whisper = cer.compute(predictions=predicted_texts, references=ground_truth)
print(f"CER: {cer_score_whisper}")

CER: 0.4011987777168377


## Whisper Small-Ar

In [10]:
# Whisper Small-Ar
pipe_s = pipeline(
    "automatic-speech-recognition", model="ayoubkirouane/whisper-small-ar",
    chunk_length_s=30, stride_length_s=(5,5), batch_size=8,
    generate_kwargs={"language":"arabic","task":"transcribe"},
    device=device
)

config.json:   0%|          | 0.00/1.32k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/967M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/967M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/3.83k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/805 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/494k [00:00<?, ?B/s]

normalizer.json:   0%|          | 0.00/52.7k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/34.6k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/2.08k [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/339 [00:00<?, ?B/s]

Device set to use cuda:0


In [13]:
def resample_audio(audio_array, orig_sr, target_sr=16000):
    if orig_sr != target_sr:
        return librosa.resample(audio_array, orig_sr=orig_sr, target_sr=target_sr)
    return audio_array

import numpy as np

# Ensure that each audio is a NumPy array, as the pipeline expects ndarray
audios = [np.array(ex["audio"]["array"]) for ex in combined]

print("Running Whisper-Small-Ar...")
out_s = pipe_s(audios)
hyp_s = [x['text'] for x in out_s]


Running Whisper-Small-Ar...




In [14]:
cer = evaluate.load("cer")
cer_score_s = cer.compute(predictions=hyp_s, references=ground_truth)
print(f"CER: {cer_score_s}")

CER: 0.38420956410457313


## whisper-large-v3


In [15]:
from transformers import pipeline

pipe_whisper = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-large-v3",
    device=device,
    chunk_length_s=30,
    stride_length_s=(5, 5)
)



config.json:   0%|          | 0.00/1.27k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/3.90k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/283k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.48M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/494k [00:00<?, ?B/s]

normalizer.json:   0%|          | 0.00/52.7k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/34.6k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/2.07k [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/340 [00:00<?, ?B/s]

Device set to use cuda:0


In [16]:
def resample_audio(audio_array, orig_sr, target_sr=16000):
    if orig_sr != target_sr:
        return librosa.resample(audio_array, orig_sr=orig_sr, target_sr=target_sr)
    return audio_array

import numpy as np

# Ensure that each audio is a NumPy array, as the pipeline expects ndarray
audios = [np.array(ex["audio"]["array"]) for ex in combined]

# Run inference
out_whisper = pipe_whisper(audios)
hyp_whisper = [x["text"] for x in out_whisper]


Due to a bug fix in https://github.com/huggingface/transformers/pull/28687 transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English.This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`.


In [17]:
cer = evaluate.load("cer")
cer_score_whisper3 = cer.compute(predictions=hyp_whisper, references=ground_truth)
print(f"CER: {cer_score_whisper3}")

CER: 0.39661521585834053


## HuBERT Egyptian CTC

In [18]:
from transformers import HubertForCTC, Wav2Vec2Processor

# feature‐extractor / tokenizer stays the same
wv_proc  = Wav2Vec2Processor.from_pretrained("omarxadel/hubert-large-arabic-egyptian")

# load with HubertForCTC, not Wav2Vec2ForCTC
wv_model = HubertForCTC.from_pretrained("omarxadel/hubert-large-arabic-egyptian").to(device)


preprocessor_config.json:   0%|          | 0.00/256 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/257 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/418 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.84k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

In [21]:
def resample_audio(audio_array, orig_sr, target_sr=16000):
    if orig_sr != target_sr:
        return librosa.resample(audio_array, orig_sr=orig_sr, target_sr=target_sr)
    return audio_array

import numpy as np
import math

# Ensure that each audio is a NumPy array, as the pipeline expects ndarray
audios = [np.array(ex["audio"]["array"]) for ex in combined]

all_preds = []
BATCH = 16
audio_arrays = [np.array(ex["audio"]["array"]) for ex in combined]

for i in range(math.ceil(len(audio_arrays)/BATCH)):
    chunk = audio_arrays[i*BATCH:(i+1)*BATCH]
    wv_inp = wv_proc(chunk, sampling_rate=16000, return_tensors="pt", padding=True)
    wv_inp = {k: v.to(device) for k,v in wv_inp.items()}
    with torch.no_grad():
        logits = wv_model(wv_inp["input_values"], attention_mask=wv_inp["attention_mask"]).logits
    pred_ids = torch.argmax(logits, dim=-1)
    all_preds += wv_proc.batch_decode(pred_ids)
    print(f"Completed batch {i+1}/{math.ceil(len(audio_arrays)/BATCH)}")




OutOfMemoryError: CUDA out of memory. Tried to allocate 2.93 GiB. GPU 0 has a total capacity of 14.74 GiB of which 234.12 MiB is free. Process 408501 has 14.51 GiB memory in use. Of the allocated memory 13.82 GiB is allocated by PyTorch, and 576.16 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
cer = evaluate.load("cer")
cer_score_wv = cer.compute(predictions=all_preds, references=ground_truth)
print(f"CER: {cer_score_wv}")