# Convert to MP3

In [None]:
import yt_dlp
import os
import subprocess

# Download the audio track of a YouTube video and store it as
# data/raw_audio/full_audio.mp3 (yt_dlp performs the MP3 conversion through
# its FFmpegExtractAudio post-processor).

# Step 0: Create folders for organization
os.makedirs("data/raw_audio", exist_ok=True)
print("✅ Created folder: data/raw_audio")

# Step 1: Get YouTube URL
youtube_url = input("Enter YouTube URL: ").strip()

# Step 2: Define output MP3 path
output_mp3 = os.path.join("data", "raw_audio", "full_audio.mp3")

# Step 3: Set yt_dlp options to force mp3 conversion
ydl_opts = {
    'format': 'bestaudio/best',
    'outtmpl': 'data/raw_audio/temp_audio.%(ext)s',  # Download to raw_audio folder
    'postprocessors': [{
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }],
    'quiet': False,
}

print("📥 Downloading and converting...")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    ydl.download([youtube_url])

# Step 4: Move to the final name if conversion succeeded
temp_mp3 = os.path.join("data", "raw_audio", "temp_audio.mp3")
if os.path.exists(temp_mp3):
    # os.replace overwrites an existing full_audio.mp3 on every platform;
    # os.rename raises FileExistsError on Windows when the cell is re-run.
    os.replace(temp_mp3, output_mp3)
    print(f"✅ Saved audio as: {output_mp3}")
else:
    print("❌ MP3 not found. Check if ffmpeg is installed and accessible.")


# Chunking

In [1]:
import os
import subprocess
import glob

# Split data/raw_audio/full_audio.mp3 into fixed-length, 16 kHz mono WAV
# files named data/chunks/chunk_NNN.wav using ffmpeg's segment muxer.

# Create chunks folder
os.makedirs("data/chunks", exist_ok=True)

# Path to full audio
input_audio = "data/raw_audio/full_audio.mp3"

# Ask user for chunk length in seconds; keep prompting until a positive
# integer is entered.
while True:
    try:
        chunk_length = int(input("\n\nEnter chunk length in seconds (e.g., 10): ").strip())
    except ValueError:
        print("Invalid input. Please enter an integer.")
        continue
    if chunk_length <= 0:
        print("Please enter a positive integer.")
        continue
    break

# Output pattern for chunk files
output_pattern = "data/chunks/chunk_%03d.wav"

# Run ffmpeg to chunk audio. stdout is discarded, but stderr is captured so
# that a failure (missing input file, bad codec, ...) is reported instead of
# being silently swallowed.
print(f"Chunking full audio into {chunk_length}-second WAV files...")
ffmpeg_result = subprocess.run([
    "ffmpeg",
    "-hide_banner",
    "-loglevel", "error",
    "-i", input_audio,
    "-ar", "16000",
    "-ac", "1",
    "-f", "segment",
    "-segment_time", str(chunk_length),
    "-reset_timestamps", "1",
    output_pattern
], stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
   encoding="utf-8", errors="replace")

# List created chunks
chunk_files = sorted(glob.glob("data/chunks/chunk_*.wav"))
if ffmpeg_result.returncode != 0:
    print(f"❌ ffmpeg failed (exit code {ffmpeg_result.returncode}): "
          f"{ffmpeg_result.stderr.strip()}")
else:
    print(f"✅ Created {len(chunk_files)} chunks.")


Chunking full audio into 3-second WAV files...
✅ Created 2535 chunks.


# Transcribing

In [None]:
import os
import glob
import re
import time
import torch
from transformers import pipeline

# ========== [CONFIG] ==========
# Folder that is searched for chunk_*.wav files to transcribe.
CHUNK_FOLDER = "data/chunks"
# Folder that receives one <chunk name>.txt transcript per audio chunk.
TRANSCRIPT_FOLDER = "data/transcriptions"
# Model identifier passed to the transformers ASR pipeline.
MODEL_NAME = "Vira21/Whisper-Small-Khmer"
# ==============================

# Numeric sort key for chunk audio files
def extract_number(filename):
    """Return the integer N from a ``chunk_<N>.wav`` file name.

    Only the base name of *filename* is inspected. When the name does not
    contain the ``chunk_<digits>.wav`` pattern, -1 is returned so that such
    files sort ahead of every real chunk.
    """
    base = os.path.basename(filename)
    found = re.search(r'chunk_(\d+)\.wav', base)
    if not found:
        return -1
    return int(found.group(1))

# Select device: 0 is reported as GPU below, -1 as CPU
device = 0 if torch.cuda.is_available() else -1
print(f"[INFO] Using device: {'GPU' if device == 0 else 'CPU'}")

