# SETUP REQUIREMENTS

In [None]:
# Mount Google Drive at /content/drive so later cells can read files stored
# there (the data archive path used below lives under this mount point).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install git+https://github.com/MahmoudAshraf97/ctc-forced-aligner.git
!pip install pydub datasets

Collecting git+https://github.com/MahmoudAshraf97/ctc-forced-aligner.git
  Cloning https://github.com/MahmoudAshraf97/ctc-forced-aligner.git to /tmp/pip-req-build-jtn8nnm8
  Running command git clone --filter=blob:none --quiet https://github.com/MahmoudAshraf97/ctc-forced-aligner.git /tmp/pip-req-build-jtn8nnm8
  Resolved https://github.com/MahmoudAshraf97/ctc-forced-aligner.git to commit 1f0a5f860d3d9daf3d94edb1c7d18f90d1702e5b
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting uroman (from ctc-forced-aligner==0.3.0)
  Downloading uroman-1.3.1.1-py3-none-any.whl.metadata (18 kB)
Collecting Unidecode (from ctc-forced-aligner==0.3.0)
  Downloading Unidecode-1.4.0-py3-none-any.whl.metadata (13 kB)
Downloading Unidecode-1.4.0-py3-none-any.whl (235 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.8/235.8 kB[0m [31m9.0 MB/s[0m eta [36m0:

In [None]:
import zipfile
import os

# Archive to unpack (point this at your own uploaded ZIP) and the directory
# its contents are written into.
zip_path = '/content/drive/MyDrive/Collab/TTS/data.zip'
extract_to = '/content'

# The destination may not exist yet; exist_ok keeps re-runs harmless.
os.makedirs(extract_to, exist_ok=True)

# Unpack every member; the context manager closes the archive afterwards.
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_to)

print(f"Extracted to: {extract_to}")

Extracted to: /content


SENTENCE-LEVEL ALIGNMENT

In [None]:
import torch
from ctc_forced_aligner import (
    load_audio,
    load_alignment_model,
    generate_emissions,
    preprocess_text,
    get_alignments,
    get_spans,
    postprocess_results,
)
from pydub import AudioSegment
import os
import re
import csv

# Input folders: the processing loop below pairs each <name>.wav found in
# audio_folder with <name>.txt in text_folder. All outputs go under output_root.
audio_folder = "/content/wav"
text_folder = "/content/txt"
output_root = "/content/splits/"

language = "nzi"  # ISO-639-3 language code, passed to preprocess_text
device = "cuda" if torch.cuda.is_available() else "cpu"
batch_size = 16  # passed to generate_emissions

# Create output structure: <output_root>/wav holds the exported audio segments
os.makedirs(output_root, exist_ok=True)
wav_output_folder = os.path.join(output_root, "wav")
os.makedirs(wav_output_folder, exist_ok=True)

# Prepare metadata CSV: rows are collected in memory while processing and
# written to metadata_file once, after all files have been handled
metadata_file = os.path.join(output_root, "metadata.csv")
metadata_rows = []

# Load alignment model and tokenizer once and reuse them for every file.
# float16 is requested only when running on CUDA; CPU uses float32.
alignment_model, alignment_tokenizer = load_alignment_model(
    device,
    dtype=torch.float16 if device == "cuda" else torch.float32,
)

def split_text_by_sentences(text):
    """
    Split text into sentences at sentence-ending punctuation (., !, ?).

    A run of terminators ("...", "?!") together with any closing quotes or
    brackets that directly follow it stays attached to the sentence it ends,
    instead of being emitted as separate punctuation-only "sentences".
    Chunks that contain no letter or digit are dropped: they carry no word
    that could later be matched against word-level timestamps.

    Args:
        text: The transcript as a single string. None or "" is accepted.

    Returns:
        A list of dicts in text order, each with:
            'text': the sentence with surrounding whitespace stripped
            'start_char', 'end_char': offsets into ``text`` such that
                text[start_char:end_char] == 'text'
            'is_sentence_end': always True (splitting only happens at
                sentence ends; trailing text without a terminator is
                returned as a final sentence as well)
    """
    if not text:
        return []

    sentence_endings = '.!?'
    # Characters that may close a quotation/parenthesis right after the terminator.
    closers = '"\'”’»)]}'
    sentences = []

    def add_sentence(chunk_start, chunk_end):
        """Record text[chunk_start:chunk_end] unless it holds no letter/digit."""
        chunk = text[chunk_start:chunk_end]
        clean_sentence = chunk.strip()
        if not any(ch.isalnum() for ch in clean_sentence):
            return
        # Shift the start past leading whitespace so the offsets describe
        # exactly the stripped sentence.
        begin = chunk_start + (len(chunk) - len(chunk.lstrip()))
        sentences.append({
            'text': clean_sentence,
            'start_char': begin,
            'end_char': begin + len(clean_sentence),
            'is_sentence_end': True
        })

    length = len(text)
    start_pos = 0
    i = 0
    while i < length:
        if text[i] not in sentence_endings:
            i += 1
            continue

        # Absorb the rest of the terminator run plus any closing punctuation.
        end = i + 1
        while end < length and (text[end] in sentence_endings or text[end] in closers):
            end += 1

        add_sentence(start_pos, end)
        start_pos = end
        i = end

    # Handle remaining text if there is no punctuation at the end.
    add_sentence(start_pos, length)

    return sentences

def group_words_by_text_segments(word_timestamps, text_segments):
    """
    Group word timestamps according to pre-defined text segments.

    For every segment the remaining words are scanned in list order and
    matched, in order, against the segment's whitespace-separated words. A
    word matches the currently expected word when the two are equal
    (case-insensitive) or when either one contains the other. Words that do
    not match are skipped and stay available for later segments.

    Args:
        word_timestamps: list of dicts with 'text', 'start' and 'end' keys.
            NOTE: this list is modified in place - matched words are removed
            after each segment - so pass a copy if the full list is still
            needed by the caller.
        text_segments: list of dicts with 'text' and 'is_sentence_end' keys.

    Returns:
        A list with one dict per segment that matched at least one word:
            'start' / 'end': start of the first and end of the last matched word
            'text': segment text with punctuation removed, whitespace turned
                into underscores, truncated to 50 characters
            'word_count': number of matched words
            'original_text': the unmodified segment text
            'is_sentence_end': copied from the segment
    """
    grouped_segments = []

    for text_segment in text_segments:
        segment_text = text_segment['text']
        expected_words = [expected.lower() for expected in segment_text.split()]

        # Positions (in the current word list) of the words assigned to this segment.
        matched_indices = []
        for idx, word_info in enumerate(word_timestamps):
            if len(matched_indices) >= len(expected_words):
                break

            word = word_info['text'].strip().lower()
            expected_word = expected_words[len(matched_indices)]

            # Simple matching - you might need to improve this based on your data
            if word == expected_word or word in expected_word or expected_word in word:
                matched_indices.append(idx)

        if not matched_indices:
            continue

        segment_words = [word_timestamps[idx] for idx in matched_indices]

        # Clean up the text for use in a filename
        clean_text = re.sub(r'[^\w\s-]', '', segment_text)  # Remove punctuation
        clean_text = re.sub(r'\s+', '_', clean_text)  # Replace spaces with underscores
        clean_text = clean_text[:50]  # Limit length for filename

        grouped_segments.append({
            'start': segment_words[0]['start'],
            'end': segment_words[-1]['end'],
            'text': clean_text,
            'word_count': len(segment_words),
            'original_text': segment_text,
            'is_sentence_end': text_segment['is_sentence_end']
        })

        # Drop the consumed words in one pass, by position. This avoids the
        # quadratic, equality-based list.remove() per word while keeping the
        # in-place update that callers of this function observe.
        consumed = set(matched_indices)
        word_timestamps[:] = [
            word_info for idx, word_info in enumerate(word_timestamps)
            if idx not in consumed
        ]

    return grouped_segments

def simple_word_grouping(word_timestamps, words_per_group=5):
    """
    Fallback chunking: cut the word list into consecutive runs of a fixed size.

    Args:
        word_timestamps: list of dicts with 'text', 'start' and 'end' keys.
        words_per_group: maximum number of words per run; the last run may be
            shorter.

    Returns:
        One dict per run with 'start' (first word), 'end' (last word), 'text'
        (words joined by underscores, punctuation removed, at most 50
        characters), 'word_count', 'original_text' (words joined by spaces)
        and 'is_sentence_end' (always False here).
    """
    def describe(chunk):
        # Strip each word once and reuse it for both text variants.
        stripped = [entry['text'].strip() for entry in chunk]
        label = re.sub(r'[^\w\s-]', '', '_'.join(stripped))[:50]
        return {
            'start': chunk[0]['start'],
            'end': chunk[-1]['end'],
            'text': label,
            'word_count': len(chunk),
            'original_text': ' '.join(stripped),
            'is_sentence_end': False
        }

    chunks = (
        word_timestamps[offset:offset + words_per_group]
        for offset in range(0, len(word_timestamps), words_per_group)
    )
    return [describe(chunk) for chunk in chunks if chunk]

# Global counter for unique filenames (shared across all input files)
global_segment_counter = 1

# Segments longer than this many seconds are skipped instead of exported.
# NOTE(review): earlier comments/messages mentioned "2 minutes" and "300s"
# while the code compared against 700; the effective value is kept and the
# messages below now report it. Adjust here if a different limit was intended.
MAX_SEGMENT_SECONDS = 700

# Iterate over all wav files. os.listdir() returns entries in arbitrary order,
# so sort them to make the segment numbering reproducible between runs.
for audio_filename in sorted(os.listdir(audio_folder)):
    if not audio_filename.lower().endswith(".wav"):
        continue

    base_name = os.path.splitext(audio_filename)[0]

    audio_path = os.path.join(audio_folder, audio_filename)
    text_path = os.path.join(text_folder, base_name + ".txt")

    if not os.path.isfile(text_path):
        print(f"Warning: Text file not found for {audio_filename}, skipping...")
        continue

    print(f"Processing pair: {audio_filename} + {base_name}.txt")

    # Load audio waveform with the model's dtype and on the model's device
    audio_waveform = load_audio(audio_path, alignment_model.dtype, alignment_model.device)

    # Read the transcript and flatten it to a single line
    with open(text_path, "r", encoding="utf-8") as f:
        original_text = f.read().replace("\n", " ").strip()
    text = original_text

    print(f"Original text: {original_text[:100]}...")

    # Generate emissions (model output probabilities)
    emissions, stride = generate_emissions(
        alignment_model, audio_waveform, batch_size=batch_size
    )

    # Preprocess text
    tokens_starred, text_starred = preprocess_text(
        text,
        romanize=True,
        language=language,
    )

    # Get alignments between audio and text tokens
    segments, scores, blank_token = get_alignments(
        emissions,
        tokens_starred,
        alignment_tokenizer,
    )

    # Get spans (start/end indices) for each token
    spans = get_spans(tokens_starred, segments, blank_token)

    # Postprocess to get word-level timestamps
    word_timestamps = postprocess_results(text_starred, spans, stride, scores)

    print(f"Found {len(word_timestamps)} words")

    # Split text into sentences only
    text_segments = split_text_by_sentences(original_text)
    print(f"Text segments (sentences): {len(text_segments)}")

    if len(text_segments) > 1:
        # Use sentence-based grouping. A copy is passed because the grouping
        # function removes matched words from the list it is given.
        grouped_segments = group_words_by_text_segments(word_timestamps.copy(), text_segments)
        print(f"Grouped into {len(grouped_segments)} sentence segments")
    else:
        # Fallback to simple grouping (also taken when the text is one single sentence)
        print("No sentence endings found, using simple word grouping (5 words per segment)")
        grouped_segments = simple_word_grouping(word_timestamps, words_per_group=5)

    # Load original audio for splitting
    audio = AudioSegment.from_wav(audio_path)

    # Save each segment as separate audio file and add to metadata
    saved_count = 0
    for i, segment_info in enumerate(grouped_segments):
        # Timestamps are in seconds; the audio object is sliced in milliseconds
        start_ms = int(segment_info['start'] * 1000)
        end_ms = int(segment_info['end'] * 1000)

        # Check segment duration - skip if longer than MAX_SEGMENT_SECONDS
        segment_duration = (end_ms - start_ms) / 1000
        if segment_duration > MAX_SEGMENT_SECONDS:
            print(f"Skipping segment {i+1}: Too long ({segment_duration:.1f}s > {MAX_SEGMENT_SECONDS}s)")
            continue

        segment_audio = audio[start_ms:end_ms]

        # Create unique filename using global counter
        segment_filename = f"segment_{global_segment_counter:06d}.wav"
        audio_file = os.path.join(wav_output_folder, segment_filename)

        # Export audio segment
        segment_audio.export(audio_file, format="wav")

        # Add to metadata
        metadata_rows.append({
            'filename': segment_filename,
            'text': segment_info['original_text']
        })

        saved_count += 1
        global_segment_counter += 1

        print(f"Saved segment {saved_count}: '{segment_info['original_text'][:50]}...' "
              f"[{segment_info['start']:.2f}s - {segment_info['end']:.2f}s] "
              f"({segment_duration:.1f}s, {segment_info['word_count']} words) -> {segment_filename}")

    print(f"✓ Completed {base_name}: Created {saved_count} segments (skipped {len(grouped_segments) - saved_count} segments > {MAX_SEGMENT_SECONDS}s)")
    print("-" * 50)

# Persist the collected filename/transcript pairs as a pipe-delimited CSV.
print(f"Writing metadata to {metadata_file}...")
with open(metadata_file, 'w', newline='', encoding='utf-8') as csvfile:
    # With no rows the file is still created, but stays empty (no header line).
    if len(metadata_rows) > 0:
        # Column names come from the keys of the first row.
        columns = list(metadata_rows[0])
        metadata_writer = csv.DictWriter(csvfile, fieldnames=columns, delimiter='|')
        metadata_writer.writeheader()
        for row in metadata_rows:
            metadata_writer.writerow(row)

total_segments = len(metadata_rows)
print(f"All done! Created {total_segments} audio segments in {wav_output_folder}")
print(f"Metadata saved to {metadata_file} with {total_segments} entries")

  m = re.match('([su]([0-9]{1,2})p?) \(([0-9]{1,2}) bit\)$', token)
  m2 = re.match('([su]([0-9]{1,2})p?)( \(default\))?$', token)
  elif re.match('(flt)p?( \(default\))?$', token):
  elif re.match('(dbl)p?( \(default\))?$', token):
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json: 0.00B [00:00, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json:   0%|          | 0.00/286 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/74.0 [00:00<?, ?B/s]

Processing pair: MAT.20.wav + MAT.20.txt


  s = torchaudio.io.StreamReader(src, format, None, buffer_size)


Original text: MAT 20. Gyimayɛvoma mɔɔ wɔ vanye egyinli ne anu la. Gyisɛse vale zɔle zolɛ kɛ, Kɛ Anwuma Belemgbunli...
Found 817 words
Text segments (sentences): 48
Grouped into 48 sentence segments
Saved segment 1: 'MAT 20....' [0.72s - 0.90s] (0.2s, 2 words) -> segment_000001.wav
Saved segment 2: 'Gyimayɛvoma mɔɔ wɔ vanye egyinli ne anu la....' [0.94s - 2.60s] (1.7s, 8 words) -> segment_000002.wav
Saved segment 3: 'Gyisɛse vale zɔle zolɛ kɛ, Kɛ Anwuma Belemgbunlili...' [4.36s - 14.84s] (10.5s, 32 words) -> segment_000003.wav
Saved segment 4: 'Ɔ nee bɛ lile ɔ nloa kɛ kenle ko ɔkɛdua bɛ etaku n...' [15.82s - 21.22s] (5.4s, 20 words) -> segment_000004.wav
Saved segment 5: 'Ɔvindele bieko kɛyɛ dɔne ngɔnla mekɛ ne anu na ɔhɔ...' [21.92s - 28.64s] (6.7s, 21 words) -> segment_000005.wav
Saved segment 6: 'Na ɔzele bɛ kɛ, Bɛdabɛ noko bɛhɔ me vanye egyinli ...' [29.54s - 36.72s] (7.2s, 21 words) -> segment_000006.wav
Saved segment 7: 'Na bɛhɔle....' [37.44s - 37.94s] (0.5s, 2 words) -> segment

WORD-LEVEL ALIGNMENT

In [None]:
import torch
from ctc_forced_aligner import (
    load_audio,
    load_alignment_model,
    generate_emissions,
    preprocess_text,
    get_alignments,
    get_spans,
    postprocess_results,
)
from pydub import AudioSegment
import os

# Input folders: the loop below pairs each <name>.wav found in audio_folder
# with <name>.txt in text_folder and writes one sub-folder per file under
# output_root.
audio_folder = "/content/wav"
text_folder = "/content/txt"
output_root = "/content/splits/"

# NOTE(review): the sentence-level cell in this notebook sets "nzi" while this
# one sets "twi" - confirm the code matches the data being processed.
language = "twi"  # ISO-639-3 language code, passed to preprocess_text
device = "cuda" if torch.cuda.is_available() else "cpu"
batch_size = 16  # passed to generate_emissions

os.makedirs(output_root, exist_ok=True)

# Load alignment model and tokenizer once and reuse them for every file.
# float16 is requested only when running on CUDA; CPU uses float32.
alignment_model, alignment_tokenizer = load_alignment_model(
    device,
    dtype=torch.float16 if device == "cuda" else torch.float32,
)

import shutil  # only needed here, to clear a stale staging folder

# Iterate over all wav files, in sorted order so runs are reproducible
# (os.listdir() returns entries in arbitrary order).
for audio_filename in sorted(os.listdir(audio_folder)):
    if not audio_filename.lower().endswith(".wav"):
        continue

    base_name = os.path.splitext(audio_filename)[0]
    output_folder = os.path.join(output_root, base_name)
    # Files are exported into this staging folder and the folder is renamed to
    # output_folder only once every word has been written. An interrupted run
    # therefore never leaves a half-filled output_folder that the "already
    # exists" check below would skip on the next run.
    partial_folder = output_folder + ".partial"

    # Skip processing if output folder already exists
    if os.path.exists(output_folder):
        print(f"Output folder for '{base_name}' already exists, skipping...")
        continue

    audio_path = os.path.join(audio_folder, audio_filename)
    text_path = os.path.join(text_folder, base_name + ".txt")

    if not os.path.isfile(text_path):
        print(f"Warning: Text file not found for {audio_filename}, skipping...")
        continue

    print(f"Processing pair: {audio_filename} + {base_name}.txt")

    # Load audio waveform with the model's dtype and on the model's device
    audio_waveform = load_audio(audio_path, alignment_model.dtype, alignment_model.device)

    # Read the transcript and flatten it to a single line. The encoding is
    # explicit because the transcripts contain non-ASCII characters and the
    # platform default is not guaranteed to be UTF-8.
    with open(text_path, "r", encoding="utf-8") as f:
        text = f.read().replace("\n", " ").strip()

    # Generate emissions (model output probabilities)
    emissions, stride = generate_emissions(
        alignment_model, audio_waveform, batch_size=batch_size
    )

    # Preprocess text
    tokens_starred, text_starred = preprocess_text(
        text,
        romanize=True,
        language=language,
    )

    # Get alignments between audio and text tokens
    segments, scores, blank_token = get_alignments(
        emissions,
        tokens_starred,
        alignment_tokenizer,
    )

    # Get spans (start/end indices) for each token
    spans = get_spans(tokens_starred, segments, blank_token)

    # Postprocess to get word-level timestamps
    word_timestamps = postprocess_results(text_starred, spans, stride, scores)

    # Load original audio for splitting
    audio = AudioSegment.from_wav(audio_path)

    # Start from an empty staging folder; leftovers from an interrupted run are discarded
    if os.path.isdir(partial_folder):
        shutil.rmtree(partial_folder)
    os.makedirs(partial_folder, exist_ok=True)

    # Save each word segment as separate audio file
    for i, word_info in enumerate(word_timestamps):
        # Timestamps are in seconds; the audio object is sliced in milliseconds
        start_ms = int(word_info['start'] * 1000)
        end_ms = int(word_info['end'] * 1000)

        segment_audio = audio[start_ms:end_ms]

        # Keep the word usable as a single path component
        word = word_info['text'].replace(" ", "_").replace("/", "_")

        segment_name = f"{i:03d}_{word}.wav"
        segment_audio.export(os.path.join(partial_folder, segment_name), format="wav")

        # Report the final location (valid once the folder is renamed below)
        out_file = os.path.join(output_folder, segment_name)
        print(f"Saved segment {i}: '{word}' [{word_info['start']:.2f}s - {word_info['end']:.2f}s] -> {out_file}")

    # All words exported: publish the folder under its final name
    os.rename(partial_folder, output_folder)

print("All done!")


Processing pair: KMK6wnOP-Tmp069-OY8Qws.wav + KMK6wnOP-Tmp069-OY8Qws.txt
Saved segment 0: 'Transcription:' [0.86s - 1.20s] -> /content/splits/KMK6wnOP-Tmp069-OY8Qws/000_Transcription:.wav
Saved segment 1: 'Saa' [1.30s - 1.39s] -> /content/splits/KMK6wnOP-Tmp069-OY8Qws/001_Saa.wav
Saved segment 2: 'ɔkɔdeɛ' [1.68s - 1.89s] -> /content/splits/KMK6wnOP-Tmp069-OY8Qws/002_ɔkɔdeɛ.wav
Saved segment 3: 'no' [1.93s - 1.95s] -> /content/splits/KMK6wnOP-Tmp069-OY8Qws/003_no.wav
Saved segment 4: 'Translation:' [2.00s - 2.35s] -> /content/splits/KMK6wnOP-Tmp069-OY8Qws/004_Translation:.wav
Saved segment 5: 'That' [2.39s - 2.46s] -> /content/splits/KMK6wnOP-Tmp069-OY8Qws/005_That.wav
Saved segment 6: 'eagle' [2.50s - 3.09s] -> /content/splits/KMK6wnOP-Tmp069-OY8Qws/006_eagle.wav
Processing pair: JSVVb30H-Tmp027-6pPivl.wav + JSVVb30H-Tmp027-6pPivl.txt
Saved segment 0: 'Transcription:' [0.48s - 0.78s] -> /content/splits/JSVVb30H-Tmp027-6pPivl/000_Transcription:.wav
Saved segment 1: 'Na' [0.82s - 0.84s] 

KeyboardInterrupt: 