# Imports

In [None]:
!pip install arabic-reshaper
!pip install python-bidi
!pip install yt-dlp
!pip install pydub
!pip install english_to_arabic_transphonator
!pip install num2words
!pip install pytube
!pip install youtube_dl
!pip install ffmpeg
!pip install langdetect
!sudo apt-get install parallel

# Auto-generated subtitle import and processing from a playlist

Importing subtitles in a playlist

In [None]:
# Playlist URL; the yt-dlp shell commands in the following cells read it as "$playlist_url"
playlist_url="https://youtube.com/playlist?list=PLAaDo_d_X-35PX49IAmRJoj_Vva-MlYOd&si=2CSSBl7takDvMVB2"

In [None]:
# Download only the auto-generated Arabic subtitles for every video in the playlist (--skip-download skips the media itself)
!yt-dlp --write-auto-sub --sub-lang ar --skip-download "$playlist_url" -o "/content/auto_generated_subtitles/%(title)s.%(ext)s"


[youtube:tab] Extracting URL: https://youtube.com/playlist?list=PLAaDo_d_X-35PX49IAmRJoj_Vva-MlYOd&si=2CSSBl7takDvMVB2
[youtube:tab] PLAaDo_d_X-35PX49IAmRJoj_Vva-MlYOd: Downloading webpage
[youtube:tab] PLAaDo_d_X-35PX49IAmRJoj_Vva-MlYOd: Redownloading playlist API JSON with unavailable videos
[download] Downloading playlist: final dataset
[youtube:tab] PLAaDo_d_X-35PX49IAmRJoj_Vva-MlYOd page 1: Downloading API JSON
[youtube:tab] PLAaDo_d_X-35PX49IAmRJoj_Vva-MlYOd page 1: Downloading API JSON
[youtube:tab] PLAaDo_d_X-35PX49IAmRJoj_Vva-MlYOd page 1: Downloading API JSON
[youtube:tab] PLAaDo_d_X-35PX49IAmRJoj_Vva-MlYOd page 1: Downloading API JSON
[youtube:tab] Playlist final dataset: Downloading 41 items of 41
[download] Downloading item [0;32m1[0m of [0;94m41[0m
[youtube] Extracting URL: https://www.youtube.com/watch?v=6qyi8NY2NDY
[youtube] 6qyi8NY2NDY: Downloading webpage
[youtube] 6qyi8NY2NDY: Downloading ios player API JSON
[youtube] 6qyi8NY2NDY: Downloading mweb player API JSON

Getting the audio files (may take a while)

In [None]:
!yt-dlp -f bestaudio[ext=m4a] "$playlist_url" -o "audio/%(title)s.%(ext)s"


Cleaning the whole folder of auto-generated subtitles

In [None]:
import os
import re

def clean_webvtt(input_file, output_file):
    """Strip inline markup from a WebVTT file and drop repeated caption lines.

    Args:
        input_file: Path of the UTF-8 subtitle file to read.
        output_file: Path the filtered lines are written to (UTF-8).

    Timestamp lines (anything containing '-->') are always kept. Every other
    line has its <...> tags removed and is kept only when its stripped text
    differs from the most recently kept caption line. Blank lines are dropped.
    """
    tag_pattern = re.compile(r'<[^>]+>')

    with open(input_file, 'r', encoding='utf-8') as src:
        raw_lines = src.readlines()

    kept = []
    last_caption = ""  # stripped text of the last caption line that was kept

    for raw in raw_lines:
        # Timestamp lines bypass de-duplication entirely
        if '-->' in raw:
            kept.append(raw)
            continue

        if tag_pattern.search(raw):
            # Tagged caption: keep the text, discard the markup
            candidate = tag_pattern.sub('', raw)
        elif raw.strip():
            # Plain caption (or header) line
            candidate = raw
        else:
            # Whitespace-only line
            continue

        stripped = candidate.strip()
        if stripped != last_caption:
            kept.append(candidate)
            last_caption = stripped

    with open(output_file, 'w', encoding='utf-8') as dst:
        dst.writelines(kept)

def read_vtt_file(file_path):
    """Return the entire text of the UTF-8 encoded file at ``file_path``."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents

def clean_vtt_timestamps(input_text):
    """Reduce WebVTT text to its header plus "timestamp / caption" pairs.

    Args:
        input_text: Full text of a subtitle file.

    Returns:
        A string containing every line that precedes the first timestamp
        (the header), followed by one block per kept cue: the timestamp line,
        the first caption line after it, and a blank separator line.

    A timestamp line is kept only when the line right after it is non-blank
    text (not another timestamp). Any cue settings that follow the end
    timestamp (e.g. "align:start position:0%") are removed, so the timestamp
    line ends right after the end time; the original code removed only the
    exact string " align:start position:100%".
    """
    # Matches "--> <end time>" and whatever trails it on the same line
    cue_settings = re.compile(r'(-->\s*[0-9:.,]+).*$')

    lines = input_text.strip().split('\n')

    output_lines = []
    i = 0

    # Copy header lines verbatim until the first timestamp
    while i < len(lines) and '-->' not in lines[i]:
        output_lines.append(lines[i])
        i += 1

    # Walk the cues
    while i < len(lines):
        current_line = lines[i]
        if '-->' in current_line and i + 1 < len(lines):
            next_line = lines[i + 1]
            # Keep the cue only if real caption text follows the timestamp
            if '-->' not in next_line and next_line.strip():
                output_lines.append(cue_settings.sub(r'\1', current_line))
                output_lines.append(next_line)
                output_lines.append("")  # blank line between blocks
        i += 1

    return '\n'.join(output_lines)


def save_as_vtt(file_name, content):
    """Write ``content`` to ``file_name`` as UTF-8 and print a confirmation."""
    with open(file_name, mode='w', encoding='utf-8') as handle:
        handle.write(content)

    print(f"File saved as {file_name}")

def process_vtt_file(input_file_path, output_file_path):
    """Run both cleaning passes on one subtitle file.

    Pass 1 (clean_webvtt) reads ``input_file_path`` and writes the tag-free,
    de-duplicated lines to ``output_file_path``. Pass 2 re-reads that output,
    runs it through clean_vtt_timestamps and overwrites the same file.
    """
    # Pass 1: strip markup / repeated captions into the output file
    clean_webvtt(input_file_path, output_file_path)

    # Pass 2: reload the intermediate result, tidy the cues, save in place
    save_as_vtt(
        output_file_path,
        clean_vtt_timestamps(read_vtt_file(output_file_path)),
    )

def process_all_subtitles(input_folder, output_folder):
    """Clean every ``.vtt`` file found directly inside ``input_folder``.

    The output folder is created when missing; each cleaned file is written
    there under the same file name as its source.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Only subtitle files are of interest; anything else in the folder is ignored
    vtt_names = [name for name in os.listdir(input_folder) if name.endswith(".vtt")]

    for name in vtt_names:
        process_vtt_file(
            os.path.join(input_folder, name),
            os.path.join(output_folder, name),
        )
# Example usage: clean every downloaded subtitle file in one pass.
# The input folder is the one the subtitle yt-dlp command writes into.
input_folder = '/content/auto_generated_subtitles'  # Replace with the folder containing your .vtt files
output_folder = '/content/cleaned_subtitles'  # Replace with the folder where you want to save cleaned files

process_all_subtitles(input_folder, output_folder)


# Converting the subtitles and audio into an LJSpeech-style dataset

In [None]:
import os
import re
import csv
from pydub import AudioSegment
from datetime import timedelta
from num2words import num2words
from english_to_arabic_transphonator.transphonator import Transphonator
from langdetect import detect
from concurrent.futures import ProcessPoolExecutor

# Build a single Transphonator at module level so that
# transliterate_english_to_arabic reuses it for every word
trans = Transphonator()

# Strip Arabic diacritics (tashkeel) from text
def remove_diacritics(text):
    """Return ``text`` without Arabic diacritic marks.

    Removes the code points U+064B..U+0652, U+0670 and U+0640. Any value
    that is not a ``str`` (for example ``None``) is returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return re.sub(r'[\u064B-\u0652\u0670\u0640]', '', text)

# Function to convert timestamp to milliseconds
def timestamp_to_ms(timestamp):
    """Convert an ``HH:MM:SS.mmm`` (or ``HH:MM:SS,mmm``) timestamp to integer milliseconds.

    Args:
        timestamp: String with hours, minutes and seconds separated by ':';
            the fractional seconds may use '.' or ',' as decimal mark.

    Returns:
        The duration as an ``int`` number of milliseconds.

    Raises:
        IndexError: If the string has fewer than three ':'-separated fields.
        ValueError: If a field is not numeric.
    """
    parts = timestamp.split(":")
    time_obj = timedelta(hours=int(parts[0]),
                         minutes=int(parts[1]),
                         seconds=float(parts[2].replace(",", ".")))
    # Integer division of timedeltas is exact; the previous
    # int(total_seconds() * 1000) truncated float error, e.g.
    # "00:00:01.001" became 1000 instead of 1001.
    return time_obj // timedelta(milliseconds=1)

# Spell out digit runs as Arabic words
def convert_numbers_to_arabic_words(text):
    """Replace every run of digits in ``text`` with its Arabic wording.

    Each ``\\d+`` match is passed to ``num2words(..., lang='ar')`` and the
    result is substituted in place; text without digits is returned as is.
    """
    def _spell_out(digit_match):
        return num2words(digit_match.group(), lang='ar')

    return re.sub(r'\d+', _spell_out, text)

# Transliterate Latin-letter words to Arabic script; everything else is left alone
def transliterate_english_to_arabic(text):
    """Transliterate the English parts of ``text`` and return the re-joined words.

    The text is split on whitespace. A word made only of ASCII letters is
    transliterated as a whole. A word that merely contains a run of ASCII
    letters has its first such run transliterated, with the surrounding
    characters kept. Words without ASCII letters pass through untouched.
    When the transliterator yields nothing, the original letters are kept.
    Words are joined back with single spaces.
    """
    def _to_arabic(latin_run):
        # Transliterate, drop diacritics, fall back to the input on an empty result
        converted = remove_diacritics(trans.transphonate_english_word(latin_run))
        return converted or latin_run

    pieces = []
    for token in text.split():
        if re.match(r'^[a-zA-Z]+$', token):
            pieces.append(_to_arabic(token))
            continue

        embedded = re.match(r'^(.*?)([a-zA-Z]+)(.*?)$', token)
        if embedded is None:
            pieces.append(token)
        else:
            prefix, latin_run, suffix = embedded.groups()
            pieces.append(prefix + _to_arabic(latin_run) + suffix)

    return ' '.join(pieces)

# Normalise caption text for the dataset's "normalized" column
def normalize_arabic_text(text):
    """Collapse whitespace, then apply the three text-normalisation steps in order.

    Order: remove_diacritics -> convert_numbers_to_arabic_words ->
    transliterate_english_to_arabic.
    """
    normalized = " ".join(text.split())
    pipeline = (
        remove_diacritics,
        convert_numbers_to_arabic_words,
        transliterate_english_to_arabic,
    )
    for step in pipeline:
        normalized = step(normalized)
    return normalized

# Function to parse .vtt file and return start and end timestamps with text
def parse_vtt_subtitles(vtt_file):
    """Parse a cleaned .vtt file into a list of caption segments.

    Args:
        vtt_file: Path of a UTF-8 subtitle file whose cues look like
            ``HH:MM:SS.mmm --> HH:MM:SS.mmm`` followed by one caption line.

    Returns:
        A list of dicts with keys ``start`` and ``end`` (milliseconds, from
        timestamp_to_ms), ``text`` (the stripped caption line) and
        ``normalized_text`` (the result of normalize_arabic_text).

    Only the first line after each timestamp line is captured. Cues whose
    caption is empty are skipped. A timestamp line with anything after the
    end time (such as cue settings) does not match and is ignored.
    """
    with open(vtt_file, "r", encoding="utf-8") as f:
        subtitles = f.read()

    # The caption may end at a newline OR at end-of-file, so the last cue of a
    # file without a trailing newline is no longer dropped.
    pattern = re.compile(
        r"(\d{2}:\d{2}:\d{2}[\.,]\d{3}) --> (\d{2}:\d{2}:\d{2}[\.,]\d{3})\n(.*?)(?:\n|$)"
    )
    segments = []

    for match in pattern.finditer(subtitles):
        start_time, end_time, caption = match.groups()
        text = caption.strip()
        if not text:
            # A cue without caption text would yield a clip with no transcript
            continue
        segments.append({
            "start": timestamp_to_ms(start_time),
            "end": timestamp_to_ms(end_time),
            "text": text,
            "normalized_text": normalize_arabic_text(text)
        })

    return segments

audio_count = 1  # running clip number, shared by every call to process_single_file
# Function to process a single file (audio and subtitle pair)
def process_single_file(audio_file, vtt_file, output_dir,
                        pad_start_ms=100, pad_end_ms=200, max_clip_ms=12000):
    """Cut one audio file into per-caption clips and append them to the metadata.

    Args:
        audio_file: Path of the audio file, loaded with AudioSegment.from_file.
        vtt_file: Path of the matching cleaned subtitle file.
        output_dir: Dataset folder; clips go to ``<output_dir>/wavs`` and rows
            are appended to ``<output_dir>/metadata.csv`` ('|'-delimited).
        pad_start_ms: Milliseconds added before each cue's start (default 100).
        pad_end_ms: Milliseconds added after each cue's end (default 200).
        max_clip_ms: Clips longer than this many milliseconds are skipped
            (default 12000).

    Cues whose text contains a bracketed tag such as ``[...]`` are skipped, as
    are cues that produce an empty clip (start at or beyond the end of the
    audio). Clips are named ``audio_<n>`` using the module-level
    ``audio_count`` counter, which keeps increasing across calls. An export
    failure is printed and the row is not written.
    """
    global audio_count
    audio = AudioSegment.from_file(audio_file)
    segments = parse_vtt_subtitles(vtt_file)
    with open(f"{output_dir}/metadata.csv", "a", encoding="utf-8-sig", newline="") as metadata_file:
        csv_writer = csv.writer(metadata_file, delimiter='|')  # Set delimiter to '|'
        for segment in segments:
            text = segment["text"]
            normalized_text = segment["normalized_text"]

            # Pad the cue on both sides, clamped to the bounds of the audio
            start_time = max(0, segment["start"] - pad_start_ms)
            end_time = min(len(audio), segment["end"] + pad_end_ms)

            # Skip bracketed (non-speech) captions and over-long clips
            if re.search(r'\[.*?\]', text) or (end_time - start_time) > max_clip_ms:
                continue

            # A cue starting past the end of the audio would give an empty clip
            if end_time <= start_time:
                continue

            audio_segment = audio[start_time:end_time]
            audio_filename = f"audio_{audio_count}"
            audio_count += 1  # the number is consumed even if the export below fails

            try:
                audio_segment.export(f"{output_dir}/wavs/{audio_filename}.wav", format="wav")
                csv_writer.writerow([audio_filename, text, normalized_text])
            except Exception as e:
                print(f"Error exporting {audio_filename}: {e}")

# Main function to process multiple audio/subtitle pairs
def process_files(audio_files, vtt_files, output_dir):
    """Create the dataset folder, write the metadata header, then process each pair.

    Args:
        audio_files: Audio file paths, aligned index-by-index with ``vtt_files``.
        vtt_files: Subtitle file paths.
        output_dir: Dataset folder; ``<output_dir>/wavs`` is created and
            ``<output_dir>/metadata.csv`` is (re)created with a header row.

    The header file is closed BEFORE any pair is processed. process_single_file
    re-opens metadata.csv in append mode; while the loop ran inside the
    ``with`` block, the still-buffered header was flushed at offset 0 only
    after all rows had been appended, overwriting the start of the first row.
    Pairs are processed one after another (not in a process pool) because
    clip numbering relies on a module-level counter.
    """
    print(audio_files, vtt_files)
    os.makedirs(f"{output_dir}/wavs", exist_ok=True)

    with open(f"{output_dir}/metadata.csv", "w", encoding="utf-8-sig", newline="") as metadata_file:
        # Same '|' delimiter as the data rows written by process_single_file
        csv_writer = csv.writer(metadata_file, delimiter='|')
        csv_writer.writerow(["Filename", "Original Text", "Normalized Text"])

    for audio_file, vtt_file in zip(audio_files, vtt_files):
        process_single_file(audio_file, vtt_file, output_dir)

def match_files(audio_files, vtt_files):
    """Pair audio files with subtitle files that share the same title.

    Args:
        audio_files: Paths of audio files (e.g. ``.../Title.m4a``).
        vtt_files: Paths of subtitle files (e.g. ``.../Title.ar.vtt`` or
            ``.../Title.vtt``).

    Returns:
        ``(sorted_audio_files, sorted_vtt_files)``: two equally long lists,
        ordered by title, where the items at the same index belong together.
        Files without a partner are left out and reported with a print.

    The title of an audio file is its base name without extension. For a
    subtitle file the base name without ``.vtt`` is tried first, then that
    name with one more dotted suffix (the language code) removed. Sorting the
    two lists independently, as before, misaligned pairs whenever a file was
    missing on one side or when "Title" and "Title.ar" sorted differently.
    """
    audio_by_title = {
        os.path.splitext(os.path.basename(path))[0]: path for path in audio_files
    }

    pairs = {}
    for vtt_path in vtt_files:
        stem = os.path.splitext(os.path.basename(vtt_path))[0]
        for title in (stem, os.path.splitext(stem)[0]):
            if title in audio_by_title and title not in pairs:
                pairs[title] = (audio_by_title[title], vtt_path)
                break

    unmatched_audio = sorted(set(audio_by_title) - set(pairs))
    if unmatched_audio:
        print(f"No subtitle file found for: {unmatched_audio}")

    titles = sorted(pairs)
    return [pairs[t][0] for t in titles], [pairs[t][1] for t in titles]


# Example usage
if __name__ == "__main__":
    audio_dir = "/content/audio/"
    vtt_dir = "/content/cleaned_subtitles/"

    # List every downloaded audio file and every cleaned .vtt subtitle file
    audio_files = [os.path.join(audio_dir, f) for f in os.listdir(audio_dir)]
    vtt_files = [os.path.join(vtt_dir, f) for f in os.listdir(vtt_dir) if f.endswith(".vtt")]

    # Keep only audio/subtitle pairs that belong to the same video
    sorted_audio_files, sorted_vtt_files = match_files(audio_files, vtt_files)

    output_dir = "/content/txt_to_speech_dataset"
    process_files(sorted_audio_files, sorted_vtt_files, output_dir)

In [None]:
import shutil

# Folder that holds the generated dataset
directory_path = "/content/txt_to_speech_dataset"

# Location of the resulting archive
output_zip_path = "/content/txt_to_speech_dataset.zip"

# make_archive is given the target path without ".zip" as its base name
shutil.make_archive(output_zip_path.replace('.zip', ''), 'zip', directory_path)

print(f"Zipped directory saved to: {output_zip_path}")


Zipped directory saved to: /content/txt_to_speech_dataset.zip


In [None]:
# Mount Google Drive at /content/drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# prompt: save this file in google drive
from google.colab import drive
drive.mount('/content/drive')

drive.mount("/content/drive", force_remount=True)
!mkdir -p /content/drive/MyDrive/dataset/
#!cp -r /content/txt_to_speech_dataset /content/drive/MyDrive/txt_to_speech_dataset_files
!cp /content/txt_to_speech_dataset.zip /content/drive/MyDrive/dataset/