In [None]:
from google.colab import drive
drive.mount('/content/drive')

!pip install -q yt-dlp edge-tts moviepy git+https://github.com/openai/whisper.git requests

import os, re, whisper, requests, hashlib, pickle
from moviepy.editor import VideoFileClip, CompositeVideoClip, AudioFileClip, ColorClip
from google.colab import userdata
import gc  # For memory management


In [None]:
def descargar_fondo(keyword="abstract", output_dir="/content/drive/MyDrive/drama-automation/fondos"):
    """Download a background video from Pexels, reusing a previously saved copy.

    The video is stored as ``<output_dir>/fondo_<keyword>.mp4`` (spaces and path
    separators in the keyword become underscores). If that file already exists
    and is non-empty, no network request is made.

    Args:
        keyword: Search term sent to the Pexels video search endpoint.
        output_dir: Directory where the video is stored; created if missing.

    Returns:
        The path of the local video file, or ``None`` if the search failed,
        returned no usable result, or the download failed.
    """
    os.makedirs(output_dir, exist_ok=True)
    # Path separators in the keyword would otherwise point outside output_dir.
    safe_keyword = re.sub(r"[ /\\]", "_", keyword)
    ruta_fondo = f"{output_dir}/fondo_{safe_keyword}.mp4"

    # A zero-byte file is the residue of a failed download, not a usable cache entry.
    if os.path.exists(ruta_fondo) and os.path.getsize(ruta_fondo) > 0:
        print(f"✅ Fondo '{keyword}' ya existe (usando caché).")
        return ruta_fondo

    headers = {"Authorization": userdata.get('PEXELS_API_KEY')}
    # Passing the query through `params` lets requests URL-encode the keyword.
    # NOTE(review): "vertical" is kept from the original request; confirm against the
    # Pexels API docs that it is an accepted `orientation` value (it may need "portrait").
    params = {"query": keyword, "per_page": 1, "orientation": "vertical"}

    try:
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers=headers,
            params=params,
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"❌ Error al conectar con Pexels: {e}")
        return None

    videos = data.get("videos") or []
    archivos = (videos[0].get("video_files") or []) if videos else []
    if not archivos:
        print(f"❌ No se encontró fondo para: {keyword}")
        return None

    # Prefer an SD / 360-pixel-high rendition; otherwise take the first file listed.
    # The height is compared exactly so that e.g. 1360 or 3600 do not count as "360".
    preferido = next(
        (
            archivo for archivo in archivos
            if archivo.get("quality") == "sd" or str(archivo.get("height", "")) == "360"
        ),
        None,
    )
    elegido = preferido if preferido is not None else archivos[0]

    # Stream into a temporary file and rename it only on success, so a failed
    # download can never be mistaken for a cached video on the next run.
    ruta_parcial = f"{ruta_fondo}.part"
    try:
        with requests.get(elegido["link"], stream=True, timeout=60) as descarga:
            descarga.raise_for_status()
            with open(ruta_parcial, "wb") as f:
                for chunk in descarga.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        os.replace(ruta_parcial, ruta_fondo)
    except (requests.RequestException, OSError, KeyError) as e:
        print(f"❌ Error al descargar el fondo '{keyword}': {e}")
        if os.path.exists(ruta_parcial):
            os.remove(ruta_parcial)
        return None

    if preferido is not None:
        print(f"✅ Fondo '{keyword}' descargado.")
    else:
        print(f"✅ Fondo '{keyword}' descargado (calidad alternativa).")
    return ruta_fondo


In [None]:
BASE_DIR = "/content/drive/MyDrive/drama-automation"
URLS_FILE = f"{BASE_DIR}/data/urls.txt"
TEMP_DIR = f"{BASE_DIR}/temp"
CACHE_DIR = f"{BASE_DIR}/cache"  # NEW: Cache directory
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)

# Leer enlaces y keywords
with open(URLS_FILE, "r") as f:
    lines = [line.strip() for line in f if line.strip() and not line.startswith("#")]

dramas = []
for line in lines:
    if "|" in line:
        url, keyword = line.split("|", 1)
        dramas.append((url.strip(), keyword.strip()))
    else:
        dramas.append((line.strip(), "abstract"))

print(f"🎬 Se procesarán {len(dramas)} dramas.")

# Descargar dramas
for i, (url, keyword) in enumerate(dramas):
    output_path = f"{TEMP_DIR}/drama_{i+1}.mp4"
    if not os.path.exists(output_path):  # Skip if already downloaded
        print(f"📥 Descargando drama {i+1}...")
        !yt-dlp -f "best[height<=480]" -o "{output_path}" "{url}"
    else:
        print(f"✅ Drama {i+1} ya descargado (usando caché).")
print("✅ Todos los dramas descargados.")


In [None]:
# Shorts configuration: (start, end) timestamps, HH:MM:SS, one pair per short.
SHORTS_TIMINGS = [
    ("00:01:10", "00:01:40"),
    ("00:03:20", "00:03:50"),
    ("00:05:45", "00:06:15"),
    ("00:08:10", "00:08:40"),
    ("00:11:30", "00:12:00"),
    ("00:14:20", "00:14:50"),
    ("00:17:10", "00:17:40"),
    ("00:20:05", "00:20:35"),
    ("00:23:40", "00:24:10"),
    ("00:27:15", "00:27:45"),
]