# Load Whisper model
print("[INFO] Loading ASR model...")
asr = pipeline("automatic-speech-recognition", model=MODEL_NAME, device=device)
print("[INFO] Model loaded successfully.\n")

# Prepare folders
os.makedirs(TRANSCRIPT_FOLDER, exist_ok=True)

# Load chunk files and order them by their numeric index
audio_files = glob.glob(os.path.join(CHUNK_FOLDER, "chunk_*.wav"))
audio_files.sort(key=extract_number)

if not audio_files:
    print(f"[ERROR] No audio chunk files found in {CHUNK_FOLDER}.")
    # `exit` is a helper that the `site` module installs for interactive
    # shells and is not meant for programs; raising SystemExit (what
    # sys.exit() does) reliably stops execution here.
    raise SystemExit(1)

print(f"[INFO] Found {len(audio_files)} chunk files.\n")

# ========== [TRANSCRIPTION LOOP] ==========
# Transcribe every chunk that does not yet have a transcript. A chunk that
# fails is reported and skipped; no .txt is written for it, so running the
# cell again retries exactly the failed chunks.
failed_count = 0

for audio_file in audio_files:
    base_name = os.path.splitext(os.path.basename(audio_file))[0]
    output_file = os.path.join(TRANSCRIPT_FOLDER, base_name + ".txt")

    # Resume support: an existing transcript means this chunk is done.
    if os.path.exists(output_file):
        print(f"[SKIP] Transcription exists for {audio_file}.")
        continue

    print(f"[TRANSCRIBING] {audio_file} ...")
    start_time = time.time()

    try:
        result = asr(audio_file)
        transcription = result['text']
    except Exception as e:
        # Deliberately broad: one bad chunk must not abort the whole batch.
        print(f"[ERROR] Failed to transcribe {audio_file}: {e}")
        failed_count += 1
        continue

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(transcription)

    duration = time.time() - start_time
    print(f"[SAVED] {output_file} ({duration:.2f} sec)\n")

# Only claim full success when nothing failed.
if failed_count:
    print(f"[DONE] Finished, but {failed_count} of {len(audio_files)} "
          f"audio files failed to transcribe.")
else:
    print("[✅ DONE] All audio files transcribed.")


# Filtering for code-switching

In [4]:
import os
import re
import glob

# ========== [CONFIG] ==========
# Folder holding the chunk_*.txt transcripts that are filtered by this cell.
TRANSCRIPT_FOLDER = "data/transcriptions"
# Folder holding the matching .wav chunks; a chunk is deleted together with
# its transcript when the transcript is rejected.
CHUNK_FOLDER = "data/chunks"
# ==============================

# Check for Khmer + English (code-switching)
def contains_khmer_and_english(text):
    """Return True when *text* mixes Khmer script and ASCII letters.

    Khmer is detected as any character in the Unicode Khmer block
    (U+1780-U+17FF); English as any ASCII letter A-Z / a-z.

    Returns:
        bool: True only if at least one character of each kind is present.
    """
    # Compare against None so that a real bool is returned rather than a
    # re.Match object (or None).
    has_khmer = re.search(r'[\u1780-\u17FF]', text) is not None
    has_english = re.search(r'[A-Za-z]', text) is not None
    return has_khmer and has_english

# Numeric sort key for transcript files
def extract_number(filename):
    """Return the integer N from a ``chunk_<N>.txt`` file name, or -1.

    The directory part of *filename* is ignored. Names without the
    ``chunk_<digits>.txt`` pattern yield -1.
    """
    hit = re.search(r'chunk_(\d+)\.txt', os.path.basename(filename))
    if hit is None:
        return -1
    digits = hit.group(1)
    return int(digits)

# Load all transcript files, ordered by chunk number
txt_files = sorted(glob.glob(os.path.join(TRANSCRIPT_FOLDER, "chunk_*.txt")), key=extract_number)

if not txt_files:
    print("[ERROR] No transcript files found.")
    # `exit` is an interactive-shell helper from the `site` module; raising
    # SystemExit (what sys.exit() does) reliably stops execution here.
    raise SystemExit(1)

print(f"[INFO] Found {len(txt_files)} transcript files to filter.")

removed = 0
kept = 0

