<a href="https://colab.research.google.com/github/HamzaAlSamman/arabic-video-dubbing/blob/main/AI_Dubbing_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### ✅ Before you start: Create a `.env` file for API keys (Required)

This notebook reads API keys from a `.env` file instead of hardcoding them in code (safer + easier to manage).

#### 1) Create a file named `.env`
- **Google Colab path:** `/content/.env`  
  (If you prefer Google Drive, save it there and update the path in Step 3.0.)

#### 2) Paste your keys inside the file like this
```txt
GROQ_API_KEY=YOUR_GROQ_KEY_HERE
GEMINI_API_KEY=YOUR_GEMINI_KEY_HERE
ELEVEN_API_KEY=YOUR_ELEVEN_KEY_HERE
```


In [1]:
#@title 0) Setup Project Folders + (Optional) Mount Drive { display-mode: "form" }

use_google_drive = True #@param {type:"boolean"}
drive_base_path = "/content/gdrive/MyDrive" #@param {type:"string"}  # Change if using Shared Drive
drive_upload_folder = "dub_project" #@param {type:"string"}          # Can be nested like: Courses/MyCourse/dub_project

import os
from pathlib import Path

# Drive is mounted first so the Drive-side folder further down can be created on it.
if use_google_drive:
    from google.colab import drive
    drive.mount("/content/gdrive", force_remount=True)

# Local working tree on the Colab runtime; later steps look folders up in DIRS by key.
PROJECT_ROOT = Path("/content/dub_project")
DIRS = {
    key: PROJECT_ROOT / relative
    for key, relative in (
        ("input", "input"),
        ("work", "work"),
        ("extract", "work/00_extract"),
        ("output", "output"),
        ("tmp", "tmp"),
        ("logs", "logs"),
    )
}
for folder in DIRS.values():
    folder.mkdir(parents=True, exist_ok=True)

# Drive-side upload folder; stays None when Drive is not used.
drive_upload_path = (Path(drive_base_path) / drive_upload_folder) if use_google_drive else None
if drive_upload_path is not None:
    drive_upload_path.mkdir(parents=True, exist_ok=True)
    print("Drive folder ready:", drive_upload_path)

print("\nLocal project folders ready:")
for key, folder in DIRS.items():
    print(f"- {key}: {folder}")

# Check ffmpeg
# subprocess.run raises (FileNotFoundError) when the executable is missing, so the
# call is guarded: a missing ffmpeg is reported as "Not found" instead of crashing
# the setup cell.
import subprocess
try:
    res = subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True)
except OSError:
    res = None

ffmpeg_banner = "Not found"
# Only index into the output when there is at least one non-blank line to show.
if res is not None and res.returncode == 0 and (res.stdout or "").strip():
    ffmpeg_banner = res.stdout.strip().splitlines()[0]
print("\nFFmpeg:", ffmpeg_banner)


Mounted at /content/gdrive
Drive folder ready: /content/gdrive/MyDrive/dub_project

Local project folders ready:
- input: /content/dub_project/input
- work: /content/dub_project/work
- extract: /content/dub_project/work/00_extract
- output: /content/dub_project/output
- tmp: /content/dub_project/tmp
- logs: /content/dub_project/logs

FFmpeg: ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers


In [1]:
#@title 1) Select video source + pick file_id { display-mode: "form" }

choose = "upload_now" #@param ["already_uploaded","upload_now","custom_path"]
custom_video_folder = "" #@param {type:"string"}  # Full path if choose=custom_path (Drive or /content path)

video_extensions = (".mp4", ".mov", ".mkv", ".avi", ".webm")

import os
import pandas as pd
from pathlib import Path

# Step 0 state is re-read through globals() with defaults, so this cell still runs
# when cells are executed out of order.
PROJECT_ROOT = globals().get("PROJECT_ROOT", Path("/content/dub_project"))
DIRS = globals().get("DIRS", {"input": PROJECT_ROOT / "input"})
use_google_drive = globals().get("use_google_drive", False)
drive_base_path = globals().get("drive_base_path", "/content/gdrive/MyDrive")
drive_upload_folder = globals().get("drive_upload_folder", "dub_project")

# Decide which folder will be scanned for videos.
if choose == "custom_path":
    # A blank custom path falls back to the local input folder.
    video_folder = (custom_video_folder or "").strip() or str(DIRS["input"])

elif choose == "upload_now":
    from google.colab import files
    uploaded = files.upload()

    input_dir = Path(DIRS["input"])
    input_dir.mkdir(parents=True, exist_ok=True)

    # Move every uploaded file (looked up under /content) into the project input folder.
    for uploaded_name in uploaded:
        os.replace(str(Path("/content") / uploaded_name), str(input_dir / uploaded_name))

    video_folder = str(input_dir)

else:
    # already_uploaded: use the Drive folder when Drive is enabled and its base path
    # exists; otherwise fall back to the local input folder.
    if not (use_google_drive and Path(drive_base_path).exists()):
        video_folder = str(DIRS["input"])
    else:
        video_folder = str(Path(drive_base_path) / drive_upload_folder)

# The folder is created if needed so the listing below cannot fail on a missing path.
Path(video_folder).mkdir(parents=True, exist_ok=True)

# List files: number the videos found in video_folder (1-based, alphabetical order)
# so Step 1.1 can pick one by file_id.
names = [
    entry for entry in sorted(os.listdir(video_folder))
    if entry.lower().endswith(video_extensions)
]
ids = list(range(1, len(names) + 1))
id_monitor = dict(zip(ids, names))  # file_id -> file name, read by Step 1.1

df = pd.DataFrame({"file_name": names, "file_id": ids}).set_index("file_id")

print("Folder:", video_folder)
if len(df):
    print("\nAvailable videos:")
    print(df)
else:
    # Say so explicitly: otherwise the cell prints nothing and the next step
    # fails with an "Invalid file_id" error that hides the real cause.
    print("\nNo video files found. Supported extensions:", ", ".join(video_extensions))




KeyboardInterrupt: 

In [4]:
#@title 1.1) Enter input File ID { display-mode: "form" }

file_id = 3 #@param {type:"number"}

import os

# Require Step 1: it builds id_monitor (file_id -> file name) and video_folder.
# Same guard style as the later steps, so a fresh kernel gets an actionable message.
if "id_monitor" not in globals() or "video_folder" not in globals():
    raise NameError("id_monitor/video_folder not found. Run Step 1 first.")

# The form delivers a number; the table uses integer ids.
selected_name = id_monitor.get(int(file_id), None)

if selected_name is None:
    raise ValueError("Invalid file_id. Choose a valid ID from the table above.")

# Full path of the chosen video; every later step reads INPUT_VIDEO.
INPUT_VIDEO = os.path.join(video_folder, selected_name)
print("Selected video:", INPUT_VIDEO)


Selected video: /content/gdrive/MyDrive/dub_project/video.mp4


In [5]:
#@title 1.2) Separate audio from video using FFmpeg { display-mode: "form" }

import subprocess
from pathlib import Path

def run_cmd(cmd):
    """Run ``cmd`` (a list of arguments) and return the completed process.

    The command line is echoed first. Output is captured as text. On a non-zero
    exit status the last 2000 characters of stderr are printed and a
    RuntimeError is raised.
    """
    echoed = " ".join(str(part) for part in cmd)
    print("Running:\n", echoed)
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc
    print("STDERR (tail):\n", (proc.stderr or "")[-2000:])
    raise RuntimeError("Command failed.")

def has_audio_stream(video_path: Path) -> bool:
    """Report whether ffprobe finds a first audio stream (``a:0``) in ``video_path``.

    Returns True only when ffprobe exits with status 0 and its output contains
    the word "audio"; any other outcome yields False.
    """
    probe = subprocess.run(
        [
            "ffprobe", "-v", "error",
            "-select_streams", "a:0",
            "-show_entries", "stream=codec_type",
            "-of", "default=nw=1:nk=1",
            str(video_path),
        ],
        capture_output=True,
        text=True,
    )
    if probe.returncode != 0:
        return False
    reported = (probe.stdout or "").strip().lower()
    return "audio" in reported

def separate_audio_video(input_video_path: str, out_dir: str):
    """Split a video into a silent video file and two WAV renditions of its audio.

    Three files are written into ``out_dir`` (created if missing), all named
    after the input file's stem:

    - ``<stem>_no_audio.mp4``: first video stream, stream-copied, audio dropped.
    - ``<stem>_raw.wav``: first audio stream as 16-bit PCM, 44.1 kHz, 2 channels.
    - ``<stem>_16k_mono.wav``: 16-bit PCM, 16 kHz, 1 channel, derived from the raw WAV.

    Args:
        input_video_path: path of the source video.
        out_dir: folder that receives the three output files.

    Returns:
        dict with keys ``video_no_audio``, ``audio_raw_wav`` and
        ``audio_16k_mono`` mapping to the output paths as strings.

    Raises:
        FileNotFoundError: the input video does not exist.
        RuntimeError: the video has no audio stream, or an ffmpeg command failed.
    """
    input_video_path = Path(input_video_path)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    if not input_video_path.exists():
        raise FileNotFoundError(f"Input video not found: {input_video_path}")

    # Fail fast: check for audio before running any ffmpeg job, so a video without
    # audio does not leave a stray *_no_audio.mp4 behind before the error is raised.
    if not has_audio_stream(input_video_path):
        raise RuntimeError("No audio stream found in this video. Cannot continue ASR/dubbing pipeline.")

    base = input_video_path.stem
    video_no_audio = out_dir / f"{base}_no_audio.mp4"
    audio_raw_wav  = out_dir / f"{base}_raw.wav"
    audio_16k_mono = out_dir / f"{base}_16k_mono.wav"

    # Video without audio
    run_cmd([
        "ffmpeg", "-y",
        "-i", str(input_video_path),
        "-map", "0:v:0",
        "-c:v", "copy",
        "-an",
        str(video_no_audio)
    ])

    # Extract raw audio (stereo 44.1k wav)
    run_cmd([
        "ffmpeg", "-y",
        "-i", str(input_video_path),
        "-map", "0:a:0",
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "44100",
        "-ac", "2",
        str(audio_raw_wav)
    ])

    # ASR-friendly 16k mono wav
    run_cmd([
        "ffmpeg", "-y",
        "-i", str(audio_raw_wav),
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        str(audio_16k_mono)
    ])

    return {
        "video_no_audio": str(video_no_audio),
        "audio_raw_wav": str(audio_raw_wav),
        "audio_16k_mono": str(audio_16k_mono),
    }

# Both earlier steps must have run: Step 0 defines DIRS, Step 1 defines INPUT_VIDEO.
if "DIRS" not in globals():
    raise NameError("DIRS not found. Run Step 0 first.")
if not globals().get("INPUT_VIDEO"):
    raise NameError("INPUT_VIDEO not found. Run Step 1 first.")

outputs = separate_audio_video(INPUT_VIDEO, str(DIRS["extract"]))

# Stable path record for later steps: the extraction outputs plus where they came from.
step1_paths = {
    **outputs,
    "input_video": str(INPUT_VIDEO),
    "extract_dir": str(DIRS["extract"]),
}

print("\nOutputs (step1_paths):")
for label, location in step1_paths.items():
    print(f"- {label}: {location}")


Running:
 ffmpeg -y -i /content/gdrive/MyDrive/dub_project/video.mp4 -map 0:v:0 -c:v copy -an /content/dub_project/work/00_extract/video_no_audio.mp4
Running:
 ffmpeg -y -i /content/gdrive/MyDrive/dub_project/video.mp4 -map 0:a:0 -vn -acodec pcm_s16le -ar 44100 -ac 2 /content/dub_project/work/00_extract/video_raw.wav
Running:
 ffmpeg -y -i /content/dub_project/work/00_extract/video_raw.wav -acodec pcm_s16le -ar 16000 -ac 1 /content/dub_project/work/00_extract/video_16k_mono.wav

Outputs (step1_paths):
- video_no_audio: /content/dub_project/work/00_extract/video_no_audio.mp4
- audio_raw_wav: /content/dub_project/work/00_extract/video_raw.wav
- audio_16k_mono: /content/dub_project/work/00_extract/video_16k_mono.wav
- input_video: /content/gdrive/MyDrive/dub_project/video.mp4
- extract_dir: /content/dub_project/work/00_extract


In [6]:
#@title 2.0) Install Demucs { display-mode: "form" }