# YouTube chapters: (MM:SS timestamp, title) pairs written to capitulos.txt.
CAPITULOS = [
    ("00:00", "Introducción"),
    ("01:10", "El regreso inesperado"),
    ("03:20", "Secretos del pasado"),
    ("05:45", "Primer enfrentamiento"),
    ("08:10", "La traición"),
    ("11:30", "Lágrimas de verdad"),
    ("14:20", "Decisión final"),
    ("17:10", "Consecuencias"),
    ("20:05", "Nuevas alianzas"),
    ("23:40", "El juramento"),
    ("27:15", "Final del capítulo 10"),
]

def get_cache_key(file_path):
    """Return an MD5 hex digest identifying the current state of ``file_path``.

    The digest is computed over the path string, the file's modification time
    and its size in bytes, so it changes when any of those three change.
    ``os.stat`` raises ``OSError`` if the file cannot be stat'ed.
    """
    info = os.stat(file_path)
    fingerprint = f"{file_path}_{info.st_mtime}_{info.st_size}"
    digest = hashlib.md5(fingerprint.encode())
    return digest.hexdigest()

def get_cached_transcription(audio_path, cache_dir):
    """Load the cached transcription for ``audio_path``, if one is available.

    Args:
        audio_path: File whose transcription is wanted; its cache key comes
            from ``get_cache_key``.
        cache_dir: Directory holding the ``transcription_<key>.pkl`` files.

    Returns:
        The unpickled transcription object, or ``None`` when there is no cache
        entry or the entry cannot be read (it is then treated as a cache miss).
    """
    cache_key = get_cache_key(audio_path)
    cache_file = f"{cache_dir}/transcription_{cache_key}.pkl"
    if not os.path.exists(cache_file):
        return None
    try:
        with open(cache_file, 'rb') as f:
            # SECURITY: pickle.load can execute arbitrary code. This is only
            # acceptable while the cache directory holds files written by this notebook.
            return pickle.load(f)
    except (pickle.UnpicklingError, EOFError, AttributeError, ImportError, IndexError, OSError) as e:
        # A truncated or corrupt entry (e.g. from an interrupted session) must not
        # block the run; report it and let the caller re-transcribe.
        print(f"  ⚠️ Caché de transcripción ilegible, se ignora: {e}")
        return None

def save_transcription_cache(audio_path, result, cache_dir):
    """Persist a transcription result in the cache directory.

    Args:
        audio_path: File the transcription belongs to; its cache key comes
            from ``get_cache_key``.
        result: Picklable transcription object to store.
        cache_dir: Directory for the ``transcription_<key>.pkl`` files;
            created if missing.
    """
    cache_key = get_cache_key(audio_path)
    cache_file = f"{cache_dir}/transcription_{cache_key}.pkl"
    os.makedirs(cache_dir, exist_ok=True)
    # Write to a sibling temp file and rename, so an interrupted write never
    # leaves a truncated pickle under the final name.
    tmp_file = f"{cache_file}.tmp"
    try:
        with open(tmp_file, 'wb') as f:
            pickle.dump(result, f)
        os.replace(tmp_file, cache_file)
    finally:
        if os.path.exists(tmp_file):
            os.remove(tmp_file)

def optimize_guion(text):
    """Insert pause tags after sentence punctuation and normalize whitespace.

    A ``<break time="600ms"/>`` tag is added after each period (a run of
    periods such as an ellipsis gets a single tag) and a
    ``<break time="300ms"/>`` tag after each comma. Periods and commas that
    sit between two digits (``1.000``, ``3,5``) are left untouched so numbers
    are not split. All whitespace runs are collapsed to single spaces and the
    result is stripped.

    Args:
        text: Raw script text.

    Returns:
        The text with break tags inserted.
    """
    # NOTE(review): confirm that the installed edge-tts version interprets inline
    # <break> tags instead of reading them aloud or escaping them.
    pauses = {".": "600ms", ",": "300ms"}

    def _add_break(match):
        # Separator inside a number: keep it exactly as written.
        if match.group("num"):
            return match.group(0)
        mark = match.group("mark")
        return f'{mark} <break time="{pauses[mark[0]]}"/> '

    # One pass: the first alternative protects digit separators, the second
    # matches sentence punctuation together with any whitespace that follows it.
    text = re.sub(r'(?<=\d)(?P<num>[.,])(?=\d)|(?P<mark>\.+|,)\s*', _add_break, text)
    return " ".join(text.split())  # Normalize whitespace

# Cargar Whisper model ONCE (outside loop)
print("📚 Cargando modelo Whisper...")
model = whisper.load_model("small")