# Keep only transcripts that mix Khmer and English. A rejected transcript is
# deleted together with its audio chunk (destructive: files are removed from
# disk).
for txt_file in txt_files:
    with open(txt_file, "r", encoding="utf-8") as f:
        content = f.read()

    if contains_khmer_and_english(content):
        kept += 1
        continue

    # Remove .txt
    os.remove(txt_file)

    # Remove corresponding .wav; swap only the extension rather than
    # replacing every ".txt" occurrence in the name.
    stem = os.path.splitext(os.path.basename(txt_file))[0]
    audio_file = os.path.join(CHUNK_FOLDER, stem + ".wav")
    if os.path.exists(audio_file):
        os.remove(audio_file)

    removed += 1

print(f"\n[FINAL REPORT] Kept: {kept}, Removed: {removed}")


[INFO] Found 557 transcript files to filter.

[FINAL REPORT] Kept: 202, Removed: 355


# Removing special characters and converting numbers to words

In [5]:
import os
import re
import glob
import string
from num2words import num2words  # pip install num2words

# Folder whose chunk_*.txt transcripts are read, cleaned and written back in
# place by this cell.
transcript_folder = "data/transcriptions"

def clean_transcription(text):
    """Normalize a transcript to Khmer characters, ASCII letters and spaces.

    Steps:
      1. Every run of digits is spelled out in English with num2words.
      2. Every character that is not an ASCII letter, a character of the
         Khmer block (U+1780-U+17FF, which includes the Khmer punctuation
         marks "។" and "៕") or whitespace is dropped.
      3. Whitespace runs are collapsed to one space and the ends trimmed.

    Args:
        text: Raw transcript text.

    Returns:
        The cleaned transcript as a single-spaced string.
    """
    def replace_number(match):
        try:
            words = num2words(int(match.group()), lang='en')
        except Exception:
            # Best effort: leave the digits untouched if conversion fails.
            return match.group()
        # num2words emits hyphens and commas ("twenty-one", "one thousand,
        # two hundred"). Step 2 would delete them and fuse the neighbouring
        # words ("twentyone"), so turn them into spaces first. The padding
        # keeps the number words from fusing with adjacent text; surplus
        # spaces are collapsed in step 3.
        return ' ' + re.sub(r'[^A-Za-z]+', ' ', words) + ' '

    text = re.sub(r'\d+', replace_number, text)

    # Allowed besides the Khmer block and whitespace: ASCII letters.
    allowed_chars = set(string.ascii_letters)

    cleaned_text = ''.join(
        ch for ch in text
        if '\u1780' <= ch <= '\u17FF' or ch in allowed_chars or ch.isspace()
    )

    # Normalize multiple spaces
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()

    return cleaned_text

# Process each raw transcript (chunk_<digits>.txt) in place.
# The glob alone would also match derived files such as
# chunk_001.cleaned.txt or chunk_001.tagged.txt; cleaning a tagged file would
# strip the angle brackets from its <km>/<en> markers, so those are skipped.
raw_transcript_name = re.compile(r'chunk_\d+\.txt')
txt_files = [
    path
    for path in glob.glob(os.path.join(transcript_folder, "chunk_*.txt"))
    if raw_transcript_name.fullmatch(os.path.basename(path))
]

for txt_file in txt_files:
    with open(txt_file, "r", encoding="utf-8") as f:
        content = f.read()

    cleaned = clean_transcription(content)

    with open(txt_file, "w", encoding="utf-8") as f:
        f.write(cleaned)

print(f"✅ Cleaned {len(txt_files)} transcripts (special chars removed, numbers converted).")


✅ Cleaned 202 transcripts (special chars removed, numbers converted).


# tag_language_segments

In [None]:
import os
import re
from num2words import num2words

# Convert digits to words (e.g., 123 → "one hundred and twenty three")
def convert_numbers_to_words(text):
    """Spell out every run of digits in *text* as English words.

    The words come from num2words. Hyphens and commas in its output
    ("twenty-one") are replaced by spaces so that later punctuation removal
    cannot fuse the words together. A space is inserted wherever the number
    touched non-whitespace text, so the words stay separate tokens.

    Args:
        text: Input text, possibly containing digits.

    Returns:
        The text with digit runs replaced. A digit run that cannot be
        converted is left unchanged.
    """
    def replace_num(match):
        try:
            words = num2words(int(match.group()))
        except Exception:
            # Best effort: one unconvertible number must not abort the run.
            return match.group()

        words = re.sub(r'[^A-Za-z]+', ' ', words).strip()

        # Pad only where the number was glued to neighbouring text.
        source = match.string
        if match.start() > 0 and not source[match.start() - 1].isspace():
            words = ' ' + words
        if match.end() < len(source) and not source[match.end()].isspace():
            words = words + ' '
        return words

    return re.sub(r'\d+', replace_num, text)