!pip -q install -U demucs


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m28.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.1/87.1 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.6/59.6 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m249.1/249.1 kB[0m [31m22.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.0/40.0 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.0/76.0 kB[0m [31m7.7 MB/s[0m eta [36m0

In [7]:
#@title Fix torchaudio save error (torchcodec) { display-mode: "form" }

!pip -q install torchcodec


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━[0m [32m1.8/2.1 MB[0m [31m53.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m36.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [8]:
#@title 2.1) Demucs (robust): vocals only + 16k mono + save paths { display-mode: "form" }

# Form parameters for this cell:
# - demucs_model / device: passed to the Demucs command line as -n / -d.
# - segments_try: comma-separated --segment values, tried in order until one run succeeds.
# - use_two_stems_vocals: when True, "--two-stems vocals" is added to the command.
# - shifts / overlap: passed to the Demucs command line as --shifts / --overlap.
demucs_model = "htdemucs" #@param ["htdemucs","htdemucs_ft","mdx_extra","mdx_extra_q","mdx"]
device = "cuda" #@param ["cuda","cpu"]
segments_try = "4,2,1" #@param {type:"string"}
use_two_stems_vocals = True #@param {type:"boolean"}
shifts = 0 #@param {type:"integer"}
overlap = 0.25 #@param {type:"number"}

import os, sys, subprocess
from pathlib import Path
import torch

# Optional dependency for *_q models
# check=False: a failed install is not treated as an error here.
if demucs_model.endswith("_q"):
    subprocess.run([sys.executable, "-m", "pip", "install", "-q", "diffq"], check=False)

def run_cmd(cmd):
    """Run ``cmd`` (a list of arguments) and return the completed process.

    The command line is echoed first and output is captured as text. On success
    the last 600 characters of stderr are printed when stderr is non-empty. On a
    non-zero exit status the tails of stdout and stderr are printed and a
    RuntimeError carrying the exit code is raised.
    """
    print("Running:\n", " ".join(str(token) for token in cmd))
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        if proc.stderr:
            print((proc.stderr)[-600:])
        return proc
    print("\n--- STDOUT (tail) ---\n", (proc.stdout or "")[-2000:])
    print("\n--- STDERR (tail) ---\n", (proc.stderr or "")[-2000:])
    raise RuntimeError(f"Command failed (code={proc.returncode}).")

# Require Step 0
if "DIRS" not in globals():
    raise NameError("DIRS not found. Run Step 0 first.")

# Require Step 1
if "INPUT_VIDEO" not in globals() or not INPUT_VIDEO:
    raise NameError("INPUT_VIDEO not found. Run Step 1 first.")

input_video_path = Path(INPUT_VIDEO)
base = input_video_path.stem

# Resolve audio_raw from step1_paths first, then from the older `outputs` dict.
# A missing or empty entry must stay None: Path("") is the current directory, which
# "exists" and would otherwise be mistaken for the extracted audio file.
if isinstance(globals().get("step1_paths"), dict):
    raw_value = step1_paths.get("audio_raw_wav")
elif isinstance(globals().get("outputs"), dict):
    raw_value = outputs.get("audio_raw_wav")
else:
    raw_value = None
audio_raw = Path(raw_value) if raw_value else None

# Fallback: search in extract dir
if (audio_raw is None) or (not audio_raw.is_file()):
    candidates = sorted(Path(DIRS["extract"]).glob("*.wav"))
    # Prefer *_raw.wav if exists
    raw_candidates = [p for p in candidates if p.name.endswith("_raw.wav")]
    audio_raw = raw_candidates[-1] if raw_candidates else (candidates[-1] if candidates else None)

if audio_raw is None or not audio_raw.is_file():
    raise FileNotFoundError("Could not find extracted RAW audio. Run Step 1.2 first.")

print("Using extracted audio RAW:", audio_raw)

# Output folders
demucs_out_root = Path(DIRS["work"]) / "01_demucs"
prep_root       = Path(DIRS["work"]) / "02_prep"
demucs_out_root.mkdir(parents=True, exist_ok=True)
prep_root.mkdir(parents=True, exist_ok=True)

# Parse segments list. Entries that are not positive integers are reported and
# skipped instead of aborting the cell; an empty result falls back to the default.
seg_list = []
for x in segments_try.split(","):
    x = x.strip()
    if not x:
        continue
    try:
        seg_value = int(x)
    except ValueError:
        print(f"Ignoring invalid segment value: {x!r}")
        continue
    if seg_value <= 0:
        print(f"Ignoring non-positive segment value: {x!r}")
        continue
    seg_list.append(seg_value)
if not seg_list:
    seg_list = [4,2,1]

def try_demucs(run_device: str):
    """Run the Demucs command line on ``audio_raw``, one attempt per segment size.

    Segment sizes come from ``seg_list`` and are tried in order on ``run_device``.
    Returns True as soon as one attempt succeeds, False when all of them fail.
    """
    # Extra arguments that go right before "-o" when only the vocals split is wanted.
    stem_args = ["--two-stems", "vocals"] if use_two_stems_vocals else []

    for seg in seg_list:
        cmd = [
            sys.executable, "-m", "demucs",
            "-n", demucs_model,
            "-d", run_device,
            "--jobs", "0",
            "--segment", str(seg),
            "--shifts", str(shifts),
            "--overlap", str(overlap),
            *stem_args,
            "-o", str(demucs_out_root),
            str(audio_raw),
        ]

        print(f"\nAttempt Demucs: device={run_device}, segment={seg}, two_stems={use_two_stems_vocals}, shifts={shifts}")
        try:
            run_cmd(cmd)
        except RuntimeError:
            print(f"Failed with segment={seg} on device={run_device}. Trying next...")
            continue
        return True
    return False

# Device fallback logic
real_device = device
if real_device == "cuda" and not torch.cuda.is_available():
    print("CUDA not available. Falling back to CPU.")
    real_device = "cpu"

ok = try_demucs(real_device)
if not ok and real_device == "cuda":
    print("GPU attempts failed. Falling back to CPU...")
    ok = try_demucs("cpu")
if not ok:
    raise RuntimeError("Demucs failed on all settings.")

# Locate stems. Demucs writes to <out>/<model>/<track name>/, so that folder is
# checked first: a recursive search on its own picks the alphabetically last match,
# which can be a stem left over from another track or model run.
expected_dir = demucs_out_root / demucs_model / audio_raw.stem
VOCALS_WAV = expected_dir / "vocals.wav"
if not VOCALS_WAV.is_file():
    vocals_candidates = sorted(demucs_out_root.glob("**/vocals.wav"))
    if not vocals_candidates:
        raise FileNotFoundError("vocals.wav not found after Demucs.")
    VOCALS_WAV = vocals_candidates[-1]

# Background stem: prefer the file sitting next to the chosen vocals, then fall
# back to the recursive search. It stays None when no such stem was produced.
BKG_WAV = None
for bg_name in ("no_vocals.wav", "instrumental.wav"):
    if (VOCALS_WAV.parent / bg_name).is_file():
        BKG_WAV = VOCALS_WAV.parent / bg_name
        break
if BKG_WAV is None:
    bg_candidates = []
    bg_candidates += sorted(demucs_out_root.glob("**/no_vocals.wav"))
    bg_candidates += sorted(demucs_out_root.glob("**/instrumental.wav"))
    BKG_WAV = bg_candidates[-1] if bg_candidates else None

print("\nVOCALS_WAV:", VOCALS_WAV)
print("BKG_WAV:", BKG_WAV if BKG_WAV else "Not found (optional)")

# Convert vocals to 16k mono wav for your denoiser model
MODEL_INPUT_16K = prep_root / f"{base}_vocals_16k_mono.wav"
run_cmd([
    "ffmpeg", "-y",
    "-i", str(VOCALS_WAV),
    "-ac", "1",
    "-ar", "16000",
    "-acodec", "pcm_s16le",
    str(MODEL_INPUT_16K)
])

# Save paths for next steps
step2_paths = {
    "audio_raw": str(audio_raw),
    "demucs_out_root": str(demucs_out_root),
    "vocals_wav": str(VOCALS_WAV),
    "background_wav": str(BKG_WAV) if BKG_WAV else None,
    "model_input_16k": str(MODEL_INPUT_16K),
}

print("\nstep2_paths saved:")
for k, v in step2_paths.items():
    print(f"- {k}: {v}")


Using extracted audio RAW: /content/dub_project/work/00_extract/video_raw.wav

Attempt Demucs: device=cuda, segment=4, two_stems=True, shifts=0
Running:
 /usr/bin/python3 -m demucs -n htdemucs -d cuda --jobs 0 --segment 4 --shifts 0 --overlap 0.25 --two-stems vocals -o /content/dub_project/work/01_demucs /content/dub_project/work/00_extract/video_raw.wav
█████████████████████████████████████████████████| 504.0/504.0 [00:42<00:00, 14.10seconds/s]
100%|████████████████████████████████████████████████████████████████████████| 504.0/504.0 [00:42<00:00, 11.75seconds/s]
  return save_with_torchcodec(
  return save_with_torchcodec(


VOCALS_WAV: /content/dub_project/work/01_demucs/htdemucs/video_raw/vocals.wav
BKG_WAV: /content/dub_project/work/01_demucs/htdemucs/video_raw/no_vocals.wav
Running:
 ffmpeg -y -i /content/dub_project/work/01_demucs/htdemucs/video_raw/vocals.wav -ac 1 -ar 16000 -acodec pcm_s16le /content/dub_project/work/02_prep/video_vocals_16k_mono.wav
top, [?] for help
Output #

In [9]:
#@title 2.2) Speech Noise Separation Model { display-mode: "form" }

weights_filename = "/content/gdrive/MyDrive/dub_project/best_model.pth" #@param {type:"string"}
segment_seconds = 2.0 #@param {type:"number"}
overlap_ratio = 0.5 #@param {type:"number"}
base_channels = 32 #@param {type:"number"}

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from pathlib import Path
import soundfile as sf

# Earlier steps must have populated DIRS (Step 0) and step2_paths (Step 2.1).
if "DIRS" not in globals():
    raise NameError("DIRS not found. Run Step 0 first.")
if not isinstance(globals().get("step2_paths"), dict):
    raise NameError("step2_paths not found. Run Step 2.1 first.")

# Input of the denoiser: the 16 kHz mono vocals written by Step 2.1.
MODEL_INPUT_16K = Path(step2_paths["model_input_16k"])
if not MODEL_INPUT_16K.exists():
    raise FileNotFoundError(f"MODEL_INPUT_16K not found: {MODEL_INPUT_16K}")

device_t = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device_t)

WEIGHTS_PATH = Path(weights_filename.strip())
if WEIGHTS_PATH.exists():
    print("Weights:", WEIGHTS_PATH)
else:
    raise FileNotFoundError(f"Weights not found: {WEIGHTS_PATH}")

# Any form value other than 32 is overridden before the model is built.
if int(base_channels) != 32:
    print("base_channels should be 32 to match trained weights. Forcing 32.")
    base_channels = 32

def match_size(x, ref):
    """Crop and/or zero-pad the last two dims of 4-D ``x`` to those of 4-D ``ref``.

    Cropping keeps the leading rows/columns; padding is appended after the
    existing rows/columns (bottom and right).
    """
    _, _, cur_h, cur_w = x.shape
    _, _, target_h, target_w = ref.shape

    # Crop whatever exceeds the target size.
    x = x[:, :, :min(cur_h, target_h), :min(cur_w, target_w)]

    # Pad whatever is still missing.
    missing_h = target_h - x.shape[2]
    missing_w = target_w - x.shape[3]
    if missing_h > 0 or missing_w > 0:
        x = F.pad(x, (0, missing_w, 0, missing_h))
    return x

class ResBlock(nn.Module):
    """Residual block: two 3x3 conv + instance-norm layers with a skip connection.

    When ``in_ch`` differs from ``out_ch`` the skip path goes through a 1x1
    convolution so both branches have ``out_ch`` channels; otherwise the input
    is added unchanged.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.c1 = nn.Conv2d(in_ch, out_ch, 3, padding=1, bias=False)
        self.n1 = nn.InstanceNorm2d(out_ch, affine=True)
        self.c2 = nn.Conv2d(out_ch, out_ch, 3, padding=1, bias=False)
        self.n2 = nn.InstanceNorm2d(out_ch, affine=True)
        self.act = nn.LeakyReLU(0.2)
        # 1x1 projection only when the channel count changes; None means identity skip.
        self.skip = nn.Conv2d(in_ch, out_ch, 1, bias=False) if in_ch != out_ch else None

    def forward(self, x):
        """Return ``act(norm2(conv2(act(norm1(conv1(x))))) + skip(x))``."""
        i = x if self.skip is None else self.skip(x)
        x = self.act(self.n1(self.c1(x)))
        x = self.n2(self.c2(x))
        # The activation is applied after the residual sum.
        return self.act(x + i)

class Down(nn.Module):
    """Encoder stage: stride-2 3x3 convolution, instance norm, LeakyReLU, then a ResBlock."""

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.net = nn.Sequential(
            # The stride-2 convolution performs the downsampling and the channel change.
            nn.Conv2d(in_ch, out_ch, 3, stride=2, padding=1, bias=False),
            nn.InstanceNorm2d(out_ch, affine=True),
            nn.LeakyReLU(0.2),
            ResBlock(out_ch, out_ch),
        )
    def forward(self, x):
        """Apply the stage to ``x``."""
        return self.net(x)

class Up(nn.Module):
    """Decoder stage: transposed-conv upsampling, skip concatenation, then a ResBlock.

    ``in_ch`` is the channel count of the incoming feature map, ``skip_ch`` that
    of the encoder feature map it is concatenated with, and ``out_ch`` the
    channel count produced by the stage.
    """

    def __init__(self, in_ch, skip_ch, out_ch):
        super().__init__()
        # Transposed convolution with kernel 4, stride 2, padding 1.
        self.up = nn.ConvTranspose2d(in_ch, out_ch, 4, 2, 1, bias=False)
        self.norm = nn.InstanceNorm2d(out_ch, affine=True)
        self.act = nn.LeakyReLU(0.2)
        # The ResBlock sees the upsampled map and the skip map stacked on the channel axis.
        self.res = ResBlock(out_ch + skip_ch, out_ch)

    def forward(self, x, skip):
        """Upsample ``x``, fit it to ``skip``'s last two dims, concatenate and refine."""
        x = self.act(self.norm(self.up(x)))
        # Crop/pad so the upsampled map can be concatenated with the skip map.
        x = match_size(x, skip)
        x = torch.cat([x, skip], dim=1)
        return self.res(x)

class DualStreamResUNet(nn.Module):
    """U-Net with one shared encoder and two decoders (speech and noise).

    The encoder takes a 1-channel input and widens it to ``base``, ``2*base``,
    ``4*base`` and ``8*base`` channels. Each decoder (attributes ending in ``s``
    for speech, ``n`` for noise) mirrors the encoder using the same skip
    connections, and ends in a 1x1 convolution followed by a sigmoid.
    """

    def __init__(self, base=32):
        super().__init__()
        # Shared encoder and bottleneck.
        self.enc1 = ResBlock(1, base)
        self.enc2 = Down(base, base*2)
        self.enc3 = Down(base*2, base*4)
        self.enc4 = Down(base*4, base*8)
        self.bot = ResBlock(base*8, base*8)

        # Speech decoder.
        self.up3s = Up(base*8, base*4, base*4)
        self.up2s = Up(base*4, base*2, base*2)
        self.up1s = Up(base*2, base, base)

        # Noise decoder (same layout, separate weights).
        self.up3n = Up(base*8, base*4, base*4)
        self.up2n = Up(base*4, base*2, base*2)
        self.up1n = Up(base*2, base, base)

        # 1-channel sigmoid outputs, one per decoder.
        self.speech_head = nn.Sequential(nn.Conv2d(base, 1, 1), nn.Sigmoid())
        self.noise_head  = nn.Sequential(nn.Conv2d(base, 1, 1), nn.Sigmoid())

    def forward(self, x):
        """Return the tuple ``(speech_output, noise_output)`` for input ``x``."""
        e1 = self.enc1(x)
        e2 = self.enc2(e1)
        e3 = self.enc3(e2)
        e4 = self.enc4(e3)
        b = self.bot(e4)

        # Both decoders start from the same bottleneck and reuse the same encoder skips.
        xs = self.up3s(b, e3)
        xs = self.up2s(xs, e2)
        xs = self.up1s(xs, e1)

        xn = self.up3n(b, e3)
        xn = self.up2n(xn, e2)
        xn = self.up1n(xn, e1)

        return self.speech_head(xs), self.noise_head(xn)

def load_weights(model, path: Path):
    """Load the state dict stored at ``path`` into ``model`` and return the model.

    The file is read onto the CPU. If any key starts with ``"module."`` (the
    prefix added when a model is saved from a DataParallel wrapper), that leading
    prefix is removed from the keys that carry it. Only the leading prefix is
    stripped, so a submodule that is itself named ``module`` keeps its name.

    Raises:
        RuntimeError: from ``load_state_dict(strict=True)`` when the keys or
            shapes do not match the model.
    """
    # Fallback for older torch versions that don't support weights_only
    try:
        sd = torch.load(str(path), map_location="cpu", weights_only=True)
    except TypeError:
        sd = torch.load(str(path), map_location="cpu")

    prefix = "module."
    if any(k.startswith(prefix) for k in sd.keys()):
        # Slice instead of str.replace: replace() would also delete "module."
        # occurrences in the middle of a key.
        sd = {(k[len(prefix):] if k.startswith(prefix) else k): v for k, v in sd.items()}

    model.load_state_dict(sd, strict=True)
    return model

# STFT / segmentation settings shared by the helpers below: sample rate, segment
# length in seconds (from the form), FFT size, hop length and window length.
CONFIG = {"SR": 16000, "SEG_SEC": float(segment_seconds), "N_FFT": 512, "HOP": 128, "WIN": 512}
# Hann windows created so far, keyed by (device, window length).
STFT_WINDOW_CACHE = {}

def _stft_window(device):
    """Return the cached Hann window of length CONFIG["WIN"] for ``device``.

    The key uses the full device string (for example "cuda:0"), not just the
    device type, so a window created on one CUDA device is never handed to a
    tensor living on another one.
    """
    win_key = (str(device), CONFIG["WIN"])
    if win_key not in STFT_WINDOW_CACHE:
        STFT_WINDOW_CACHE[win_key] = torch.hann_window(CONFIG["WIN"], device=device)
    return STFT_WINDOW_CACHE[win_key]

def stft_mag_phase(wav: torch.Tensor):
    """Compute the STFT of ``wav`` and return ``(log1p(|S|), angle(S))``.

    The transform uses the FFT size, hop and window length from CONFIG and a
    Hann window on the same device as ``wav``.
    """
    S = torch.stft(
        wav, n_fft=CONFIG["N_FFT"], hop_length=CONFIG["HOP"], win_length=CONFIG["WIN"],
        window=_stft_window(wav.device), return_complex=True
    )
    mag_log = torch.log1p(torch.abs(S))
    phase = torch.angle(S)
    return mag_log, phase

def istft_from_logmag_phase(mag_log: torch.Tensor, phase: torch.Tensor, length: int):
    """Invert a ``log1p`` magnitude plus phase back to a waveform of ``length`` samples.

    The magnitude is recovered with ``expm1`` (clamped at zero), combined with
    ``phase`` into a complex spectrogram and passed to the inverse STFT with the
    same CONFIG settings used by ``stft_mag_phase``.
    """
    mag = torch.expm1(mag_log).clamp_min(0.0)
    S = torch.polar(mag, phase)
    wav = torch.istft(
        S, n_fft=CONFIG["N_FFT"], hop_length=CONFIG["HOP"], win_length=CONFIG["WIN"],
        window=_stft_window(mag_log.device), length=length
    )
    return wav

@torch.no_grad()
def denoise_waveform(model, wav_1d: torch.Tensor):
    """Denoise a 1-D waveform segment by segment and overlap-add the results.

    The waveform is cut into segments of ``CONFIG["SR"] * CONFIG["SEG_SEC"]``
    samples, stepping by ``seg_len * (1 - overlap_ratio)`` samples (at least 1).
    For each segment the model's first output is used as a mask on the linear STFT
    magnitude, the original phase is reused for the inverse STFT, and the result
    is accumulated with a Hann weighting. The sum is normalised by the accumulated
    window weights and trimmed to the input length.

    Reads the module-level ``overlap_ratio`` and ``device_t``. Gradients are
    disabled by the decorator. Returns a float32 tensor with ``wav_1d.numel()``
    samples.
    """
    seg_len = int(CONFIG["SR"] * CONFIG["SEG_SEC"])
    hop = int(seg_len * (1.0 - float(overlap_ratio)))
    # Guard against a zero or negative step when overlap_ratio is 1.0 or more.
    hop = max(1, hop)

    # Original length, used to trim the padding off again at the end.
    T0 = wav_1d.numel()
    wav = wav_1d

    # Zero-pad so the signal holds a whole number of hops after the first segment
    # (or exactly one segment when the input is shorter than a segment).
    if wav.numel() < seg_len:
        wav = F.pad(wav, (0, seg_len - wav.numel()))
    else:
        remainder = (wav.numel() - seg_len) % hop
        if remainder != 0:
            wav = F.pad(wav, (0, hop - remainder))

    T = wav.numel()
    # Overlap-add weighting window, scaled so its peak is 1.
    win = torch.hann_window(seg_len, periodic=True)
    win = win / (win.max() + 1e-8)

    # Accumulators: weighted signal and summed weights.
    out = torch.zeros(T, dtype=torch.float32)
    wsum = torch.zeros(T, dtype=torch.float32)

    for start in range(0, T - seg_len + 1, hop):
        # One segment at a time is moved to the model device.
        chunk = wav[start:start+seg_len].unsqueeze(0).to(device_t)
        mag_log, phase = stft_mag_phase(chunk)
        # First model output is used as the mask; the second output is ignored here.
        Ms, _ = model(mag_log.unsqueeze(1))
        # Apply the mask to the linear magnitude, then return to the log1p domain.
        mag_hat_log = torch.log1p(torch.expm1(mag_log) * Ms.squeeze(1))
        pred = istft_from_logmag_phase(mag_hat_log, phase, length=seg_len).squeeze(0).cpu()
        out[start:start+seg_len] += pred * win
        wsum[start:start+seg_len] += win

    # Normalise by the summed weights; the epsilon avoids division by zero where
    # the window weight is zero.
    out = out / (wsum + 1e-8)
    return out[:T0]

# INPUT_VIDEO names the output file below. It is checked up front, in the same way
# as the other steps do, so a missing Step 1 is reported before the model run
# instead of as a bare NameError after it.
if "INPUT_VIDEO" not in globals() or not INPUT_VIDEO:
    raise NameError("INPUT_VIDEO not found. Run Step 1 first.")

# Build the network on the selected device and load the trained weights.
model = DualStreamResUNet(base=int(base_channels)).to(device_t).eval()
model = load_weights(model, WEIGHTS_PATH)

# Load the Step 2.1 vocals, average the channels and resample if the file is not
# at the configured sample rate.
wav, sr = torchaudio.load(str(MODEL_INPUT_16K))
wav = wav.mean(dim=0)
if sr != CONFIG["SR"]:
    wav = torchaudio.functional.resample(wav, sr, CONFIG["SR"])
wav = wav.contiguous().float().cpu()

clean = denoise_waveform(model, wav)

denoise_dir = Path(DIRS["work"]) / "02_denoise"
denoise_dir.mkdir(parents=True, exist_ok=True)

# Written as 16-bit PCM at the configured sample rate.
video_stem = Path(INPUT_VIDEO).stem
CLEAN_AUDIO_FOR_ASR = denoise_dir / f"{video_stem}_speech_clean_16k.wav"
sf.write(str(CLEAN_AUDIO_FOR_ASR), clean.numpy(), CONFIG["SR"], subtype="PCM_16")

print("CLEAN_AUDIO_FOR_ASR =", CLEAN_AUDIO_FOR_ASR)

# Recorded for the ASR step.
step2_paths["speech_clean_16k"] = str(CLEAN_AUDIO_FOR_ASR)


Device: cuda
Weights: /content/gdrive/MyDrive/dub_project/best_model.pth
CLEAN_AUDIO_FOR_ASR = /content/dub_project/work/02_denoise/video_speech_clean_16k.wav


In [10]:
#@title 2.21) Sanity check (after 2.2) { display-mode: "form" }

from pathlib import Path

if "step2_paths" not in globals():
    raise NameError("step2_paths not found. Run Step 2.1 first.")

# A missing entry must not be turned into Path(""): that is the current directory,
# which exists, so the check would wrongly report success.
clean_value = step2_paths.get("speech_clean_16k")
p = Path(clean_value) if clean_value else None
print("speech_clean_16k =", p if p is not None else "(not set - run Step 2.2)")

# is_file() rather than exists(): a directory is not a valid result either.
clean_ready = p is not None and p.is_file()
print("Exists:", clean_ready)
if clean_ready:
    print("Size:", p.stat().st_size)


speech_clean_16k = /content/dub_project/work/02_denoise/video_speech_clean_16k.wav
Exists: True
Size: 16118780


In [11]:
#@title 2.25) Create/Repair WhisperX venv (NO torchvision) { display-mode: "form" }

import sys, subprocess

def sh(cmd):
    """Echo ``cmd`` and run it through the shell.

    Raises subprocess.CalledProcessError when the command exits with a non-zero status.
    """
    print(">>", cmd)
    subprocess.run(cmd, shell=True, check=True)

def sh_try(cmd):
    """Echo ``cmd``, run it through the shell and report success as a bool.

    Output is captured. On failure the last 2500 characters of stderr are
    printed and False is returned instead of raising.
    """
    print(">>", cmd)
    result = subprocess.run(cmd, shell=True, text=True, capture_output=True)
    succeeded = result.returncode == 0
    if not succeeded:
        print("STDERR tail:\n", (result.stderr or "")[-2500:])
    return succeeded

# Locations and command prefix shared by the install steps below.
venv_dir = "/content/.venv"
venv_python = f"{venv_dir}/bin/python"
uv_install = f"uv pip install -p {venv_python}"

# System packages and the uv installer.
sh("apt-get -y -qq update && apt-get -y -qq install ffmpeg libsndfile1")
sh("pip -q install -U uv")

# Recreate the virtual environment from scratch.
sh(f"rm -rf {venv_dir}")
sh(f"uv venv {venv_dir} -p {sys.executable}")
sh(f"{uv_install} -U pip setuptools wheel")

# torch + torchaudio: try the CUDA wheel indexes in order; the CPU index is the
# last resort and is allowed to raise.
torch_cmd = uv_install + " --no-cache-dir --index-url https://download.pytorch.org/whl/{index} torch torchaudio"
ok = False
for cuda_index in ("cu124", "cu121"):
    ok = sh_try(torch_cmd.format(index=cuda_index))
    if ok:
        break
if not ok:
    sh(torch_cmd.format(index="cpu"))

# "|| true": a failing uninstall does not stop the cell.
sh(f"uv pip uninstall -p {venv_python} -y torchvision || true")

sh(f"{uv_install} -U 'numpy<2'")
sh(f"{uv_install} -U 'transformers>=4.38,<5' 'accelerate>=0.30,<1' soundfile pysrt")

sh(f"{uv_install} -U git+https://github.com/m-bain/whisperX.git")

# Import check inside the venv: prints versions and confirms whisperx imports.
sh(
    venv_python + ' -c "'
    'import importlib.util as u; '
    'import torch, torchaudio; '
    'print(\'torch:\', torch.__version__, \'cuda_available:\', torch.cuda.is_available(), \'cuda:\', torch.version.cuda); '
    'print(\'torchaudio:\', torchaudio.__version__); '
    'print(\'torchvision_spec:\', u.find_spec(\'torchvision\')); '
    'from transformers import Pipeline; print(\'Pipeline: OK\'); '
    'import whisperx; print(\'whisperx import: OK\')"'
)

print("venv ready: /content/.venv")


>> apt-get -y -qq update && apt-get -y -qq install ffmpeg libsndfile1
>> pip -q install -U uv
>> rm -rf /content/.venv
>> uv venv /content/.venv -p /usr/bin/python3
>> uv pip install -p /content/.venv/bin/python -U pip setuptools wheel
>> uv pip install -p /content/.venv/bin/python --no-cache-dir --index-url https://download.pytorch.org/whl/cu124 torch torchaudio
>> uv pip uninstall -p /content/.venv/bin/python -y torchvision || true
>> uv pip install -p /content/.venv/bin/python -U 'numpy<2'
>> uv pip install -p /content/.venv/bin/python -U 'transformers>=4.38,<5' 'accelerate>=0.30,<1' soundfile pysrt
>> uv pip install -p /content/.venv/bin/python -U git+https://github.com/m-bain/whisperX.git
>> /content/.venv/bin/python -c "import importlib.util as u; import torch, torchaudio; print('torch:', torch.__version__, 'cuda_available:', torch.cuda.is_available(), 'cuda:', torch.version.cuda); print('torchaudio:', torchaudio.__version__); print('torchvision_spec:', u.find_spec('torchvision')

In [12]:
#@title 2.3) WhisperX ASR + Align -> original_en.srt (RUN AS SCRIPT) { display-mode: "form" }

whisperx_model_name = "large-v3" #@param ["small","medium","large-v2","large-v3"]
language_code = "en" #@param {type:"string"}  # Use "" for auto-detect
device_preference = "cuda" #@param ["cuda","cpu"]
compute_type_pref = "float16" #@param ["float16","int8"]
batch_size = 16 #@param {type:"integer"}
chunk_size = 15 #@param {type:"integer"}

from pathlib import Path
import os, subprocess

if "DIRS" not in globals():
    raise NameError("DIRS not found. Run Step 0 first.")
if "step2_paths" not in globals() or not isinstance(step2_paths, dict):
    raise NameError("step2_paths not found. Run Step 2.1/2.2 first.")
if "INPUT_VIDEO" not in globals() or not INPUT_VIDEO:
    raise NameError("INPUT_VIDEO not found. Run Step 1 first.")

# A missing entry must not become Path(""): that is the current directory, which
# exists, so the guard below would pass and a folder would be used as the audio.
clean_audio_value = step2_paths.get("speech_clean_16k")
CLEAN_AUDIO_FOR_ASR = Path(clean_audio_value) if clean_audio_value else None
if CLEAN_AUDIO_FOR_ASR is None or not CLEAN_AUDIO_FOR_ASR.is_file():
    raise FileNotFoundError("speech_clean_16k not found. Run Step 2.2 first.")

asr_dir = Path(DIRS["work"]) / "03_asr"
asr_dir.mkdir(parents=True, exist_ok=True)

# Remove a subtitle file left by a previous run so a stale result cannot be
# mistaken for the output of this one.
video_stem = Path(INPUT_VIDEO).stem
OUT_SRT = asr_dir / f"{video_stem}_original_en.srt"
if OUT_SRT.exists():
    OUT_SRT.unlink()
# Write a standalone transcription script to disk. It is executed further down
# with the interpreter at /content/.venv/bin/python instead of the notebook kernel.
# The script transcribes with WhisperX, aligns the segments, and writes one SRT file.
script_path = Path("/content/transcribe_whisperx.py")
script_path.write_text(r"""
import os
os.environ.setdefault("MPLBACKEND", "agg")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

import argparse, gc
from pathlib import Path
import torch
import whisperx

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--audio", required=True)
    ap.add_argument("--out_srt", required=True)
    ap.add_argument("--model", default="large-v3")
    ap.add_argument("--language", default="")
    ap.add_argument("--device", default="cuda")
    ap.add_argument("--compute_type", default="float16")
    ap.add_argument("--batch_size", type=int, default=16)
    ap.add_argument("--chunk_size", type=int, default=15)
    args = ap.parse_args()

    audio_path = Path(args.audio)
    out_srt = Path(args.out_srt)
    out_dir = out_srt.parent
    out_dir.mkdir(parents=True, exist_ok=True)

    if not audio_path.exists():
        raise FileNotFoundError(f"Audio not found: {audio_path}")

    device = "cuda" if (args.device == "cuda" and torch.cuda.is_available()) else "cpu"
    compute_type = args.compute_type

    if device == "cpu" and compute_type.lower() in ("float16","fp16"):
        compute_type = "int8"

    asr_options = {
        "initial_prompt": "Educational video. Use clear punctuation. Keep technical terms accurate.",
        "suppress_numerals": False,
    }

    kwargs = dict(
        compute_type=compute_type,
        asr_options=asr_options,
        vad_method="silero",
    )
    if args.language.strip():
        kwargs["language"] = args.language.strip()

    print("Device:", device, "| compute_type:", compute_type)
    print("Loading model...")
    model = whisperx.load_model(args.model, device, **kwargs)

    print("Loading audio...")
    audio = whisperx.load_audio(str(audio_path))

    print("Transcribing...")
    result = model.transcribe(
        audio,
        batch_size=args.batch_size if device == "cuda" else max(1, min(4, args.batch_size)),
        chunk_size=args.chunk_size,
        print_progress=True,
    )

    del model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    lang = result.get("language") or (args.language.strip() if args.language.strip() else None)
    if not lang:
        raise RuntimeError("No language detected/fixed.")

    print("Aligning... lang =", lang)
    align_model, metadata = whisperx.load_align_model(language_code=lang, device=device)
    result_aligned = whisperx.align(
        result["segments"],
        align_model,
        metadata,
        audio,
        device,
        return_char_alignments=False,
        print_progress=False,
    )

    del align_model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    from whisperx.utils import get_writer
    writer = get_writer("srt", output_dir=str(out_dir))

    writer_options = {"highlight_words": False, "max_line_width": None, "max_line_count": None}

    stem = out_srt.stem
    dummy_audio_name = f"{stem}.wav"
    writer({"segments": result_aligned["segments"], "language": lang}, dummy_audio_name, writer_options)

    generated = out_dir / f"{stem}.srt"
    if not generated.exists():
        raise FileNotFoundError(f"Expected SRT not generated: {generated}")

    if generated.resolve() != out_srt.resolve():
        generated.replace(out_srt)

    print("Saved:", out_srt)

if __name__ == "__main__":
    main()
""", encoding="utf-8")

# Command line for the script written above; form values are passed as CLI arguments.
cmd = [
    "/content/.venv/bin/python",
    str(script_path),
    "--audio", str(CLEAN_AUDIO_FOR_ASR),
    "--out_srt", str(OUT_SRT),
    "--model", whisperx_model_name,
    "--language", (language_code or ""),
    "--device", device_preference,
    "--compute_type", compute_type_pref,
    "--batch_size", str(batch_size),
    "--chunk_size", str(chunk_size),
]

# The child process inherits the notebook environment plus three overrides.
env = os.environ.copy()
env["MPLBACKEND"] = "agg"
env["TOKENIZERS_PARALLELISM"] = "false"
# NOTE(review): presumably disables torch's weights-only checkpoint loading for the
# child process; confirm this is intended, since it relaxes a safety check.
env["TORCH_FORCE_NO_WEIGHTS_ONLY_LOAD"] = "true"

# Output is captured, so the script's stdout is printed only after the process exits.
p = subprocess.run(cmd, env=env, capture_output=True, text=True)
print(p.stdout)

if p.returncode != 0:
    # Show only the last 4000 characters of stderr.
    print("ERROR (stderr tail):\n", (p.stderr or "")[-4000:])
    raise RuntimeError(f"WhisperX script failed with exit code {p.returncode}")

# Publish the SRT path for later steps under step2_paths["asr_srt_en"].
if OUT_SRT.exists():
    step2_paths["asr_srt_en"] = str(OUT_SRT)
    print("asr_srt_en =", step2_paths["asr_srt_en"])
else:
    raise FileNotFoundError(f"SRT not found: {OUT_SRT}")


Device: cuda | compute_type: float16
Loading model...
2026-01-02 12:41:33 - whisperx.vads.silero - INFO - Performing voice activity detection using Silero...
Downloading: "https://github.com/snakers4/silero-vad/zipball/master" to /root/.cache/torch/hub/master.zip
Loading audio...
Transcribing...
Progress: 2.63%...
Progress: 5.26%...
Progress: 7.89%...
Progress: 10.53%...
Progress: 13.16%...
Progress: 15.79%...
Progress: 18.42%...
Progress: 21.05%...
Progress: 23.68%...
Progress: 26.32%...
Progress: 28.95%...
Progress: 31.58%...
Progress: 34.21%...
Progress: 36.84%...
Progress: 39.47%...
Progress: 42.11%...
Progress: 44.74%...
Progress: 47.37%...
Progress: 50.00%...
Progress: 52.63%...
Progress: 55.26%...
Progress: 57.89%...
Progress: 60.53%...
Progress: 63.16%...
Progress: 65.79%...
Progress: 68.42%...
Progress: 71.05%...
Progress: 73.68%...
Progress: 76.32%...
Progress: 78.95%...
Progress: 81.58%...
Progress: 84.21%...
Progress: 86.84%...
Progress: 89.47%...
Progress: 92.11%...
Progre

In [13]:
#@title 3.0) User Settings (Provider + API Key) - .env style {display-mode: "form"}

import os, sys, subprocess
from pathlib import Path


# Translation provider and domain; later cells read these names from the notebook globals.
PROVIDER = "groq"  #@param ["gemini","groq"]
DOMAIN   = "academic"  #@param ["general","technical","medical","academic"]

GEMINI_MODEL = "gemini-3-flash-preview"  #@param {type:"string"}

GROQ_MODEL = "qwen/qwen3-32b"  #@param ["openai/gpt-oss-120b","qwen/qwen3-32b","custom"]
GROQ_MODEL_CUSTOM = ""  #@param {type:"string"}
# "custom" takes the free-text field, falling back to the default model when it is blank.
if GROQ_MODEL == "custom":
    GROQ_MODEL = (GROQ_MODEL_CUSTOM or "").strip() or "qwen/qwen3-32b"

# ---- Install deps once ----
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-U", "python-dotenv", "groq", "google-genai", "google-generativeai"])

# Import python-dotenv only AFTER the install above: importing it at the top of the
# cell raises ImportError on a fresh runtime where the package is not installed yet.
from dotenv import load_dotenv


DOTENV_PATH = "/content/.env"
p = Path(DOTENV_PATH)
if not p.exists():
    raise FileNotFoundError(f".env not found at: {p}. Create it first (مثل المقال).")

# override=False: variables already present in the process environment win over .env,
# so an edited key is only picked up after the old value is removed or the runtime restarts.
load_dotenv(dotenv_path=str(p), override=False)

def require_env(name: str) -> str:
    """Return the stripped value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset, empty, or whitespace only.
    """
    value = (os.environ.get(name) or "").strip()
    if value:
        return value
    raise RuntimeError(f"Missing {name}. Put it in .env then reload runtime/cell.")

print(f"Provider = {PROVIDER} | Domain = {DOMAIN}")
print("Gemini model:", GEMINI_MODEL)
print("Groq model  :", GROQ_MODEL)

# Validate only the provider you selected
# (the key value itself is never printed).
if PROVIDER == "groq":
    require_env("GROQ_API_KEY")
    print("✅ GROQ_API_KEY loaded from .env (hidden).")
else:
    require_env("GEMINI_API_KEY")
    print("✅ GEMINI_API_KEY loaded from .env (hidden).")


# ---- Install deps (once) ----
# Packages used by the translation and TTS steps. "groq", "google-genai" and
# "google-generativeai" are also installed earlier in this cell.
pkgs = [
    "pysrt",
    "groq",
    "pydub",
    "edge-tts",
    "aiohttp",
    "nest_asyncio",
    "soundfile",
    # Gemini libs (install both to ensure compatibility)
    "google-genai",
    "google-generativeai",
]
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-U", *pkgs])

print("✅ Dependencies installed/updated.")


Provider = groq | Domain = academic
Gemini model: gemini-3-flash-preview
Groq model  : qwen/qwen3-32b
✅ GROQ_API_KEY loaded from .env (hidden).
✅ Dependencies installed/updated.


In [14]:
#@title 3.1) Arabic dubbing rewrite for SRT (Time-aware + Auto-shorten) {display-mode:"form"}

# ---- Tunables ----
# AR_CHARS_PER_SEC   : characters per second used to turn a subtitle duration into a character budget.
# MIN_MAX_CHARS      : lower bound on that per-line budget.
# SOFT_OVER_BUDGET / MAX_SPEEDUP_NEED : length/budget ratios above which a line is sent for compression.
# MAX_COMPRESS_ROUNDS: how many compression passes are attempted.
# MAX_CHARS_PER_CHUNK: approximate prompt-line budget per LLM request.
# MAX_RETRIES        : attempts per LLM request before giving up.
AR_CHARS_PER_SEC = 13.0   #@param {type:"number"}
MIN_MAX_CHARS    = 12     #@param {type:"integer"}
SOFT_OVER_BUDGET = 1.15   #@param {type:"number"}
MAX_SPEEDUP_NEED = 1.8   #@param {type:"number"}
MAX_COMPRESS_ROUNDS = 2   #@param {type:"integer"}

MAX_CHARS_PER_CHUNK = 4500  #@param {type:"integer"}
MAX_RETRIES = 6             #@param {type:"integer"}

import os, re, json, time
from pathlib import Path
import pysrt

# ---- Require previous steps ----
# Referencing the bare names raises NameError when an earlier cell was not run.
try:
    step2_paths
    DIRS
    INPUT_VIDEO
except NameError:
    raise NameError("Missing step2_paths/DIRS/INPUT_VIDEO. Run Step 0 + Step 2.3 + Step 3.0 first.")

# English SRT path published under step2_paths["asr_srt_en"].
SRC_SRT = Path(step2_paths.get("asr_srt_en",""))
if not SRC_SRT.exists():
    raise FileNotFoundError(f"Missing asr_srt_en: {SRC_SRT}. Run Step 2.3 first.")

# ---- Output ----
translate_dir = DIRS["work"] / "04_translate"
translate_dir.mkdir(parents=True, exist_ok=True)

video_stem = Path(INPUT_VIDEO).stem
OUT_SRT = translate_dir / f"{video_stem}_ar_dub.srt"

# ---- Provider vars from Step 3.0 ----
# globals().get keeps this cell runnable with defaults when Step 3.0 did not define them.
PROVIDER = globals().get("PROVIDER", "gemini")
DOMAIN   = globals().get("DOMAIN", "general")
GEMINI_MODEL = globals().get("GEMINI_MODEL", "gemini-3-flash-preview")
GROQ_MODEL   = globals().get("GROQ_MODEL", "qwen/qwen3-32b")

# ---- Helpers ----
def extract_json_array(text: str):
    """Extract the JSON array contained in an LLM reply.

    The reply may wrap the array in Markdown code fences, surround it with prose,
    or prefix it with a ``<think>...</think>`` reasoning block. Reasoning blocks
    are removed first so that a draft array inside them is never mistaken for
    (or merged with) the final answer.

    Args:
        text: Raw model output (``None`` is treated as an empty string).

    Returns:
        The decoded JSON array as a Python list.

    Raises:
        ValueError: if no decodable JSON array is found.
    """
    t = re.sub(r"<think>.*?</think>", "", text or "", flags=re.DOTALL | re.IGNORECASE)
    t = t.replace("```json", "").replace("```", "").strip()

    # Fast path: everything between the first "[" and the last "]" is the array.
    lb = t.find("[")
    rb = t.rfind("]")
    if lb != -1 and rb != -1 and rb > lb:
        cand = t[lb:rb+1].strip()
        try:
            return json.loads(cand)
        except Exception:
            pass

    # Fallback: decode the first well-formed array of objects and ignore whatever
    # follows it. raw_decode stops at the end of the value, so trailing prose or
    # a second bracketed fragment cannot break the parse.
    decoder = json.JSONDecoder()
    for m in re.finditer(r"\[\s*\{", t):
        try:
            value, _end = decoder.raw_decode(t, m.start())
        except ValueError:
            continue
        if isinstance(value, list):
            return value

    raise ValueError("No JSON array found in LLM output.")

def call_groq(prompt: str) -> str:
    """Send ``prompt`` as a single user message to the Groq chat API.

    Uses the model named by the global ``GROQ_MODEL`` and the key from the
    ``GROQ_API_KEY`` environment variable.

    Returns:
        The content of the first choice's message.

    Raises:
        RuntimeError: if ``GROQ_API_KEY`` is missing or blank.
    """
    from groq import Groq

    api_key = os.getenv("GROQ_API_KEY","").strip()
    if not api_key:
        raise RuntimeError("Missing GROQ_API_KEY (set it in Step 3.0).")

    client = Groq(api_key=api_key)

    request = {
        "model": str(GROQ_MODEL),
        "messages": [{"role":"user","content":prompt}],
        "temperature": 0.25,
        "top_p": 1,
        "stream": False,
    }

    # Try the "max_completion_tokens" keyword first; if the call raises TypeError,
    # retry with "max_tokens".
    try:
        response = client.chat.completions.create(**request, max_completion_tokens=6000)
    except TypeError:
        response = client.chat.completions.create(**request, max_tokens=6000)

    first_choice = response.choices[0]
    return first_choice.message.content

def call_gemini(prompt: str) -> str:
    """Send ``prompt`` to Gemini through the ``google.genai`` client.

    Uses the model named by the global ``GEMINI_MODEL`` and the key from the
    ``GEMINI_API_KEY`` environment variable.

    Returns:
        The response text, or an empty string when the response has none.

    Raises:
        RuntimeError: if ``GEMINI_API_KEY`` is missing or blank.
    """
    from google import genai
    from google.genai import types

    api_key = os.getenv("GEMINI_API_KEY","").strip()
    if not api_key:
        raise RuntimeError("Missing GEMINI_API_KEY (set it in Step 3.0).")

    client = genai.Client(api_key=api_key)
    response = client.models.generate_content(
        model=str(GEMINI_MODEL),
        contents=prompt,
        config=types.GenerateContentConfig(temperature=0.25),
    )
    reply = response.text
    return reply if reply else ""

def llm(prompt: str) -> str:
    """Route ``prompt`` to Gemini when ``PROVIDER`` is "gemini", otherwise to Groq."""
    if PROVIDER == "gemini":
        return call_gemini(prompt)
    return call_groq(prompt)

def chunk_items(items, max_chars=4500):
    """Group subtitle items into consecutive batches for one LLM request each.

    The size of an item is the length of the prompt line it will be rendered as,
    plus one for the newline. A new batch starts when adding the next item would
    push the running size past ``max_chars``; an item that is too large on its
    own still gets a batch to itself.

    Args:
        items: dicts with the keys "i", "dur", "max_chars" and "text".
        max_chars: size budget per batch.

    Returns:
        A list of batches (lists of the original item dicts, order preserved).
    """
    batches = []
    used = 0
    for entry in items:
        rendered = f'{entry["i"]}: dur={entry["dur"]}s max={entry["max_chars"]} | {entry["text"]}'
        cost = len(rendered) + 1
        if not batches or used + cost > max_chars:
            batches.append([])
            used = 0
        batches[-1].append(entry)
        used += cost
    return batches

def build_prompt_timeaware(chunk):
    """Build the first-pass prompt (Arabic instructions) for one batch of lines.

    Each item is rendered as ``"<i>: dur=<dur>s max=<max_chars> | <text>"`` and the
    prompt asks for a JSON array of ``{"i", "t"}`` objects in the same order.
    The global ``DOMAIN`` is interpolated into the prompt.
    """
    rendered = [
        f'{row["i"]}: dur={row["dur"]}s max={row["max_chars"]} | {row["text"]}'
        for row in chunk
    ]
    lines = "\n".join(rendered)
    return f"""
أنت محرّر نصوص دبلجة محترف.

المهمة:
أعد صياغة كل سطر إلى العربية الفصحى الحديثة (MSA) بصياغة منطوقة مناسبة للدبلجة، مع الالتزام بزمن السطر.

قواعد صارمة:
- حافظ على المعنى دون إضافة أو حذف معلومات.
- لا تستخدم العامية أو لهجات محكية.
- التزم قدر الإمكان بحد "max" لعدد الأحرف في كل سطر.
- حافظ على الأسماء والمصطلحات التقنية والأرقام بدقة.
- لا تغيّر الأرقام داخل أسماء الإصدارات/الموديلات أو الرموز التقنية (مثل GPT-4, v2.1, USB 3.0).
- اترك الأرقام كما هي (سيتم التعامل معها لاحقاً إذا لزم).
- أخرج فقط JSON array بالشكل:
  [{{"i":..,"t":".."}}, ...]
- كل "i" يجب أن يطابق نفس رقم السطر وبنفس الترتيب.

المجال: {DOMAIN}

المدخل:
{lines}
""".strip()

def build_prompt_compress(chunk):
    """Build the shortening prompt (Arabic instructions) for one batch of over-budget lines.

    Each item is rendered as ``"<i>: max=<max_chars> | EN: <en> | AR: <ar>"`` and the
    prompt asks for a JSON array of ``{"i", "t"}`` objects in the same order.
    The global ``DOMAIN`` is interpolated into the prompt.
    """
    rendered = [
        f'{row["i"]}: max={row["max_chars"]} | EN: {row["en"]} | AR: {row["ar"]}'
        for row in chunk
    ]
    lines = "\n".join(rendered)
    return f"""
أنت محرّر دبلجة محترف.

المهمة:
قصّر فقط النص العربي ليلائم حد "max" دون خسارة المعنى أو التفاصيل المهمة.

قواعد صارمة:
- لا تضف معلومات جديدة ولا تحذف نقاطًا أساسية.
- أبقِ الأسماء والمصطلحات التقنية دقيقة.
- العربية الفصحى الحديثة فقط (بدون عامية).
- اترك الأرقام كما هي.
- أخرج فقط JSON array:
  [{{"i":..,"t":".."}}, ...]
- كل "i" يجب أن يطابق نفس رقم السطر وبنفس الترتيب.

المجال: {DOMAIN}

المدخل:
{lines}
""".strip()

def _request_indexed_rewrite(prompt, chunk, failure_label):
    """Call the LLM until it returns one ``{"i", "t"}`` object per chunk item, in order.

    Shared retry loop for the translation and compression passes.

    Args:
        prompt: Fully rendered prompt for this chunk.
        chunk: The items the prompt was built from; only their "i" values are used.
        failure_label: Prefix of the final error message ("Translation"/"Compression").

    Returns:
        dict mapping each line index (int) to its stripped rewritten text.

    Raises:
        RuntimeError: after ``MAX_RETRIES`` failed attempts; the last underlying
            error is attached as ``__cause__``.
    """
    expected = [x["i"] for x in chunk]
    last_err = None

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            arr = extract_json_array(llm(prompt))

            if len(arr) != len(chunk):
                raise ValueError("Count mismatch")
            got = [int(x["i"]) for x in arr]
            if got != expected:
                raise ValueError("Index order mismatch")

            return {int(x["i"]): str(x["t"]).strip() for x in arr}
        except Exception as e:
            last_err = e
            # Exponential backoff capped at 20s, but do not sleep after the final
            # attempt: there is nothing left to wait for.
            if attempt < MAX_RETRIES:
                time.sleep(min(20, 2 ** attempt))

    raise RuntimeError(f"{failure_label} failed: {last_err}") from last_err

def translate_chunk_timeaware(chunk):
    """First pass: rewrite one chunk of lines into Arabic.

    Returns:
        dict mapping line index -> Arabic text.

    Raises:
        RuntimeError: "Translation failed: ..." when every attempt fails.
    """
    return _request_indexed_rewrite(build_prompt_timeaware(chunk), chunk, "Translation")

def compress_chunk(chunk):
    """Second pass: ask the LLM to shorten the Arabic text of one chunk of over-budget lines.

    Returns:
        dict mapping line index -> shortened Arabic text.

    Raises:
        RuntimeError: "Compression failed: ..." when every attempt fails.
    """
    return _request_indexed_rewrite(build_prompt_compress(chunk), chunk, "Compression")

# ---- Load SRT + budgets ----
subs = pysrt.open(str(SRC_SRT), encoding="utf-8")

# One item per subtitle: index, single-line English text, duration in seconds,
# and a character budget derived from the duration.
items = []
for s in subs:
    # Join multi-line subtitles into one line.
    en = re.sub(r"\s*\n\s*", " ", (s.text or "").strip()).strip()
    # Duration in seconds, floored at 0.20s.
    dur_sec = max(0.20, (s.end.ordinal - s.start.ordinal) / 1000.0)
    # Budget = duration * chars-per-second, never below MIN_MAX_CHARS.
    max_chars = max(MIN_MAX_CHARS, int(dur_sec * AR_CHARS_PER_SEC))
    items.append({"i": int(s.index), "text": en, "dur": round(dur_sec, 2), "max_chars": int(max_chars)})

chunks = chunk_items(items, MAX_CHARS_PER_CHUNK)
print(f"Lines={len(items)} | Chunks={len(chunks)} | Provider={PROVIDER} | Domain={DOMAIN}")
print("Gemini:", GEMINI_MODEL)
print("Groq  :", GROQ_MODEL)

# ---- Pass 1 ----
# mapping: subtitle index -> Arabic text, filled chunk by chunk.
mapping = {}
for ci, ch in enumerate(chunks, start=1):
    mapping.update(translate_chunk_timeaware(ch))
    print(f"Pass1 chunk {ci}/{len(chunks)} done.")

# ---- Risk detection ----
def est_need(ar_text: str, max_chars: int):
    """Return the ratio of the stripped text length to the character budget.

    Both the length and the budget are floored at 1, so the result is always
    a positive float and never divides by zero.
    """
    text_len = len((ar_text or "").strip())
    budget = int(max_chars)
    return max(1, text_len) / max(1, budget)

def find_bad(mapping_dict):
    """Collect the lines whose Arabic text exceeds its character budget.

    Walks the global ``items`` list and flags a line when its ``est_need`` ratio
    is above ``SOFT_OVER_BUDGET`` or above ``MAX_SPEEDUP_NEED``.

    Args:
        mapping_dict: line index -> Arabic text (missing entries count as empty).

    Returns:
        A list of dicts with the keys "i", "en", "ar", "max_chars" and "need",
        sorted by "need" in descending order.
    """
    flagged = []
    for entry in items:
        idx = entry["i"]
        budget = entry["max_chars"]
        arabic = (mapping_dict.get(idx, "") or "").strip()
        ratio = est_need(arabic, budget)
        if not (ratio > SOFT_OVER_BUDGET or ratio > MAX_SPEEDUP_NEED):
            continue
        flagged.append({"i": idx, "en": entry["text"], "ar": arabic, "max_chars": budget, "need": round(ratio, 2)})
    return sorted(flagged, key=lambda row: row["need"], reverse=True)

def chunk_bad(bad, max_chars=4500):
    """Group over-budget lines into consecutive batches for the compression prompt.

    The size of a row is the length of its rendered prompt line plus one for the
    newline; a new batch starts when the running size would exceed ``max_chars``.

    Args:
        bad: dicts with the keys "i", "max_chars", "en" and "ar".
        max_chars: size budget per batch.

    Returns:
        A list of batches (lists of the original dicts, order preserved).
    """
    batches = []
    used = 0
    for row in bad:
        rendered = f'{row["i"]}: max={row["max_chars"]} | EN: {row["en"]} | AR: {row["ar"]}'
        cost = len(rendered) + 1
        if not batches or used + cost > max_chars:
            batches.append([])
            used = 0
        batches[-1].append(row)
        used += cost
    return batches

# ---- Pass 2 (compress risky only) ----
def _accept_shorter(current, proposed):
    """Return the proposed rewrites that are non-empty and strictly shorter than the current text."""
    accepted = {}
    for idx, new_text in proposed.items():
        new_text = (new_text or "").strip()
        old_text = (current.get(idx, "") or "").strip()
        if new_text and len(new_text) < len(old_text):
            accepted[idx] = new_text
    return accepted

for r in range(1, MAX_COMPRESS_ROUNDS + 1):
    bad = find_bad(mapping)
    print(f"Compression round {r}/{MAX_COMPRESS_ROUNDS} | risky={len(bad)}")
    if not bad:
        break
    for ci, ch in enumerate(chunk_bad(bad, MAX_CHARS_PER_CHUNK), start=1):
        fixed = compress_chunk(ch)
        # A "compressed" line that came back empty or longer would make things worse
        # (a blank line is silently skipped by TTS), so keep the previous text then.
        mapping.update(_accept_shorter(mapping, fixed))
        print(f"Compress chunk {ci} done.")

# ---- Save final SRT ----
for s in subs:
    # Fall back to the source text when the line is missing from the mapping or empty.
    s.text = (mapping.get(int(s.index), "") or "").strip() or (s.text or "").strip()

subs.save(str(OUT_SRT), encoding="utf-8")
# Publish the Arabic SRT path for the following steps.
step2_paths["srt_ar_dub"] = str(OUT_SRT)

print("✅ Saved:", OUT_SRT)
print("✅ step2_paths['srt_ar_dub'] =", step2_paths["srt_ar_dub"])


Lines=75 | Chunks=2 | Provider=groq | Domain=academic
Gemini: gemini-3-flash-preview
Groq  : qwen/qwen3-32b
Pass1 chunk 1/2 done.
Pass1 chunk 2/2 done.
Compression round 1/2 | risky=6
Compress chunk 1 done.
Compression round 2/2 | risky=6
Compress chunk 1 done.
✅ Saved: /content/dub_project/work/04_translate/video_ar_dub.srt
✅ step2_paths['srt_ar_dub'] = /content/dub_project/work/04_translate/video_ar_dub.srt


In [15]:
#@title 3.2) Build TTS-SRT (convert standalone numbers to Arabic words) {display-mode:"form"}

import re
from pathlib import Path
import pysrt

# Input: the Arabic dub SRT path published under step2_paths["srt_ar_dub"].
in_path = Path(step2_paths.get("srt_ar_dub",""))
if not in_path.exists():
    raise FileNotFoundError(f"Missing srt_ar_dub: {in_path}")

# Output is written next to the input with a "_tts_numbers" suffix.
out_path = in_path.with_name(in_path.stem + "_tts_numbers.srt")

# --- digit normalization (Arabic-Indic + Persian) ---
# Maps both digit sets onto ASCII 0-9 so the number regex only has to handle ASCII digits.
DIGIT_TRANS = str.maketrans("٠١٢٣٤٥٦٧٨٩۰۱۲۳۴۵۶۷۸۹", "01234567890123456789")

# Word tables, indexed by digit value: ONES[d] for 0-9, TENS[t] for t*10,
# TEENS for 11-19 and HUNDREDS for multiples of 100.
ONES = ["صفر","واحد","اثنان","ثلاثة","أربعة","خمسة","ستة","سبعة","ثمانية","تسعة"]
TENS = ["", "عشرة", "عشرون", "ثلاثون", "أربعون", "خمسون", "ستون", "سبعون", "ثمانون", "تسعون"]
TEENS = {
    11:"أحد عشر", 12:"اثنا عشر", 13:"ثلاثة عشر", 14:"أربعة عشر", 15:"خمسة عشر",
    16:"ستة عشر", 17:"سبعة عشر", 18:"ثمانية عشر", 19:"تسعة عشر"
}
HUNDREDS = {
    1:"مئة", 2:"مئتان", 3:"ثلاثمئة", 4:"أربعمئة", 5:"خمسمئة",
    6:"ستمئة", 7:"سبعمئة", 8:"ثمانمئة", 9:"تسعمئة"
}

# One row per power of 1000; each row is (singular, dual, plural, many).
# int_to_words uses: singular for exactly 1, dual for exactly 2, plural for 3-10,
# and "many" for every other count.
SCALES = [
    ("", "", "", ""),  # units
    ("ألف", "ألفان", "آلاف", "ألف"),            # 10^3
    ("مليون", "مليونان", "ملايين", "مليون"),    # 10^6
    ("مليار", "ملياران", "مليارات", "مليار"),   # 10^9
    ("تريليون", "تريليونان", "تريليونات", "تريليون"),  # 10^12 (just in case)
]

def two_digits(n: int) -> str:
    """Spell out an integer in the range 0-99 in Arabic words.

    Compound values are written units-first: "<units> و<tens>".
    """
    if n < 10:
        return ONES[n]
    if n == 10:
        return "عشرة"
    if 11 <= n <= 19:
        return TEENS[n]
    tens, units = divmod(n, 10)
    tens_word = TENS[tens]
    return tens_word if units == 0 else f"{ONES[units]} و{tens_word}"

def three_digits(n: int) -> str:
    """Spell out an integer in the range 0-999 in Arabic words.

    Values below 100 are delegated to ``two_digits``; otherwise the hundreds word
    is followed by " و<remainder>" when the remainder is non-zero.
    """
    if n < 100:
        return two_digits(n)
    hundreds, remainder = divmod(n, 100)
    hundreds_word = HUNDREDS[hundreds]
    if remainder == 0:
        return hundreds_word
    return f"{hundreds_word} و{two_digits(remainder)}"

def int_to_words(n: int) -> str:
    """Spell out a non-negative integer in Arabic words.

    The number is split into groups of three digits (units, thousands, millions, ...).
    Each non-zero group is spelled with ``three_digits`` and followed by the matching
    scale word from ``SCALES``; groups are joined most-significant first with " و".

    Scale word selection for a group above the units:
      - exactly 1  -> singular form alone (e.g. "ألف")
      - exactly 2  -> dual form alone (e.g. "ألفان")
      - last two digits in 3..10 -> group words + plural form (3, 10, 103, 110, ...)
      - anything else -> group words + the "many" (singular) form

    Groups beyond the last SCALES row reuse that last row.
    NOTE: a negative ``n`` yields an empty string; callers handle the sign themselves.
    """
    if n == 0:
        return "صفر"

    parts = []
    group_idx = 0
    while n > 0:
        n, group = divmod(n, 1000)
        if group != 0:
            if group_idx == 0:
                parts.append(three_digits(group))
            else:
                # If the number is extremely large, keep using the last scale row.
                singular, dual, plural, many = SCALES[min(group_idx, len(SCALES) - 1)]
                if group == 1:
                    parts.append(singular)
                elif group == 2:
                    parts.append(dual)
                elif 3 <= group % 100 <= 10:
                    # The counted noun follows the LAST spoken number, so 103-110,
                    # 203-210, ... take the plural just like 3-10 do.
                    parts.append(f"{three_digits(group)} {plural}")
                else:
                    parts.append(f"{three_digits(group)} {many}")
        group_idx += 1

    return " و".join(reversed(parts))

def decimal_to_words(num_str: str) -> str:
    """Spell out a signed integer or decimal string in Arabic words.

    Commas are dropped, a leading "+" is ignored and a leading "-" becomes the
    prefix "سالب ". For decimals, trailing zeros of the fraction are removed and
    the remaining fraction digits are read one by one after the word "فاصل";
    a fraction that becomes empty is omitted entirely.

    Raises:
        ValueError: if the integer part cannot be parsed by ``int``.
    """
    digits = num_str.replace(",", "").strip()

    prefix = ""
    if digits.startswith("+"):
        digits = digits[1:]
    elif digits.startswith("-"):
        prefix = "سالب "
        digits = digits[1:]

    if "." not in digits:
        return prefix + int_to_words(int(digits))

    whole, _, fraction = digits.partition(".")
    fraction = fraction.rstrip("0")
    whole_words = int_to_words(int(whole if whole else "0"))
    if not fraction:
        return prefix + whole_words

    spoken_fraction = " ".join(ONES[int(ch)] for ch in fraction if ch.isdigit())
    return prefix + f"{whole_words} فاصل {spoken_fraction}"

def should_skip_number(text: str, start: int, end: int) -> bool:
    """Decide whether the number at ``text[start:end]`` must be left as digits.

    A number is skipped only when it belongs to a Latin technical token
    (GPT-4, v2.1, USB 3.0, ...). Arabic letters touching the number never
    cause a skip.

    Returns True when any of these holds:
      1. the character right before or right after the number is a Latin letter;
      2. the number follows one of "-", "_", "/" which itself follows a Latin letter;
      3. the number contains a dot and the 12 characters before it end with a
         Latin word of 2-10 letters (optionally followed by whitespace).
    """
    def is_latin(ch: str) -> bool:
        return re.fullmatch(r"[A-Za-z]", ch) is not None

    before = text[start - 1] if start >= 1 else ""
    after = text[end] if end < len(text) else ""

    # Rule 1: glued to a Latin letter on either side.
    if is_latin(before) or is_latin(after):
        return True

    # Rule 2: "<Latin letter><separator><number>".
    if start >= 2 and before in "-_/" and is_latin(text[start - 2]):
        return True

    # Rule 3: version-like number preceded by a Latin word within a short window.
    window = text[max(0, start - 12):start]
    looks_like_version = "." in text[start:end]
    return looks_like_version and re.search(r"[A-Za-z]{2,10}\s*$", window) is not None

# Normalize Arabic thousands/decimal separators before regex
def normalize_separators(s: str) -> str:
    """Map Arabic numeric separators to their ASCII equivalents.

    - Arabic thousands separator '٬' (U+066C) -> ','
    - Arabic decimal separator  '٫' (U+066B) -> '.'
    - Arabic comma '،' (U+060C) -> ',' ONLY when it sits between two digits,
      i.e. when it is used as a thousands separator ("1،000").

    An Arabic comma used as punctuation is left untouched. Converting it everywhere
    would rewrite the sentence punctuation, and a comma that directly follows a
    number would then be swallowed by the number pattern.

    Returns an empty string for ``None``/empty input.
    """
    s = (s or "").replace("٬", ",").replace("٫", ".")
    return re.sub(r"(?<=\d)،(?=\d)", ",", s)

# match standalone numbers with optional decimal and optional %
num_re = re.compile(r"[+-]?\d[\d,]*([.]\d+)?%?")

def convert_numbers_in_text(s: str) -> str:
    """Replace standalone numbers in ``s`` with Arabic words for TTS.

    The text is first normalized (Arabic-Indic/Persian digits -> ASCII digits,
    Arabic numeric separators -> ASCII). Every match of ``num_re`` is then spelled
    out with ``decimal_to_words`` unless ``should_skip_number`` marks it as part of
    a Latin technical token. A trailing "%" becomes " بالمئة".

    Two artifacts of the number pattern are compensated for here:
      - its optional sign also matches a range dash or operator glued to the
        previous token ("10-20"); such a sign is kept as a separator instead of
        being read as a negative sign;
      - its ``[\\d,]*`` part can swallow commas that follow the number
        ("2020, ..."); those commas are put back after the words.

    Returns ``s`` unchanged when it is empty/None; a number that cannot be
    converted is left as it was matched.
    """
    if not s:
        return s

    s2 = s.translate(DIGIT_TRANS)
    s2 = normalize_separators(s2)

    def repl(m):
        raw = m.group(0)
        st, en = m.start(), m.end()

        if should_skip_number(s2, st, en):
            return raw

        body = raw

        # A sign directly attached to a preceding letter/digit is a dash or an
        # operator, not a negative sign.
        lead = ""
        if body[0] in "+-" and st > 0 and s2[st - 1].isalnum():
            lead, body = body[0], body[1:]

        # Commas at the very end are sentence punctuation, not thousands separators.
        stripped = body.rstrip(",")
        trail = body[len(stripped):]
        body = stripped

        is_pct = body.endswith("%")
        core = body[:-1] if is_pct else body

        try:
            w = decimal_to_words(core)
        except Exception:
            # Deliberate best effort: leave an unparseable number untouched.
            return raw

        if is_pct:
            w = w + " بالمئة"
        return lead + w + trail

    return num_re.sub(repl, s2)

# Convert numbers line by line so the line breaks inside each subtitle are preserved.
subs = pysrt.open(str(in_path), encoding="utf-8")
for sub in subs:
    lines = sub.text.splitlines()
    sub.text = "\n".join(convert_numbers_in_text(ln) for ln in lines)

subs.save(str(out_path), encoding="utf-8")
# Publish the TTS-ready SRT path under its own key.
step2_paths["srt_ar_dub_tts_numbers"] = str(out_path)

print("Saved TTS SRT:", out_path)
print("✅ step2_paths['srt_ar_dub_tts_numbers'] =", step2_paths["srt_ar_dub_tts_numbers"])


Saved TTS SRT: /content/dub_project/work/04_translate/video_ar_dub_tts_numbers.srt
✅ step2_paths['srt_ar_dub_tts_numbers'] = /content/dub_project/work/04_translate/video_ar_dub_tts_numbers.srt


In [16]:
#@title 4.0) TTS Provider Settings (Edge or ElevenLabs) - .env style {display-mode:"form"}

import os, sys, subprocess
from pathlib import Path

# TTS provider selection; the dubbing cell reads these names from the notebook globals.
TTS_PROVIDER = "elevenlabs"  #@param ["edge","elevenlabs"]

# Edge TTS settings: voice name and speaking-rate offset in percent.
EDGE_VOICE = "ar-AE-HamdanNeural"  #@param ["ar-AE-HamdanNeural","ar-AE-FatimaNeural","ar-SA-HamedNeural","ar-SA-ZariyahNeural","ar-EG-ShakirNeural","ar-EG-SalmaNeural"]
EDGE_RATE_PERCENT = 0  #@param {type:"integer"}

# ElevenLabs settings: model id and voice id.
ELEVEN_MODEL_ID = "eleven_multilingual_v2"  #@param {type:"string"}
ELEVEN_VOICE_ID = "R6nda3uM038xEEKi7GFl"    #@param {type:"string"}

# load .env (as in the article)
# python-dotenv is installed first so the import below works on a fresh runtime.
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-U", "python-dotenv"])
from dotenv import load_dotenv

DOTENV_PATH = "/content/.env"
p = Path(DOTENV_PATH)
if not p.exists():
    raise FileNotFoundError(f".env not found at: {p}")
# override=False: variables already set in the process environment are kept.
load_dotenv(dotenv_path=str(p), override=False)

def require_env(name: str) -> str:
    """Return the stripped value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset, empty, or whitespace only.
    """
    value = (os.environ.get(name) or "").strip()
    if value:
        return value
    raise RuntimeError(f"Missing {name}. Put it in .env.")

# Only the ElevenLabs provider is checked for an API key here.
if TTS_PROVIDER == "elevenlabs":
    require_env("ELEVEN_API_KEY")
    print("✅ ELEVEN_API_KEY loaded from .env (hidden).")

print("TTS_PROVIDER =", TTS_PROVIDER)


✅ ELEVEN_API_KEY loaded from .env (hidden).
TTS_PROVIDER = elevenlabs


In [17]:
#@title 4.1) Build Dub Vocals from Arabic SRT (Edge OR ElevenLabs + Time Fit) { display-mode:"form" }

# --- Timing/Audio settings ---
# max_speedup caps the tempo factor applied when a TTS clip is longer than its
# subtitle slot; target_sr is the sample rate of the generated WAV files.
# NOTE(review): the default max_speedup (1.8) is above the "sweet spot" quoted in
# the inline comment below; confirm which value is intended.
max_speedup   = 1.8  #@param {type:"number"}   # quality sweet spot: 1.20~1.30
vocal_gain_db = 3.0  #@param {type:"number"}
target_sr     = 48000 #@param {type:"integer"}
fade_ms       = 8    #@param {type:"integer"}  # 0 to disable

import os, re, sys, asyncio, subprocess, time, hashlib
from pathlib import Path

# nest_asyncio is applied so run_until_complete can be called from this notebook cell.
import nest_asyncio
nest_asyncio.apply()

import pysrt
from pydub import AudioSegment

# ----------------------------
# Preconditions
# ----------------------------
# Referencing the bare names raises NameError when an earlier cell was not run.
try:
    DIRS
    step2_paths
    INPUT_VIDEO
except NameError:
    raise NameError("Run Step 0 + Step 2 + Step 3 first (need DIRS/step2_paths/INPUT_VIDEO).")

# ----------------------------
# Read settings from Step 4.0 (NO overrides)
# ----------------------------
# globals().get supplies a default for any name that was not defined earlier.
TTS_PROVIDER      = globals().get("TTS_PROVIDER", "edge")
EDGE_VOICE        = globals().get("EDGE_VOICE", "ar-SA-ZariyahNeural")
EDGE_RATE_PERCENT = int(globals().get("EDGE_RATE_PERCENT", 0))

ELEVEN_MODEL_ID   = globals().get("ELEVEN_MODEL_ID", "eleven_multilingual_v2")
ELEVEN_VOICE_ID   = globals().get("ELEVEN_VOICE_ID", "")
ELEVEN_OUTPUT_FORMAT = globals().get("ELEVEN_OUTPUT_FORMAT", "mp3_44100_128")

print("Using TTS_PROVIDER =", TTS_PROVIDER)
if TTS_PROVIDER == "edge":
    print("Edge voice/rate    =", EDGE_VOICE, f"{EDGE_RATE_PERCENT:+d}%")
else:
    print("Eleven model/voice =", ELEVEN_MODEL_ID, ELEVEN_VOICE_ID)
    print("Eleven output fmt  =", ELEVEN_OUTPUT_FORMAT)

# Ensure deps for selected provider
# Each branch tries the import first and installs the package only when it fails.
if TTS_PROVIDER == "edge":
    # edge-tts should already be installed in your Step 3.0 deps, but keep safe:
    try:
        import edge_tts  # noqa
    except Exception:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-U", "edge-tts", "aiohttp"])
else:
    try:
        from elevenlabs.client import ElevenLabs  # noqa
    except Exception:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-U", "elevenlabs"])
    # Last resort: prompt for the key (input hidden) when it is not in the environment.
    if not os.getenv("ELEVEN_API_KEY","").strip():
        import getpass
        os.environ["ELEVEN_API_KEY"] = getpass.getpass("Enter ELEVEN_API_KEY (hidden): ")

# ----------------------------
# Inputs
# ----------------------------
# Prefer the TTS-ready SRT (numbers spelled out); fall back to the plain Arabic dub SRT.
SRT_AR = Path(step2_paths.get("srt_ar_dub_tts_numbers", step2_paths.get("srt_ar_dub","")))
if not SRT_AR.exists():
    raise FileNotFoundError(f"Arabic dub SRT not found: {SRT_AR}")

# Reference audio: the first available of the three recorded audio paths.
ref_audio = step2_paths.get("audio_raw") or step2_paths.get("audio_16k_mono") or step2_paths.get("speech_clean_16k")
if not ref_audio:
    raise RuntimeError("No reference audio path found in step2_paths.")
REF_AUDIO = Path(ref_audio)
if not REF_AUDIO.exists():
    raise FileNotFoundError(f"Reference audio not found: {REF_AUDIO}")

# ----------------------------
# Output dirs
# ----------------------------
# cache_dir holds per-line TTS results so re-runs can reuse them.
tts_dir = DIRS["work"] / "05_tts"
cache_dir = tts_dir / "cache_tts"
tts_dir.mkdir(parents=True, exist_ok=True)
cache_dir.mkdir(parents=True, exist_ok=True)

video_stem = Path(INPUT_VIDEO).stem
DUB_VOCALS_WAV = tts_dir / f"{video_stem}_dub_vocals_{target_sr}.wav"

# ----------------------------
# Helpers
# ----------------------------
def run(cmd):
    """Run ``cmd`` with captured text output and return the completed process.

    On a non-zero exit code the command line and the last 2500 characters of
    stderr are printed before raising.

    Raises:
        RuntimeError: "Command failed" when the process exits with a non-zero code.
    """
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
        return result

    print("CMD:", " ".join(map(str, cmd)))
    print("STDERR tail:\n", (result.stderr or "")[-2500:])
    raise RuntimeError("Command failed")

def ms_from_pysrt_time(t):
    """Convert a time object with hours/minutes/seconds/milliseconds fields to total milliseconds."""
    whole_seconds = t.hours*3600 + t.minutes*60 + t.seconds
    return whole_seconds*1000 + t.milliseconds

def clean_text_ar(s: str) -> str:
    """Prepare one subtitle text for TTS.

    Steps, in order: strip, collapse every whitespace run to a single space,
    delete the characters "[", "]", "♫" and "♪", then strip again.
    Whitespace is not collapsed a second time after the deletion.
    """
    collapsed = re.sub(r"\s+", " ", (s or "").strip())
    # Deleting the four characters in one translate call is equivalent to the
    # sequential replace calls because none of them produces another.
    without_marks = collapsed.translate({ord(ch): None for ch in "[]♫♪"})
    return without_marks.strip()

def stable_hash(s: str) -> str:
    """Return the first 16 hex characters of the SHA-1 digest of ``s`` (UTF-8 encoded)."""
    digest = hashlib.sha1()
    digest.update(s.encode("utf-8"))
    return digest.hexdigest()[:16]

def atempo_chain(speed: float) -> str:
    """Build an ffmpeg ``atempo`` filter chain for the requested tempo factor.

    Each filter in the chain stays within 0.5..2.0: factors above 2.0 are split
    into repeated 2.0 steps and factors below 0.5 into repeated 0.5 steps, with
    the remainder as the last step. A non-positive ``speed`` is treated as 1.0.

    Returns:
        Comma-separated filters such as "atempo=2.000000,atempo=1.250000".
    """
    remaining = 1.0 if speed <= 0 else float(speed)
    factors = []

    # Peel off full 2.0x steps, then full 0.5x steps; what is left is in range.
    while remaining > 2.0:
        factors.append(2.0)
        remaining /= 2.0
    while remaining < 0.5:
        factors.append(0.5)
        remaining /= 0.5
    factors.append(remaining)

    filters = []
    for factor in factors:
        clamped = max(0.5, min(2.0, float(factor)))
        filters.append(f"atempo={clamped:.6f}")
    return ",".join(filters)

async def tts_to_mp3(text: str, out_mp3: Path):
    """Synthesize ``text`` to an MP3 file using the provider selected by ``TTS_PROVIDER``.

    Both providers are tried up to five times with a linearly growing pause
    between attempts (no pause after the last one).

    Edge: only handshake, connection and timeout errors are retried; any other
    exception propagates immediately. A partially written file from a failed
    attempt is removed so it cannot be mistaken for a valid cache entry.

    ElevenLabs: every exception is retried, and an empty audio payload counts as
    a failure instead of being written out as a 0-byte file.

    Args:
        text: Text to synthesize.
        out_mp3: Destination path of the MP3 file.

    Raises:
        RuntimeError: when the key or voice id is missing (ElevenLabs), or when
            all attempts fail; the last underlying error is attached as ``__cause__``.
    """
    import asyncio as aio

    max_attempts = 5

    if TTS_PROVIDER == "edge":
        import edge_tts
        import aiohttp

        rate = f"{int(EDGE_RATE_PERCENT):+d}%"
        last_err = None

        for attempt in range(1, max_attempts + 1):
            try:
                com = edge_tts.Communicate(text=text, voice=EDGE_VOICE, rate=rate)
                await com.save(str(out_mp3))
                return
            except (aiohttp.client_exceptions.WSServerHandshakeError,
                    aiohttp.client_exceptions.ClientConnectorError,
                    aio.TimeoutError) as e:
                last_err = e
                # Drop whatever was written before the connection failed.
                out_mp3.unlink(missing_ok=True)
                if attempt < max_attempts:
                    await aio.sleep(min(8, 0.8 * attempt))

        raise RuntimeError(f"Edge TTS failed: {last_err}") from last_err

    else:
        from elevenlabs.client import ElevenLabs

        key = os.getenv("ELEVEN_API_KEY","").strip()
        if not key:
            raise RuntimeError("Missing ELEVEN_API_KEY")

        if not ELEVEN_VOICE_ID.strip():
            raise RuntimeError("ELEVEN_VOICE_ID is empty. Set it in Step 4.0.")

        client = ElevenLabs(api_key=key)

        last_err = None
        for attempt in range(1, max_attempts + 1):
            try:
                # NOTE: this client call is synchronous and blocks the event loop
                # while the request is in flight.
                audio = client.text_to_speech.convert(
                    text=text,
                    voice_id=ELEVEN_VOICE_ID,
                    model_id=ELEVEN_MODEL_ID,
                    output_format=ELEVEN_OUTPUT_FORMAT,
                )
                # The result may be raw bytes or an iterable of byte chunks.
                data = audio if isinstance(audio, (bytes, bytearray)) else b"".join(audio)
                if not data:
                    raise RuntimeError("ElevenLabs returned empty audio")
                out_mp3.write_bytes(data)
                return
            except Exception as e:
                last_err = e
                if attempt < max_attempts:
                    # Non-blocking pause; time.sleep would stall the event loop.
                    await aio.sleep(0.8 * attempt)

        raise RuntimeError(f"ElevenLabs TTS failed: {last_err}") from last_err

def _get_loop():
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            raise RuntimeError("closed loop")
        return loop
    except Exception:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop

def tts_line_to_wav(text: str) -> Path:
    """Synthesize one line and return the path of its mono WAV at ``target_sr``.

    Results are cached in ``cache_dir`` under a hash of the provider settings
    (provider, voice/model, rate or output format) plus the text:
      - a non-empty cached WAV is returned as is;
      - a non-empty cached MP3 is reused and only converted;
      - an empty cached MP3 is deleted and synthesized again.
    """
    fingerprint = (
        f"edge|{EDGE_VOICE}|rate={EDGE_RATE_PERCENT}"
        if TTS_PROVIDER == "edge"
        else f"eleven|voice={ELEVEN_VOICE_ID}|model={ELEVEN_MODEL_ID}|fmt={ELEVEN_OUTPUT_FORMAT}"
    )
    cache_key = stable_hash(fingerprint + "|" + text)

    cached_mp3 = cache_dir / f"{cache_key}.mp3"
    cached_wav = cache_dir / f"{cache_key}_{target_sr}.wav"

    if cached_wav.exists() and cached_wav.stat().st_size > 0:
        return cached_wav

    # Decide whether the MP3 has to be (re)generated.
    have_mp3 = cached_mp3.exists()
    if have_mp3 and cached_mp3.stat().st_size == 0:
        cached_mp3.unlink()
        have_mp3 = False

    if not have_mp3:
        _get_loop().run_until_complete(tts_to_mp3(text, cached_mp3))

    # Convert to 16-bit PCM, mono, target sample rate.
    ffmpeg_cmd = [
        "ffmpeg","-y","-loglevel","error",
        "-i", str(cached_mp3),
        "-ac","1",
        "-ar", str(target_sr),
        "-acodec","pcm_s16le",
        str(cached_wav)
    ]
    run(ffmpeg_cmd)
    return cached_wav

def fit_to_slot(wav_in: Path, slot_ms: int, out_wav: Path):
    """
    Write to ``out_wav`` a version of ``wav_in`` fitted to a slot of ``slot_ms`` ms.

    - ``slot_ms <= 0``: an empty clip is written.
    - source has no audio: ``slot_ms`` of silence is written.
    - source longer than the slot: it is sped up with ffmpeg (tempo factor
      capped at ``max_speedup``) and, if still too long, cut at ``slot_ms``.
    - otherwise: the source is padded with trailing silence up to ``slot_ms``.

    Returns ``out_wav``.
    """
    target = str(out_wav)
    source = AudioSegment.from_file(str(wav_in))
    source_ms = len(source)

    def _write(segment):
        # Export a pydub segment to the output path and hand the path back.
        segment.export(target, format="wav")
        return out_wav

    if slot_ms <= 0:
        return _write(AudioSegment.silent(duration=0))

    if source_ms <= 0:
        return _write(AudioSegment.silent(duration=slot_ms))

    if source_ms <= slot_ms:
        # Fits already: fill the remainder of the slot with silence.
        return _write(source + AudioSegment.silent(duration=slot_ms - source_ms))

    # Too long for the slot: compress in time, never faster than max_speedup.
    ratio = min(float(source_ms / max(1, slot_ms)), float(max_speedup))
    tempo_filter = atempo_chain(ratio)

    run([
        "ffmpeg","-y","-loglevel","error",
        "-i", str(wav_in),
        "-filter:a", f"{tempo_filter},aresample={target_sr}",
        "-ac","1","-ar", str(target_sr),
        target
    ])

    # The speed cap may leave the clip longer than the slot; cut the overflow.
    sped_up = AudioSegment.from_file(target)
    if len(sped_up) > slot_ms:
        _write(sped_up[:slot_ms])
    return out_wav

# ----------------------------
# Build timeline
# ----------------------------
# The dub track is made exactly as long as the reference audio; each subtitle
# cue is synthesized, fitted to its slot and overlaid at its start time.
ref_seg = AudioSegment.from_file(str(REF_AUDIO))
total_ms = len(ref_seg)

subs = pysrt.open(str(SRT_AR), encoding="utf-8")
timeline = AudioSegment.silent(duration=total_ms, frame_rate=target_sr)

# Cache-key prefix for slot-fitted clips. It depends only on the settings,
# not on the cue, so it is built once instead of on every loop iteration.
if TTS_PROVIDER == "edge":
    meta_fit = f"edge|{EDGE_VOICE}|rate={EDGE_RATE_PERCENT}|sr={target_sr}|maxspd={max_speedup}"
else:
    meta_fit = f"eleven|{ELEVEN_VOICE_ID}|{ELEVEN_MODEL_ID}|{ELEVEN_OUTPUT_FORMAT}|sr={target_sr}|maxspd={max_speedup}"

lines_done = 0
total_lines = len(subs)

for s in subs:
    start_ms = ms_from_pysrt_time(s.start)
    end_ms   = ms_from_pysrt_time(s.end)
    slot_ms  = max(0, end_ms - start_ms)

    text = clean_text_ar(s.text)
    if not text:
        continue

    # A cue with a zero-length slot, or one that starts at/after the end of
    # the reference audio, adds nothing to the timeline. Skip synthesis for it
    # so no TTS request (possibly a paid API call) is spent on discarded audio.
    if slot_ms > 0 and start_ms < total_ms:
        base_wav = tts_line_to_wav(text)

        fit_key = stable_hash(meta_fit + f"|slot={slot_ms}|" + text)
        fitted = cache_dir / f"{fit_key}_fit.wav"

        if not fitted.exists() or fitted.stat().st_size == 0:
            fit_to_slot(base_wav, slot_ms, fitted)

        clip = AudioSegment.from_file(str(fitted))
        # Fade only when the clip is long enough to hold both fades.
        if fade_ms and fade_ms > 0 and len(clip) > (fade_ms * 2):
            clip = clip.fade_in(fade_ms).fade_out(fade_ms)

        timeline = timeline.overlay(clip, position=max(0, start_ms))

    lines_done += 1
    if lines_done % 25 == 0:
        print(f"Processed lines: {lines_done}/{total_lines}")

timeline = timeline + float(vocal_gain_db)

timeline.export(str(DUB_VOCALS_WAV), format="wav")

# keep your existing key for next steps: the key name says "48k" but it is
# used for whatever target_sr was rendered, so later steps find the file.
step2_paths["dub_vocals_wav_48k"] = str(DUB_VOCALS_WAV)

print("✅ Dub vocals saved:", DUB_VOCALS_WAV)
print("✅ step2_paths['dub_vocals_wav_48k'] =", step2_paths["dub_vocals_wav_48k"])


Using TTS_PROVIDER = elevenlabs
Eleven model/voice = eleven_multilingual_v2 R6nda3uM038xEEKi7GFl
Eleven output fmt  = mp3_44100_128


  m = re.match('([su]([0-9]{1,2})p?) \(([0-9]{1,2}) bit\)$', token)
  m2 = re.match('([su]([0-9]{1,2})p?)( \(default\))?$', token)
  elif re.match('(flt)p?( \(default\))?$', token):
  elif re.match('(dbl)p?( \(default\))?$', token):


Processed lines: 25/75
Processed lines: 50/75
Processed lines: 75/75
✅ Dub vocals saved: /content/dub_project/work/05_tts/video_dub_vocals_48000.wav
✅ step2_paths['dub_vocals_wav_48k'] = /content/dub_project/work/05_tts/video_dub_vocals_48000.wav


In [18]:
#@title 4.2) Mix Background + Dub Vocals (Sidechain Ducking + Loudnorm) { display-mode: "form" }

# Mixing parameters, exposed as Colab form fields through the `#@param` markers.
# - music_gain_db: gain (dB) applied to the background with ffmpeg's `volume` filter.
# - duck_*: inserted into the ffmpeg `sidechaincompress` filter string, which
#   lowers the background while the dub vocals are active.
# - target_sr: sample rate (Hz) that background and vocals are resampled to.
music_gain_db = -2.0 #@param {type:"number"}      # reduce background a bit
duck_threshold = 0.02 #@param {type:"number"}     # sidechain threshold
duck_ratio = 8.0 #@param {type:"number"}          # stronger ducking
duck_attack_ms = 20 #@param {type:"integer"}
duck_release_ms = 250 #@param {type:"integer"}
target_sr = 48000 #@param {type:"integer"}

import subprocess
from pathlib import Path
from pydub import AudioSegment

def run(cmd):
    """
    Execute ``cmd`` (a list of arguments) and return the finished process.

    stdout and stderr are captured as text. On a non-zero exit status the
    command line and the last 3000 characters of stderr are printed, then
    ``RuntimeError("Command failed")`` is raised.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc

    # Failure: show what was run and the end of the error output before raising.
    print("CMD:", " ".join(str(part) for part in cmd))
    stderr_text = proc.stderr if proc.stderr else ""
    print("STDERR tail:\n", stderr_text[-3000:])
    raise RuntimeError("Command failed")

# require dub vocals
# Path("") is the current directory, so exists() would succeed even when the
# key is missing from step2_paths; is_file() makes this guard effective.
DUB_VOCALS = Path(step2_paths.get("dub_vocals_wav_48k") or "")
if not DUB_VOCALS.is_file():
    raise FileNotFoundError("Missing dub vocals. Run Step 4.1 first.")

mix_dir = DIRS["work"] / "06_mix"
mix_dir.mkdir(parents=True, exist_ok=True)

video_stem = Path(INPUT_VIDEO).stem
BKG_PREP = mix_dir / f"{video_stem}_background_{target_sr}_stereo.wav"
VOC_PREP = mix_dir / f"{video_stem}_dubvoc_{target_sr}_stereo.wav"
FINAL_WAV = mix_dir / f"{video_stem}_final_mix_{target_sr}.wav"

# locate background wav
bkg = step2_paths.get("background_wav")
bkg_path = Path(bkg) if bkg else None

if bkg_path is None or not bkg_path.is_file():
    # try build from bass/drums/other if available
    # Only search when a demucs output folder is actually recorded; an empty
    # value would otherwise turn into "." and glob the whole working directory.
    demucs_out = step2_paths.get("demucs_out_root")
    demucs_root = Path(demucs_out) if demucs_out else None
    have_demucs = demucs_root is not None and demucs_root.is_dir()
    bass = next(iter(sorted(demucs_root.glob("**/bass.wav"))), None) if have_demucs else None
    drums = next(iter(sorted(demucs_root.glob("**/drums.wav"))), None) if have_demucs else None
    other = next(iter(sorted(demucs_root.glob("**/other.wav"))), None) if have_demucs else None

    if bass and drums and other:
        tmp_bkg = mix_dir / f"{video_stem}_background_built.wav"
        # mix the 3 stems
        run([
            "ffmpeg","-y","-loglevel","error",
            "-i", str(bass), "-i", str(drums), "-i", str(other),
            "-filter_complex", "amix=inputs=3:duration=longest:dropout_transition=0",
            str(tmp_bkg)
        ])
        bkg_path = tmp_bkg
    else:
        # fallback: use original extracted raw audio (still works but less clean)
        raw = step2_paths.get("audio_raw") or step2_paths.get("audio_16k_mono")
        if not raw:
            raise RuntimeError("No background and no raw audio fallback found.")
        bkg_path = Path(raw)
        if not bkg_path.is_file():
            raise FileNotFoundError(f"Raw audio fallback not found: {bkg_path}")

print("Background source:", bkg_path)

# Find out how many channels the background has. "pan=stereo|c0=c0|c1=c0"
# copies channel 0 to both outputs, which is right for a mono source but
# throws away the right channel of a stereo one.
try:
    probe = run([
        "ffprobe","-v","error",
        "-select_streams","a:0",
        "-show_entries","stream=channels",
        "-of","default=noprint_wrappers=1:nokey=1",
        str(bkg_path),
    ])
    probe_fields = (probe.stdout or "").split()
    bkg_channels = int(probe_fields[0]) if probe_fields and probe_fields[0].isdigit() else 1
except (RuntimeError, OSError) as probe_err:
    # Probing is best-effort; fall back to the mono handling.
    print("Warning: could not probe background channels, treating it as mono:", probe_err)
    bkg_channels = 1

if bkg_channels >= 2:
    # Keep (or downmix to) real stereo instead of duplicating the left channel.
    bkg_layout = "aformat=channel_layouts=stereo"
else:
    bkg_layout = "pan=stereo|c0=c0|c1=c0"

# prep background to stereo target_sr + gain
run([
    "ffmpeg","-y","-loglevel","error",
    "-i", str(bkg_path),
    "-filter:a", f"volume={music_gain_db}dB,aresample={target_sr},{bkg_layout}",
    str(BKG_PREP)
])

# prep vocals to stereo target_sr
run([
    "ffmpeg","-y","-loglevel","error",
    "-i", str(DUB_VOCALS),
    "-filter:a", f"aresample={target_sr},pan=stereo|c0=c0|c1=c0",
    str(VOC_PREP)
])

# mix with sidechain ducking + loudnorm
# Note: loudnorm single-pass is fine for most cases; if you want perfect EBU, do 2-pass.
# Input 0 is the background, input 1 the vocals: the vocals drive the
# compressor on the background, then both are mixed and loudness-normalized.
fc = (
    f"[0:a][1:a]sidechaincompress="
    f"threshold={duck_threshold}:ratio={duck_ratio}:attack={duck_attack_ms}:release={duck_release_ms}"
    f"[ducked];"
    f"[ducked][1:a]amix=inputs=2:duration=longest:dropout_transition=0,"
    f"loudnorm=I=-16:TP=-1.5:LRA=11"
    f"[mix]"
)

run([
    "ffmpeg","-y","-loglevel","error",
    "-i", str(BKG_PREP),
    "-i", str(VOC_PREP),
    "-filter_complex", fc,
    "-map", "[mix]",
    str(FINAL_WAV)
])

step2_paths["final_mix_wav"] = str(FINAL_WAV)
print("Final mix saved:", FINAL_WAV)
print("Saved in step2_paths['final_mix_wav']")


Background source: /content/dub_project/work/01_demucs/htdemucs/video_raw/no_vocals.wav
Final mix saved: /content/dub_project/work/06_mix/video_final_mix_48000.wav
Saved in step2_paths['final_mix_wav']


In [19]:
# Fix Arabic RTL in SRT (handle mixed EN/AR)
from pathlib import Path
import re
import pysrt

# Source subtitle file produced by the translation step.
# Path("") is the current directory, so exists() would pass even when the key
# is missing; is_file() makes the guard fire with the intended message.
in_path = Path(step2_paths.get("srt_ar_dub") or "")
if not in_path.is_file():
    raise FileNotFoundError(f"Missing srt_ar_dub: {in_path}")

# The fixed file is written next to the input with an "_rtl_fixed" suffix.
out_path = in_path.with_name(in_path.stem + "_rtl_fixed.srt")

# Direction marks
RLM = "\u200F"   # Right-to-Left Mark
LRM = "\u200E"   # Left-to-Right Mark
FSI = "\u2068"   # First Strong Isolate
PDI = "\u2069"   # Pop Directional Isolate

# Latin-ish tokens (English words, acronyms, numbers, urls)
# A token starts with an ASCII letter/digit, may contain . _ : / @ # - + inside,
# and must end on a letter/digit or on "+" / "#" (so "C++" and "C#" stay whole).
# Trailing sentence punctuation ("Python." / "GPU:") is left outside the token,
# otherwise it would be pulled into the left-to-right isolate and shown on the
# wrong side of the word in a right-to-left line.
latin = re.compile(r"([A-Za-z0-9](?:[A-Za-z0-9._:/@#\-+]*[A-Za-z0-9+#])?)")

def fix_line(t: str) -> str:
    """
    Prepare one subtitle line for right-to-left display.

    The line is stripped and runs of whitespace are collapsed to one space.
    Every Latin-ish token is wrapped as FSI + LRM + token + PDI so it renders
    left-to-right as an isolated unit, and the line is prefixed with RLM.

    Args:
        t: Raw subtitle line; ``None`` is treated as an empty string.

    Returns:
        The processed line, or an empty string when the input is empty or
        whitespace-only (no marks are added in that case).
    """
    t = (t or "").strip()
    if not t:
        return t
    t = re.sub(r"\s+", " ", t)

    # wrap any Latin token as isolated LTR
    t = latin.sub(lambda m: f"{FSI}{LRM}{m.group(1)}{PDI}", t)

    # force the line overall to RTL
    return RLM + t

# Rewrite every cue line by line (a cue may span several lines), then save
# the result and record its path for the following steps.
subs = pysrt.open(str(in_path), encoding="utf-8")
for cue in subs:
    cue.text = "\n".join(map(fix_line, (cue.text or "").splitlines()))

subs.save(str(out_path), encoding="utf-8")
step2_paths["srt_ar_dub_rtl_fixed"] = str(out_path)

print("✅ Saved:", out_path)
print("Use this:", step2_paths["srt_ar_dub_rtl_fixed"])


✅ Saved: /content/dub_project/work/04_translate/video_ar_dub_rtl_fixed.srt
Use this: /content/dub_project/work/04_translate/video_ar_dub_rtl_fixed.srt


In [20]:
#@title 4.3) Render Final Dubbed Video (MP4) { display-mode: "form" }

# Render options, exposed as Colab form fields through the `#@param` markers.
# add_soft_subtitles: when True, a subtitle file is added as an extra input/stream.
# use_ar_srt: True picks the Arabic dub SRT (RTL-fixed version if available),
#             False picks the SRT stored under step2_paths["asr_srt_en"].
add_soft_subtitles = True #@param {type:"boolean"}  # attach Arabic SRT as soft subs
use_ar_srt = True #@param {type:"boolean"}          # attach your arabic dub srt (not the English one)

import subprocess
from pathlib import Path

def run(cmd):
    """
    Execute ``cmd`` (a list of arguments) and return the finished process.

    stdout and stderr are captured as text. On a non-zero exit status the
    command line and the last 3000 characters of stderr are printed, then
    ``RuntimeError("Command failed")`` is raised.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc

    # Failure: show what was run and the end of the error output before raising.
    print("CMD:", " ".join(str(part) for part in cmd))
    stderr_text = proc.stderr if proc.stderr else ""
    print("STDERR tail:\n", stderr_text[-3000:])
    raise RuntimeError("Command failed")

# locate video without audio
video_no_audio = None
if "outputs" in globals() and isinstance(outputs, dict) and outputs.get("video_no_audio"):
    video_no_audio = Path(outputs["video_no_audio"])
else:
    # fallback: search in extract dir
    cands = sorted((DIRS["extract"]).glob("*_no_audio.mp4"))
    video_no_audio = cands[-1] if cands else None

if video_no_audio is None or not video_no_audio.is_file():
    raise FileNotFoundError("video_no_audio not found. Run Step 1.2 first.")

# Path("") is the current directory, so exists() would pass even when the key
# is missing; is_file() makes the guard fire with the intended message.
FINAL_WAV = Path(step2_paths.get("final_mix_wav") or "")
if not FINAL_WAV.is_file():
    raise FileNotFoundError("final_mix_wav not found. Run Step 4.2 first.")

out_dir = DIRS["output"]
out_dir.mkdir(parents=True, exist_ok=True)

video_stem = Path(INPUT_VIDEO).stem
OUT_MP4 = out_dir / f"{video_stem}_dubbed.mp4"

# Input 0 = silent video, input 1 = final audio mix.
cmd = [
    "ffmpeg","-y",
    "-i", str(video_no_audio),
    "-i", str(FINAL_WAV),
]

maps = ["-map","0:v:0","-map","1:a:0"]

# optional subtitles
srt_path = None
subs_attached = False
if add_soft_subtitles:
    if use_ar_srt:
        # Prefer the RTL-fixed Arabic file, fall back to the plain Arabic dub SRT.
        srt_value = step2_paths.get("srt_ar_dub_rtl_fixed") or step2_paths.get("srt_ar_dub")
    else:
        srt_value = step2_paths.get("asr_srt_en")
    srt_path = Path(srt_value) if srt_value else None

    # is_file() instead of exists(): a missing key must lead to the warning
    # below, not to "." being passed to ffmpeg as a subtitle input.
    if srt_path is not None and srt_path.is_file():
        cmd += ["-i", str(srt_path)]
        maps += ["-map","2:s:0"]
        subs_attached = True
    else:
        print("Warning: subtitle file not found; continuing without subtitles.")

cmd += maps

cmd += [
    "-c:v","copy",
    "-c:a","aac","-b:a","192k",
]

# Only set a subtitle codec when a subtitle stream was actually mapped.
if subs_attached:
    # mov_text works in mp4; if you see Arabic rendering issues, export MKV instead.
    cmd += ["-c:s","mov_text"]

cmd += ["-shortest", str(OUT_MP4)]

run(cmd)

step2_paths["final_video_mp4"] = str(OUT_MP4)
print("Final video:", OUT_MP4)
print("Saved in step2_paths['final_video_mp4']")


Final video: /content/dub_project/output/video_dubbed.mp4
Saved in step2_paths['final_video_mp4']


In [21]:
#@title 4.4) Copy to Drive + Download { display-mode: "form" }

copy_to_drive = True #@param {type:"boolean"}
auto_download = True #@param {type:"boolean"}

from pathlib import Path
import shutil

# Artifacts recorded by the previous steps. A missing key becomes Path(""),
# i.e. the current directory, so every check below uses is_file() rather than
# exists() to avoid trying to copy or download a directory.
final_video = Path(step2_paths.get("final_video_mp4") or "")
final_mix   = Path(step2_paths.get("final_mix_wav") or "")
ar_srt = Path(step2_paths.get("srt_ar_dub_rtl_fixed") or step2_paths.get("srt_ar_dub") or "")

print("Artifacts:")
print(" - video:", final_video)
print(" - mix  :", final_mix)
print(" - srt  :", ar_srt)

if copy_to_drive:
    # drive_upload_path may be undefined (setup cell not run) or None (setup
    # cell run with Drive disabled); both cases use the default location.
    try:
        drive_target = drive_upload_path
    except NameError:
        drive_target = None

    if drive_target is None:
        default_drive_root = Path("/content/gdrive/MyDrive")
        if default_drive_root.is_dir():
            drive_target = default_drive_root / "dub_project"
        else:
            # Creating the folder here would only produce a local directory
            # that looks like Drive, so skip the copy instead.
            print("Warning: Google Drive is not mounted; skipping copy to Drive.")

    if drive_target is not None:
        drive_upload_path = Path(drive_target)
        drive_upload_path.mkdir(parents=True, exist_ok=True)

        for f in [final_video, final_mix, ar_srt]:
            if f.is_file():
                shutil.copy(str(f), str(drive_upload_path / f.name))
                print("Copied:", drive_upload_path / f.name)

if auto_download:
    from google.colab import files
    if final_video.is_file(): files.download(str(final_video))
    if ar_srt.is_file(): files.download(str(ar_srt))


Artifacts:
 - video: /content/dub_project/output/video_dubbed.mp4
 - mix  : /content/dub_project/work/06_mix/video_final_mix_48000.wav
 - srt  : /content/dub_project/work/04_translate/video_ar_dub_rtl_fixed.srt
Copied: /content/gdrive/MyDrive/dub_project/video_dubbed.mp4
Copied: /content/gdrive/MyDrive/dub_project/video_final_mix_48000.wav
Copied: /content/gdrive/MyDrive/dub_project/video_ar_dub_rtl_fixed.srt


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>