# Procesar cada drama
drama_files = sorted([f for f in os.listdir(TEMP_DIR) if f.endswith(".mp4")])
for drama_file in drama_files:
    idx = int(drama_file.replace("drama_", "").replace(".mp4", ""))
    url, keyword = dramas[idx - 1]

    drama_path = f"{TEMP_DIR}/{drama_file}"
    output_dir = f"{BASE_DIR}/output/drama_{idx}"
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(f"{output_dir}/shorts", exist_ok=True)

    print(f"\n🎥 Procesando drama {idx} (fondo: {keyword})...")

    # --- 1. Transcripción con caché ---
    audio_path = f"{TEMP_DIR}/audio_{idx}.wav"
    
    # Check cache first
    result = get_cached_transcription(drama_path, CACHE_DIR)
    
    if result is None:
        print("  🔄 Transcribiendo audio (no hay caché)...")
        clip = VideoFileClip(drama_path)
        clip.audio.write_audiofile(audio_path, logger=None)
        clip.close()  # OPTIMIZATION: Release immediately
        del clip
        gc.collect()
        
        result = model.transcribe(audio_path, language="es")
        save_transcription_cache(drama_path, result, CACHE_DIR)
    else:
        print("  ✅ Usando transcripción en caché")

    # --- 2. TTS mejorado (optimized text processing) ---
    guion = optimize_guion(result["text"])
    tts_path = f"{output_dir}/tts.mp3"
    
    if not os.path.exists(tts_path):  # Skip if already exists
        !edge-tts --text "{guion.replace('"', '\\"')}" --voice es-ES-ElviraNeural --write-media "{tts_path}"
    else:
        print("  ✅ Audio TTS ya existe (usando caché)")

    # --- 3. Subtítulos ---
    srt_path = f"{output_dir}/subtitulos.srt"
    if not os.path.exists(srt_path):  # Skip if already exists
        writer = whisper.utils.get_writer("srt", output_dir)
        writer(result, "subtitulos.srt")

    # --- 4. Fondo automático (con caché) ---
    fondo_path = descargar_fondo(keyword=keyword)

    # --- 5. Video largo (16:9) ---
    largo_path = f"{output_dir}/largo.mp4"
    if not os.path.exists(largo_path):  # Skip if already exists
        print("  🎬 Creando video largo...")
        clip = VideoFileClip(drama_path)
        
        if fondo_path:
            fondo_clip = VideoFileClip(fondo_path).loop(duration=clip.duration)
            fondo_clip = fondo_clip.resize(clip.size)
        else:
            fondo_clip = ColorClip(size=clip.size, color=(30,30,30), duration=clip.duration)

        video_largo = CompositeVideoClip([fondo_clip.set_opacity(0.3), clip.without_audio()])
        video_largo = video_largo.set_audio(AudioFileClip(tts_path))
        video_largo.write_videofile(largo_path, codec="libx264", audio_codec="aac", logger=None)
        
        # OPTIMIZATION: Clean up resources
        video_largo.close()
        fondo_clip.close()
        clip.close()
        del video_largo, fondo_clip, clip
        gc.collect()
    else:
        print("  ✅ Video largo ya existe (usando caché)")

    # --- 6. Shorts (9:16) - Optimized loading ---
    print("  ✂️ Creando shorts...")
    # Load clip ONCE for all shorts
    clip = VideoFileClip(drama_path)
    
    for j, (start, end) in enumerate(SHORTS_TIMINGS):
        short_path = f"{output_dir}/shorts/short_{j+1}.mp4"
        if os.path.exists(short_path):  # Skip if exists
            continue
            
        try:
            # OPTIMIZATION: Extract subclip without reloading
            sub = clip.subclip(start, end)
            sub = sub.resize(height=1920).crop(x_center=sub.w/2, width=1080)
            sub.write_videofile(short_path, fps=24, audio=False, logger=None)
            sub.close()  # Release immediately
            del sub
        except Exception as e:
            print(f"  ⚠️ Short {j+1} omitido: {str(e)[:50]}")
    
    # OPTIMIZATION: Close clip after all shorts
    clip.close()
    del clip
    gc.collect()

    # --- 7. Capítulos ---
    with open(f"{output_dir}/capitulos.txt", "w") as f:
        for t, title in CAPITULOS:
            f.write(f"{t} - {title}\n")
    
    # OPTIMIZATION: Clean up temporary audio file
    if os.path.exists(audio_path):
        os.remove(audio_path)

print("\n🎉 ¡Procesamiento completado! Revisa la carpeta /output")


In [None]:
import zipfile
from pathlib import Path

# Bundle everything under <BASE_DIR>/output into a single archive.
zip_path = f"{BASE_DIR}/output/dramas_completos.zip"
output_path = Path(f"{BASE_DIR}/output")

print("📦 Empaquetando resultados...")
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
    for file_path in output_path.rglob("*"):
        # Directories are implied by the entries' relative paths.
        if not file_path.is_file():
            continue
        # The archive lives inside the folder being walked; never add it to itself.
        if file_path.name == "dramas_completos.zip":
            continue
        arc_path = file_path.relative_to(output_path)
        zf.write(file_path, arc_path)

print(f"✅ Resultados empaquetados: {zip_path}")
print(f"📊 Tamaño del archivo: {os.path.getsize(zip_path) / (1024*1024):.2f} MB")