# Keep only Khmer characters, ASCII letters and whitespace
def clean_text(text):
    """Spell out numbers, then delete every other disallowed character.

    Digits are first turned into words by ``convert_numbers_to_words``.
    Afterwards anything that is not an ASCII letter, a character of the
    Khmer block (U+1780-U+17FF) or whitespace is removed.
    """
    spelled_out = convert_numbers_to_words(text)
    disallowed = re.compile(r'[^A-Za-z\u1780-\u17FF\s]')
    return disallowed.sub('', spelled_out)

# Prefix each same-language run of words with <km> or <en>
def tag_language_segments(text):
    """Group consecutive words by language and tag each group.

    A whitespace-separated word counts as Khmer ('km') when it contains at
    least one character of the Khmer block (U+1780-U+17FF); every other word
    counts as English ('en'). Neighbouring words of the same language form
    one segment, rendered as ``<km> w1 w2`` or ``<en> w1 w2``. Segments are
    joined with single spaces; input without any words gives ''.
    """
    tags = {'km': '<km>', 'en': '<en>'}

    # Each entry is [language, list of words], in order of appearance.
    runs = []
    for token in text.split():
        token_lang = 'km' if re.search(r'[\u1780-\u17FF]', token) else 'en'
        if runs and runs[-1][0] == token_lang:
            runs[-1][1].append(token)
        else:
            runs.append([token_lang, [token]])

    rendered = [tags[run_lang] + ' ' + ' '.join(run_words)
                for run_lang, run_words in runs]
    return ' '.join(rendered)

# Stage 1: *.txt → *.cleaned.txt (the raw file is deleted afterwards)
def process_cleaning(folder):
    """Clean every raw transcript found directly inside *folder*.

    A raw transcript is a file ending in ``.txt`` that does not end in
    ``.cleaned.txt`` or ``.tagged.txt``. Its text is passed through
    ``clean_text``, written next to it under the ``.cleaned.txt`` name, and
    the raw file is then removed. One log line is printed per file.
    """
    print("=== Cleaning raw transcript files ===")
    for filename in os.listdir(folder):
        if not filename.endswith(".txt"):
            continue
        # Skip outputs of this stage and of the tagging stage.
        if filename.endswith(".cleaned.txt") or filename.endswith(".tagged.txt"):
            continue

        source_path = os.path.join(folder, filename)
        with open(source_path, "r", encoding="utf-8") as src:
            raw_text = src.read()

        cleaned_name = filename.replace(".txt", ".cleaned.txt")
        target_path = os.path.join(folder, cleaned_name)
        result_text = clean_text(raw_text)
        with open(target_path, "w", encoding="utf-8") as dst:
            dst.write(result_text)

        os.remove(source_path)
        print(f"[CLEANED] {filename} → {cleaned_name} (deleted original)")

# Stage 2: *.cleaned.txt → *.tagged.txt (the cleaned file is deleted afterwards)
def process_tagging_and_cleanup(folder):
    """Language-tag every ``.cleaned.txt`` file found directly in *folder*.

    Each cleaned transcript is passed through ``tag_language_segments``,
    written next to it under the ``.tagged.txt`` name, and the cleaned file
    is then removed. One log line is printed per file.
    """
    print("\n=== Tagging cleaned files and removing them ===")
    for filename in os.listdir(folder):
        if not filename.endswith(".cleaned.txt"):
            continue

        source_path = os.path.join(folder, filename)
        with open(source_path, "r", encoding="utf-8") as src:
            cleaned_text = src.read()

        tagged_name = filename.replace(".cleaned.txt", ".tagged.txt")
        target_path = os.path.join(folder, tagged_name)
        tagged_text = tag_language_segments(cleaned_text)
        with open(target_path, "w", encoding="utf-8") as dst:
            dst.write(tagged_text)

        os.remove(source_path)
        print(f"[TAGGED] {filename} → {tagged_name} (deleted cleaned)")

def main():
    """Run the cleaning stage and then the tagging stage.

    Both stages operate on ``data/transcriptions`` below the current working
    directory. If that folder does not exist, an error is printed and
    nothing else happens.
    """
    transcripts_dir = os.path.join(os.getcwd(), "data", "transcriptions")

    if os.path.isdir(transcripts_dir):
        process_cleaning(transcripts_dir)
        process_tagging_and_cleanup(transcripts_dir)
        print("\n✅ Done tagging all transcriptions.")
    else:
        print(f"[ERROR] Folder not found: {transcripts_dir}")


# Run the pipeline only when executed directly, not when imported.
if __name__ == "__main__":
    main()
