# Video Dubbing Service V2 - Google Colab Edition

This notebook allows you to run the video dubbing service on Google Colab with GPU support.

## Setup Instructions:
1. Upload this notebook to Google Colab
2. Upload your `v2` project folder to Colab (or clone from repository)
3. Mount Google Drive (optional - for saving files)
4. Upload your video files to `/content/videos/` (or use the upload widget)
5. Run all cells sequentially
6. Process your videos!
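
Before running the cells, it can be worth confirming that the Colab runtime actually has a GPU attached. The helper below is a minimal sketch: `detect_device` is a hypothetical name that is not part of the project, and it falls back to the CPU when PyTorch is missing or no CUDA device is visible.

```python
def detect_device() -> str:
    """Return "cuda" when PyTorch can see a GPU, otherwise "cpu"."""
    try:
        import torch  # provided by the dependency cell below
    except ImportError:
        # Without PyTorch nothing can run on the GPU anyway
        return "cpu"
    return "cuda" if torch.cuda.is_available() else "cpu"


print(f"Selected device: {detect_device()}")
```

If this prints `cpu` on Colab, switching the runtime type to a GPU runtime before installing dependencies is usually the first thing to try.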

In [None]:
# Install required dependencies
!pip install -q torch torchaudio transformers openai-whisper demucs moviepy pydub soundfile pyrubberband python-dotenv TTS

# Install system dependencies for audio processing
!apt-get update -qq
!apt-get install -y -qq ffmpeg sox

print("✅ Dependencies installed successfully!")

In [None]:
# Setup environment and paths
import os
from pathlib import Path

# Mount Google Drive (optional - if you want to save files there)
from google.colab import drive
drive.mount('/content/drive')

# Set working directory to /content
WORK_DIR = Path('/content')
os.chdir(WORK_DIR)

print("✅ Environment setup complete")
print(f"Current working directory: {os.getcwd()}")


In [None]:
# Import required libraries and setup logging
import asyncio
import logging
import os
import sys
import tempfile
import uuid
import io
import shutil
from pathlib import Path
from typing import Dict, Any, List, Optional
import numpy as np

# Audio/Video processing
from moviepy import VideoFileClip, AudioFileClip
from pydub import AudioSegment
import soundfile as sf

# ML/AI libraries
import torch
import torchaudio
try:
    import whisper
    WHISPER_AVAILABLE = True
except ImportError:
    WHISPER_AVAILABLE = False

try:
    from demucs import pretrained
    from demucs.apply import apply_model
    DEMUCS_AVAILABLE = True
except ImportError:
    DEMUCS_AVAILABLE = False

try:
    from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoProcessor, AutoModel
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False

try:
    from TTS.api import TTS as CoquiTTS
    COQUI_TTS_AVAILABLE = True
except ImportError:
    COQUI_TTS_AVAILABLE = False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

print("✅ All libraries imported successfully!")


## Helper Classes and Functions

The cells below contain all necessary service classes and functions for video dubbing.

In [None]:
# Translation Tools - NLLB Translator
NLLB_LANGUAGE_MAP = {
    'en': 'eng_Latn', 'ha': 'hau_Latn', 'ig': 'ibo_Latn', 'yo': 'yor_Latn',
    'fr': 'fra_Latn', 'es': 'spa_Latn', 'de': 'deu_Latn', 'ru': 'rus_Cyrl',
    'zh': 'zho_Hans', 'sw': 'swh_Latn',
}
NLLB_MODEL_NAME = "facebook/nllb-200-distilled-600M"

class NLLBTranslator:
    _model = None
    _tokenizer = None
    _device = None
    
    @classmethod
    def _get_device(cls):
        if cls._device:
            return cls._device
        if torch.cuda.is_available():
            cls._device = torch.device("cuda")
        else:
            cls._device = torch.device("cpu")
        return cls._device
    
    @classmethod
    def _load_model(cls):
        if cls._model is not None:
            return cls._model, cls._tokenizer
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("transformers not available")
        logger.info(f"Loading NLLB model: {NLLB_MODEL_NAME}")
        device = cls._get_device()
        cls._tokenizer = AutoTokenizer.from_pretrained(NLLB_MODEL_NAME, src_lang="eng_Latn")
        cls._model = AutoModelForSeq2SeqLM.from_pretrained(NLLB_MODEL_NAME).to(device)
        cls._model.eval()
        return cls._model, cls._tokenizer
    
    @classmethod
    def translate(cls, text: str, source_language: str, target_language: str, max_length: int = 512) -> str:
        if not text or not text.strip():
            return text
        model, tokenizer = cls._load_model()
        device = cls._get_device()
        src_lang_code = NLLB_LANGUAGE_MAP.get(source_language.lower(), 'eng_Latn')
        tgt_lang_code = NLLB_LANGUAGE_MAP.get(target_language.lower(), 'eng_Latn')
        tokenizer.src_lang = src_lang_code
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=max_length)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
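            # convert_tokens_to_ids works across transformers versions; lang_code_to_id was removed in newer releases
            generated_tokens = model.generate(**inputs, forced_bos_token_id=tokenizer.convert_tokens_to_ids(tgt_lang_code), max_length=max_length, num_beams=4, early_stopping=True)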
        translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
        return translated_text.strip()

print("✅ Translation tools loaded")

In [None]:
# TTS Tools - Coqui XTTS and helpers
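# XTTS-v2 expects 'zh-cn' for Chinese. Its published language list does not include
# 'ha', 'ig', 'yo', or 'sw', so synthesis for those codes may fail and fall back to silence.
TTS_LANGUAGE_MAP = {'en': 'en', 'ha': 'ha', 'ig': 'ig', 'yo': 'yo', 'fr': 'fr', 'es': 'es', 'de': 'de', 'ru': 'ru', 'zh': 'zh-cn', 'sw': 'sw'}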
COQUI_XTTS_MODEL = "tts_models/multilingual/multi-dataset/xtts_v2"

class CoquiXTTS:
    _tts_model = None
    _device = None
    
    @classmethod
    def _get_device(cls):
        if cls._device:
            return cls._device
        cls._device = "cuda" if torch.cuda.is_available() else "cpu"
        return cls._device
    
    @classmethod
    def _load_model(cls):
        if cls._tts_model is not None:
            return cls._tts_model
        if not COQUI_TTS_AVAILABLE:
            raise ImportError("Coqui TTS not available")
        logger.info("Loading Coqui XTTS-v2 model")
        cls._tts_model = CoquiTTS(model_name=COQUI_XTTS_MODEL, progress_bar=False)
        cls._tts_model.to(cls._get_device())
        return cls._tts_model
    
    @classmethod
    def synthesize(cls, text: str, language: str, speaker_wav: Optional[str] = None) -> AudioSegment:
        if not text or not text.strip():
            raise ValueError("Empty text")
        tts_model = cls._load_model()
        lang_code = TTS_LANGUAGE_MAP.get(language.lower(), 'en')
        # Use a unique temp file name so repeated or concurrent calls never collide
        output_path = os.path.join(tempfile.gettempdir(), f"temp_tts_{uuid.uuid4().hex}.wav")
        try:
            if speaker_wav and os.path.exists(speaker_wav):
                tts_model.tts_to_file(text=text, file_path=output_path, language=lang_code, speaker_wav=speaker_wav)
            else:
                try:
                    tts_model.tts_to_file(text=text, file_path=output_path, language=lang_code)
                except Exception:
                    # Retry with the model defaults if the language argument is rejected
                    tts_model.tts_to_file(text=text, file_path=output_path)
            # from_wav loads the audio into memory, so the file can be deleted afterwards
            return AudioSegment.from_wav(output_path)
        except Exception as e:
            logger.error(f"Coqui TTS failed: {e}")
            raise
        finally:
            # Remove the temp file whether synthesis succeeded or not
            try:
                os.remove(output_path)
            except OSError:
                pass

def generate_tts_huggingface(text: str, language: str, prefer_coqui: bool = True) -> AudioSegment:
    # Coqui XTTS is the only TTS backend wired into this notebook
    if prefer_coqui and COQUI_TTS_AVAILABLE:
        return CoquiXTTS.synthesize(text, language)
    raise ImportError("No TTS available. Install: pip install TTS")

print("✅ TTS tools loaded")

In [None]:
# Service Classes - All service processors inline

class VideoProcessor:
    def __init__(self, job_id: str):
        self.job_id = job_id
        self._demucs_model = None
    
    async def extract_audio(self, video_path: str) -> str:
        logger.info(f"[JOB {self.job_id}] Extracting audio")
        output_dir = os.path.join(tempfile.gettempdir(), "v2_dubbing", f"job_{self.job_id}")
        os.makedirs(output_dir, exist_ok=True)
        video = VideoFileClip(video_path)
        audio = video.audio
        if audio is None:
            raise Exception("No audio track found")
        audio_path = os.path.join(output_dir, "original_audio.wav")
        audio.write_audiofile(audio_path, fps=22050, nbytes=2, logger=None)
        test_audio = AudioSegment.from_file(audio_path)
        if test_audio.channels == 1:
            stereo_audio = test_audio.set_channels(2)
            stereo_audio.export(audio_path, format="wav")
        audio.close()
        video.close()
        return audio_path
    
    async def separate_audio(self, audio_path: str) -> Dict[str, str]:
        if not DEMUCS_AVAILABLE:
            raise Exception("Demucs not available")
        if self._demucs_model is None:
            self._demucs_model = pretrained.get_model('htdemucs')
            self._demucs_model.eval()
        wav, sr = torchaudio.load(audio_path)
        if sr != self._demucs_model.samplerate:
            wav = torchaudio.functional.resample(wav, sr, self._demucs_model.samplerate)
            sr = self._demucs_model.samplerate
        if wav.dim() == 2:
            wav = wav.unsqueeze(0)
        elif wav.dim() == 1:
            wav = wav.unsqueeze(0).unsqueeze(0)
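        # Run separation on the GPU when one is available; fall back to the CPU otherwise
        demucs_device = 'cuda' if torch.cuda.is_available() else 'cpu'
        with torch.no_grad():
            sources = apply_model(self._demucs_model, wav, device=demucs_device, progress=True)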
        source_names = ['drums', 'bass', 'other', 'vocals']
        separated_files = {}
        output_dir = os.path.dirname(audio_path)
        for i, name in enumerate(source_names):
            output_path = os.path.join(output_dir, f"{name}.wav")
            source_audio = sources[0, i].cpu().numpy()
            if len(source_audio.shape) == 1:
                source_audio = np.stack([source_audio, source_audio])
            sf.write(output_path, source_audio.T, sr)
            separated_files[name] = output_path
        return separated_files
    
    async def create_background(self, separated_files: Dict[str, str]) -> str:
        output_dir = os.path.dirname(separated_files['drums'])
        background_path = os.path.join(output_dir, "background.wav")
        drums, sr = sf.read(separated_files['drums'])
        bass, _ = sf.read(separated_files['bass'])
        other, _ = sf.read(separated_files['other'])
        background = drums + bass + other
        sf.write(background_path, background, sr)
        return background_path
    
    async def replace_audio(self, video_path: str, audio_path: str) -> str:
        output_dir = os.path.join(tempfile.gettempdir(), "v2_dubbing", f"job_{self.job_id}")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, "dubbed_video.mp4")
        video = VideoFileClip(video_path)
        audio_clip = AudioFileClip(audio_path)
        if audio_clip.duration < video.duration:
            # MoviePy 2.x renamed set_duration to with_duration (matching subclipped/with_audio below)
            audio_clip = audio_clip.with_duration(video.duration)
        elif audio_clip.duration > video.duration:
            audio_clip = audio_clip.subclipped(0, video.duration)
        final_video = video.with_audio(audio_clip)
        final_video.write_videofile(output_path, codec='libx264', audio_codec='aac', logger=None)
        video.close()
        audio_clip.close()
        final_video.close()
        return output_path

class TranscriptionProcessor:
    def __init__(self, job_id: str):
        self.job_id = job_id
        self._whisper_model = None
    
    async def transcribe(self, audio_path: str, language: str) -> List[Dict[str, Any]]:
        if not WHISPER_AVAILABLE:
            raise Exception("Whisper not available")
        if self._whisper_model is None:
            self._whisper_model = whisper.load_model("base")
        result = self._whisper_model.transcribe(audio_path, language=language if language != "en" else None, task="transcribe", fp16=False)
        segments = []
        for seg in result.get("segments", []):
            segments.append({"start_time": seg.get("start", 0), "end_time": seg.get("end", 0), "duration": seg.get("end", 0) - seg.get("start", 0), "transcription": seg.get("text", "").strip()})
        return segments

class TranslationProcessor:
    def __init__(self, job_id: str):
        self.job_id = job_id
    
    async def translate(self, segments: List[Dict[str, Any]], source_language: str, target_language: str) -> List[Dict[str, Any]]:
        translated_segments = []
        for seg in segments:
            text = seg.get("transcription", "")
            if not text:
                seg["translated_text"] = ""
                translated_segments.append(seg)
                continue
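            # Translate every target language that NLLB_LANGUAGE_MAP covers, not only yo/ig/ha
            if target_language.lower() in NLLB_LANGUAGE_MAP and TRANSFORMERS_AVAILABLE:
                try:
                    translated_text = NLLBTranslator.translate(text, source_language, target_language)
                except Exception as e:
                    # Keep the source text, but record why translation failed
                    logger.warning(f"[JOB {self.job_id}] Translation failed, keeping source text: {e}")
                    translated_text = text
            else:
                translated_text = text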
            seg["translated_text"] = translated_text
            translated_segments.append(seg)
        return translated_segments

class TTSProcessor:
    def __init__(self, job_id: str):
        self.job_id = job_id
    
    async def generate_tts(self, segments: List[Dict[str, Any]], target_language: str) -> List[Dict[str, Any]]:
        tts_segments = []
        for i, seg in enumerate(segments):
            text = seg.get("translated_text", "")
            if not text:
                seg["audio"] = AudioSegment.silent(duration=int(seg.get("duration", 0) * 1000))
                tts_segments.append(seg)
                continue
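            if target_language in ['yo', 'ig', 'ha'] and COQUI_TTS_AVAILABLE:
                try:
                    audio = generate_tts_huggingface(text, target_language, prefer_coqui=True)
                except Exception as e:
                    # Fall back to silence of the original length, but log the cause
                    logger.warning(f"[JOB {self.job_id}] TTS failed for segment {i}, using silence: {e}")
                    audio = AudioSegment.silent(duration=int(seg.get("duration", 0) * 1000))
            elif COQUI_TTS_AVAILABLE:
                try:
                    audio = CoquiXTTS.synthesize(text, target_language)
                except Exception as e:
                    logger.warning(f"[JOB {self.job_id}] TTS failed for segment {i}, using silence: {e}")
                    audio = AudioSegment.silent(duration=int(seg.get("duration", 0) * 1000))
            else:
                audio = AudioSegment.silent(duration=int(seg.get("duration", 0) * 1000))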
            seg["audio"] = audio
            tts_segments.append(seg)
        return tts_segments

class AudioAssembler:
    def __init__(self, job_id: str):
        self.job_id = job_id
        self.MIN_PAUSE_BETWEEN_SEGMENTS = 0.3
        self.IDEAL_PAUSE_BETWEEN_SEGMENTS = 0.5
        self.FADE_IN_DURATION = 50
        self.FADE_OUT_DURATION = 100
    
    def _add_fade_effects(self, audio: AudioSegment) -> AudioSegment:
        return audio.fade_in(self.FADE_IN_DURATION).fade_out(self.FADE_OUT_DURATION)
    
    def _add_natural_pause(self, audio: AudioSegment, pause_duration_ms: int) -> AudioSegment:
        if pause_duration_ms > 0:
            return audio + AudioSegment.silent(duration=pause_duration_ms)
        return audio
    
    def _calculate_non_overlapping_positions(self, tts_segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        positioned_segments = []
        current_end_time = 0.0
        for i, seg in enumerate(tts_segments):
            if "audio" not in seg:
                positioned_segments.append(seg)
                continue
            audio = seg["audio"]
            audio_duration = len(audio) / 1000.0
            original_start = seg.get("start_time", 0)
            if i == 0:
                final_start = max(0.0, original_start)
            else:
                min_start = current_end_time + self.MIN_PAUSE_BETWEEN_SEGMENTS
                final_start = max(original_start, min_start)
            pause_duration_ms = int(self.IDEAL_PAUSE_BETWEEN_SEGMENTS * 1000)
            audio_with_pause = self._add_natural_pause(audio, pause_duration_ms)
            seg_copy = seg.copy()
            seg_copy["audio"] = audio_with_pause
            seg_copy["final_start_time"] = final_start
            seg_copy["final_end_time"] = final_start + (len(audio_with_pause) / 1000.0)
            positioned_segments.append(seg_copy)
            current_end_time = seg_copy["final_end_time"]
        return positioned_segments
    
    async def assemble(self, tts_segments: List[Dict[str, Any]], background_path: str) -> str:
        background = AudioSegment.from_file(background_path)
        background_duration = len(background) / 1000.0
        positioned_segments = self._calculate_non_overlapping_positions(tts_segments)
        max_end_time = 0.0
        for seg in positioned_segments:
            end_time = seg.get("final_end_time", seg.get("end_time", 0))
            max_end_time = max(max_end_time, end_time)
        total_duration = max(background_duration, max_end_time)
        total_duration_ms = int(total_duration * 1000)
        assembled_audio = AudioSegment.silent(duration=total_duration_ms)
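        # Lower the background by 6 dB before mixing it in; gain_during_overlay
        # changes the base track (silence here), not the overlaid segment
        assembled_audio = assembled_audio.overlay(background - 6)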
        for seg in positioned_segments:
            if "audio" not in seg:
                continue
            audio = seg["audio"]
            final_start = seg.get("final_start_time", seg.get("start_time", 0))
            audio = self._add_fade_effects(audio)
            start_ms = int(final_start * 1000)
            if start_ms + len(audio) > total_duration_ms:
                max_audio_length = total_duration_ms - start_ms
                if max_audio_length > 0:
                    audio = audio[:max_audio_length]
                else:
                    continue
            # Boost the speech by 3 dB; gain_during_overlay=+3 would raise the background under it instead
            assembled_audio = assembled_audio.overlay(audio + 3, position=start_ms)
        output_dir = os.path.join(tempfile.gettempdir(), "v2_dubbing", f"job_{self.job_id}")
        os.makedirs(output_dir, exist_ok=True)
        final_audio_path = os.path.join(output_dir, "final_audio.mp3")
        assembled_audio.export(final_audio_path, format="mp3", bitrate="320k")
        return final_audio_path

print("✅ Service classes loaded")
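
The positioning rule used by `AudioAssembler._calculate_non_overlapping_positions` can be easier to follow on plain numbers. The sketch below is a simplified stand-in, not the project's code: `schedule_segments` is a hypothetical helper that takes `(start, duration)` pairs in seconds instead of `AudioSegment` objects, and it assumes the same defaults of 0.3 s minimum gap and 0.5 s trailing pause.

```python
from typing import List, Tuple


def schedule_segments(
    segments: List[Tuple[float, float]],
    min_pause: float = 0.3,
    ideal_pause: float = 0.5,
) -> List[Tuple[float, float]]:
    """Place (start, duration) clips so that none of them overlap.

    Each clip is followed by `ideal_pause` seconds of silence, and the next
    clip starts no earlier than the previous end plus `min_pause`.
    """
    placed = []
    current_end = 0.0
    for index, (start, duration) in enumerate(segments):
        if index == 0:
            final_start = max(0.0, start)
        else:
            # Push the clip later if it would collide with the previous one
            final_start = max(start, current_end + min_pause)
        final_end = final_start + duration + ideal_pause
        placed.append((final_start, final_end))
        current_end = final_end
    return placed


timeline = schedule_segments([(0.0, 2.0), (1.5, 1.0), (10.0, 1.0)])
for start, end in timeline:
    print(f"{start:.1f}s -> {end:.1f}s")
```

In this example the second clip asks to start at 1.5 s but is delayed until the first clip and its pauses have finished, while the third clip keeps its original start because nothing overlaps it.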

In [None]:
# DubbingProcessor - Main processor class
class DubbingProcessor:
    def __init__(self):
        self.job_id = f"job_{uuid.uuid4().hex[:12]}"
        self.temp_files = []
        
    async def process_video(self, video_path: str, source_language: str, target_language: str) -> Dict[str, Any]:
        try:
            logger.info(f"[JOB {self.job_id}] Starting dubbing: {video_path}")
            logger.info(f"[JOB {self.job_id}] Languages: {source_language} → {target_language}")
            
            # Phase 1: Extract audio
            logger.info(f"[JOB {self.job_id}] Phase 1: Extracting audio")
            video_processor = VideoProcessor(self.job_id)
            audio_path = await video_processor.extract_audio(video_path)
            self.temp_files.append(audio_path)
            
            # Phase 2: Separate audio
            logger.info(f"[JOB {self.job_id}] Phase 2: Separating audio")
            separated_files = await video_processor.separate_audio(audio_path)
            self.temp_files.extend(separated_files.values())
            
            # Phase 3: Create background
            logger.info(f"[JOB {self.job_id}] Phase 3: Creating background")
            background_path = await video_processor.create_background(separated_files)
            self.temp_files.append(background_path)
            
            # Phase 4: Transcribe
            logger.info(f"[JOB {self.job_id}] Phase 4: Transcribing")
            transcription_processor = TranscriptionProcessor(self.job_id)
            segments = await transcription_processor.transcribe(separated_files['vocals'], source_language)
            
            # Phase 5: Translate
            logger.info(f"[JOB {self.job_id}] Phase 5: Translating")
            translation_processor = TranslationProcessor(self.job_id)
            translated_segments = await translation_processor.translate(segments, source_language, target_language)
            
            # Phase 6: Generate TTS
            logger.info(f"[JOB {self.job_id}] Phase 6: Generating TTS")
            tts_processor = TTSProcessor(self.job_id)
            tts_segments = await tts_processor.generate_tts(translated_segments, target_language)
            
            # Phase 7: Assemble audio
            logger.info(f"[JOB {self.job_id}] Phase 7: Assembling audio")
            assembler = AudioAssembler(self.job_id)
            final_audio_path = await assembler.assemble(tts_segments, background_path)
            self.temp_files.append(final_audio_path)
            
            # Phase 8: Replace video audio
            logger.info(f"[JOB {self.job_id}] Phase 8: Replacing video audio")
            output_path = await video_processor.replace_audio(video_path, final_audio_path)
            
            logger.info(f"[JOB {self.job_id}] ✅ Dubbing complete: {output_path}")
            return {"success": True, "output_path": output_path, "job_id": self.job_id}
            
        except Exception as e:
            logger.error(f"[JOB {self.job_id}] ❌ Dubbing failed: {e}", exc_info=True)
            return {"success": False, "error": str(e), "job_id": self.job_id}

print("✅ DubbingProcessor loaded")
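
`process_video` reports a failure only as an error string, so it can be hard to tell which phase broke. One possible refinement is sketched below under simplifying assumptions: `run_phases` and the toy phases are hypothetical stand-ins, and the phases form a strictly linear chain, which the real pipeline does not (it reuses the background track and the separated stems in later steps).

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Phase = Tuple[str, Callable[[Any], Awaitable[Any]]]


async def run_phases(phases: List[Phase], initial: Any) -> Dict[str, Any]:
    """Run phases in order, feeding each result into the next one."""
    value = initial
    for name, step in phases:
        try:
            value = await step(value)
        except Exception as exc:
            # Record the failing phase so the caller can report it
            return {"success": False, "failed_phase": name, "error": str(exc)}
    return {"success": True, "result": value}


# Toy phases standing in for extraction, transcription and translation
async def extract(path: str) -> str:
    return path + ".wav"


async def transcribe(audio_path: str) -> str:
    raise RuntimeError("model not loaded")


async def translate(text: str) -> str:
    return text.upper()


def demo() -> Dict[str, Any]:
    """Run the toy pipeline from synchronous code (plain Python script)."""
    phases = [("extract", extract), ("transcribe", transcribe), ("translate", translate)]
    return asyncio.run(run_phases(phases, "clip.mp4"))
```

Called from a plain script, `demo()` returns a dictionary whose `failed_phase` is `"transcribe"`. Inside a Colab cell an event loop is already running, so `await run_phases(phases, "clip.mp4")` is the form to use there rather than `asyncio.run`.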

In [None]:
async def process_video_directory(
    video_directory: str,
    source_language: str,
    target_language: str,
    output_directory: str = None
):
    """
    Process all videos in a directory through the dubbing pipeline
    
    Args:
        video_directory: Path to directory containing video files
        source_language: Source language code (e.g., 'en', 'fr', 'es')
        target_language: Target language code (e.g., 'yo', 'ig', 'ha')
        output_directory: Optional output directory (defaults to video_directory/dubbed)
    
    Returns:
        List of results for each processed video
    """
    video_dir = Path(video_directory)
    if not video_dir.exists():
        raise ValueError(f"Video directory not found: {video_directory}")
    
    # Set output directory
    if output_directory is None:
        output_dir = video_dir / "dubbed"
    else:
        output_dir = Path(output_directory)
    
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # Find all video files (case-insensitive extension match, stable order)
    video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.m4v'}
    video_files = sorted(
        p for p in video_dir.iterdir()
        if p.is_file() and p.suffix.lower() in video_extensions
    )
    
    if not video_files:
        logger.warning(f"No video files found in {video_directory}")
        return []
    
    logger.info(f"Found {len(video_files)} video file(s) to process")
    
    results = []
    
    # Process each video
    for video_path in video_files:
        logger.info(f"\n{'='*60}")
        logger.info(f"Processing: {video_path.name}")
        logger.info(f"{'='*60}")
        
        try:
            # Create dubbing processor
            processor = DubbingProcessor()
            
            # Process video
            result = await processor.process_video(
                str(video_path),
                source_language,
                target_language
            )
            
            # Record the source file so the summary cell can show its name
            result['video'] = str(video_path)

            if result['success']:
                # Copy the output to the designated output directory
                output_path = Path(result['output_path'])
                # The pipeline always writes MP4, so keep that suffix rather than the input's
                final_output = output_dir / f"{video_path.stem}_dubbed{output_path.suffix}"

                shutil.copy2(output_path, final_output)
                logger.info(f"✅ Saved dubbed video to: {final_output}")

                result['final_output_path'] = str(final_output)
                results.append(result)
            else:
                logger.error(f"❌ Failed to process {video_path.name}: {result.get('error', 'Unknown error')}")
                results.append(result)
                
        except Exception as e:
            logger.error(f"❌ Error processing {video_path.name}: {e}", exc_info=True)
            results.append({
                'success': False,
                'video': str(video_path),
                'error': str(e)
            })
    
    logger.info(f"\n{'='*60}")
    logger.info(f"Processing complete! Processed {len([r for r in results if r.get('success')])}/{len(video_files)} videos")
    logger.info(f"Output directory: {output_dir}")
    logger.info(f"{'='*60}")
    
    return results


def process_videos(
    video_directory: str,
    source_language: str,
    target_language: str,
    output_directory: str = None
):
    """
    Synchronous wrapper for process_video_directory
    
    Use this function directly in Colab cells
    """
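    import concurrent.futures

    coro = process_video_directory(
        video_directory,
        source_language,
        target_language,
        output_directory
    )
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (plain Python script): run the coroutine directly
        return asyncio.run(coro)
    # Colab/Jupyter already runs an event loop in this thread, so execute the
    # coroutine on its own loop in a worker thread and wait for the result
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()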


print("✅ Dubbing functions defined successfully!")

In [None]:
# CONFIGURATION - Adjust these parameters

# Path to directory containing your video files
VIDEO_DIRECTORY = "/content/videos"  # Change this to your video directory path

# Source language (language spoken in the video)
SOURCE_LANGUAGE = "en"  # Options: 'en', 'fr', 'es', 'de', 'ru', 'zh', etc.

# Target language (language you want to dub to)
TARGET_LANGUAGE = "yo"  # Options: 'yo' (Yoruba), 'ig' (Igbo), 'ha' (Hausa), etc.

# Output directory (optional - will create 'dubbed' subdirectory if not specified)
OUTPUT_DIRECTORY = None  # e.g., "/content/drive/MyDrive/dubbed_videos"

print(f"Configuration:")
print(f"  Video Directory: {VIDEO_DIRECTORY}")
print(f"  Source Language: {SOURCE_LANGUAGE}")
print(f"  Target Language: {TARGET_LANGUAGE}")
print(f"  Output Directory: {OUTPUT_DIRECTORY or 'Auto (dubbed subdirectory)'}")

In [None]:
# Create video directory if it doesn't exist
os.makedirs(VIDEO_DIRECTORY, exist_ok=True)
print(f"📁 Video directory ready: {VIDEO_DIRECTORY}")

# List existing video files
video_files = []
if os.path.exists(VIDEO_DIRECTORY):
    video_files = [f for f in os.listdir(VIDEO_DIRECTORY) 
                   if f.lower().endswith(('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.m4v'))]

if video_files:
    print(f"📹 Found {len(video_files)} video file(s) in {VIDEO_DIRECTORY}:")
    for vf in video_files[:5]:  # Show first 5
        print(f"  - {vf}")
    if len(video_files) > 5:
        print(f"  ... and {len(video_files) - 5} more")
else:
    print(f"⚠️  No video files found in {VIDEO_DIRECTORY}")
    print("\n📤 To upload videos, you can:")
    print("   1. Use the file upload widget in the next cell")
    print("   2. Drag and drop files to the Colab file browser sidebar")
    print("   3. Upload to Google Drive and access via mounted drive")

In [None]:
# Upload video files (optional - if you haven't uploaded them yet)
# Uncomment the code below to use the file upload widget

from google.colab import files
import shutil

print("📤 Upload your video files:")
print("   Click 'Choose Files' and select your video files")
print("   Files will be saved to:", VIDEO_DIRECTORY)

# Uncomment the next four lines to enable file upload:
# uploaded = files.upload()
# for filename in uploaded.keys():
#     shutil.move(filename, os.path.join(VIDEO_DIRECTORY, filename))
#     print(f"✅ Saved: {filename}")

print("\n💡 Tip: You can also drag and drop files directly in the Colab file browser!")

In [None]:
# Run the dubbing process
# This will process all videos in the specified directory

print("🚀 Starting dubbing process...")
print("This may take a while, especially for the first run (model downloads)")
print("-" * 60)

results = process_videos(
    video_directory=VIDEO_DIRECTORY,
    source_language=SOURCE_LANGUAGE,
    target_language=TARGET_LANGUAGE,
    output_directory=OUTPUT_DIRECTORY
)

# Display results summary
print("\n" + "="*60)
print("PROCESSING SUMMARY")
print("="*60)

for i, result in enumerate(results, 1):
    status = "✅ SUCCESS" if result.get('success') else "❌ FAILED"
    video_name = result.get('video', 'Unknown')
    if isinstance(video_name, str):
        video_name = Path(video_name).name
    print(f"{i}. {video_name}: {status}")
    if result.get('success') and 'final_output_path' in result:
        print(f"   Output: {result['final_output_path']}")
    elif 'error' in result:
        print(f"   Error: {result['error']}")

print("="*60)

## Download Results

After processing, you can download the dubbed videos from Colab or access them in your Google Drive (if you mounted it).

In [None]:
# Download all dubbed videos as a ZIP file
from zipfile import ZipFile
import shutil
from google.colab import files

# Find output directory
if OUTPUT_DIRECTORY:
    output_dir = Path(OUTPUT_DIRECTORY)
else:
    output_dir = Path(VIDEO_DIRECTORY) / "dubbed"

if output_dir.exists():
    zip_path = Path("/content/dubbed_videos.zip")
    
    # Create ZIP file
    with ZipFile(zip_path, 'w') as zipf:
        for video_file in output_dir.glob("*_dubbed.*"):
            zipf.write(video_file, video_file.name)
    
    print(f"✅ Created ZIP file: {zip_path}")
    print(f"   Contains {len(list(output_dir.glob('*_dubbed.*')))} dubbed video(s)")
    print("\n📥 Downloading the ZIP file...")
    
    # Download the ZIP file
    files.download(str(zip_path))
else:
    print(f"⚠️  Output directory not found: {output_dir}")

## Additional Help

### Supported Languages

**Source Languages:** Any language supported by Whisper (e.g., 'en', 'fr', 'es', 'de', 'ru', 'zh', 'ja', 'ko')

**Target Languages:**
- **Nigerian Languages:** 'yo' (Yoruba), 'ig' (Igbo), 'ha' (Hausa)
- **Other Languages:** 'en', 'fr', 'es', 'de', 'ru', 'zh', 'sw', etc.
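
A quick pre-flight check on the configured language pair can catch typos before any model is downloaded. The sketch below is illustrative only: `check_language_pair` is a hypothetical helper, and its two sets simply copy the codes listed in this section. Because those lists are open-ended, a reported problem is best treated as a warning rather than a hard error.

```python
from typing import List

# Codes copied from the lists above; they are examples, not exhaustive lists
EXAMPLE_SOURCE_LANGUAGES = {"en", "fr", "es", "de", "ru", "zh", "ja", "ko"}
LISTED_TARGET_LANGUAGES = {"yo", "ig", "ha", "en", "fr", "es", "de", "ru", "zh", "sw"}


def check_language_pair(source: str, target: str) -> List[str]:
    """Return human-readable problems; an empty list means the pair looks fine."""
    problems = []
    src, tgt = source.strip().lower(), target.strip().lower()
    if src not in EXAMPLE_SOURCE_LANGUAGES:
        problems.append(f"source '{source}' is not in the example list; check Whisper's language list")
    if tgt not in LISTED_TARGET_LANGUAGES:
        problems.append(f"target '{target}' is not a listed target language")
    if src == tgt:
        problems.append("source and target are the same language")
    return problems


for problem in check_language_pair("en", "yo"):
    print(f"Warning: {problem}")
```

With the default configuration (`"en"` to `"yo"`) the function returns an empty list, so nothing is printed.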

### Notes:
- First run will download models (NLLB-200 ~2.5GB, Whisper, Demucs)
- Processing time depends on video length and complexity
- GPU is recommended but not required
- Output videos will be saved with "_dubbed" suffix