In [None]:
# ==============================================================================
# GPU SERVER - TRUE PARALLEL (2x SPEED) + ALL FEATURES
# ==============================================================================
import subprocess
import sys
import os
import time
import uuid
import threading
import queue
import shutil
import json
import random
import math
import urllib.request
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone 

# Optimize Memory
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Suppress Tokenizer Warnings
logging.getLogger("transformers").setLevel(logging.ERROR)

# ------------------------------------------
# CONFIGURATION & CREDENTIALS
# ------------------------------------------
SERVER_ID = str(uuid.uuid4())[:8]
R2_ACCOUNT_ID = "4fa19e788a951ba2d879c18782ef8bf0"
R2_ACCESS_KEY_ID = "d66adcff67ac5b4eca609a662b80e742"
R2_SECRET_ACCESS_KEY = "1a10aca3049c176d85cf3ec3c4e4ae8c6b715a4d9b1e67a79acc8b94d3b3c660"
R2_BUCKET_NAME = "video-generation-storage"
R2_ENDPOINT_URL = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"

UPSTASH_REDIS_REST_URL = "https://absolute-redfish-9172.upstash.io"
UPSTASH_REDIS_REST_TOKEN = "ASPUAAImcDE1MzBmZjIxMGNkYzY0YzBmYjFkZTNlZmE4NzY1ZjlhN3AxOTE3Mg"

# ------------------------------------------
# 1. DEPENDENCY CHECK
# ------------------------------------------
def install_if_missing(package, import_name=None):
    if import_name is None:
        import_name = package
    try:
        __import__(import_name)
    except ImportError:
        print(f"‚¨áÔ∏è Installing {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])

if shutil.which('ffmpeg') is None:
    print("Installing FFmpeg...")
    subprocess.run(["apt-get", "update"], stdout=subprocess.DEVNULL)
    subprocess.run(["apt-get", "install", "-y", "ffmpeg"], stdout=subprocess.DEVNULL)

install_if_missing("boto3")
install_if_missing("requests")
install_if_missing("openai-whisper", "whisper")
install_if_missing("moviepy")
install_if_missing("unidecode")
install_if_missing("Pillow", "PIL")

try:
    import diffusers
    import transformers
    import accelerate
    import safetensors
except ImportError:
    print("‚¨áÔ∏è Installing AI Libraries...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "diffusers", "transformers", "accelerate", "safetensors", "xformers"])

# ------------------------------------------
# 2. IMPORTS
# ------------------------------------------
import torch
from diffusers import StableDiffusionXLPipeline, EulerDiscreteScheduler, AutoencoderTiny
import boto3
from botocore.config import Config
import requests as http_requests
import whisper
from PIL import Image, ImageFont, ImageDraw

# ------------------------------------------
# 3. FONT MANAGEMENT
# ------------------------------------------
FONT_DIR = "./fonts"
SUBTITLE_FONTS = {}

FONT_CONFIG = {
    'latin': {
        'file': 'NotoSans-Bold.ttf',
        'url': 'https://github.com/notofonts/notofonts.github.io/raw/main/fonts/NotoSans/hinted/ttf/NotoSans-Bold.ttf',
        'family_name': 'Noto Sans',
        'weight': 'Bold'
    },
    'devanagari': {
        'file': 'NotoSansDevanagari-Bold.ttf',
        'url': 'https://github.com/notofonts/notofonts.github.io/raw/main/fonts/NotoSansDevanagari/hinted/ttf/NotoSansDevanagari-Bold.ttf',
        'family_name': 'Noto Sans Devanagari',
        'weight': 'Bold'
    },
    'arabic': {
        'file': 'NotoSansArabic-Bold.ttf',
        'url': 'https://github.com/notofonts/notofonts.github.io/raw/main/fonts/NotoSansArabic/hinted/ttf/NotoSansArabic-Bold.ttf',
        'family_name': 'Noto Sans Arabic',
        'weight': 'Bold'
    },
    'cjk': {
        'file': 'NotoSansCJKsc-Bold.otf',
        'url': 'https://github.com/notofonts/noto-cjk/raw/main/Sans/OTF/SimplifiedChinese/NotoSansCJKsc-Bold.otf',
        'family_name': 'Noto Sans CJK SC',
        'weight': 'Bold'
    }
}

def setup_fonts():
    if not os.path.exists(FONT_DIR):
        os.makedirs(FONT_DIR)
    
    print("‚¨áÔ∏è Setting up fonts for subtitles...")
    for script, config in FONT_CONFIG.items():
        path = os.path.join(FONT_DIR, config['file'])
        if not os.path.exists(path):
            try:
                urllib.request.urlretrieve(config['url'], path)
            except Exception as e:
                print(f"‚ùå Failed to download {script} font: {e}")
                continue
        
        if os.path.exists(path):
            SUBTITLE_FONTS[script] = path
            print(f"  ‚úì Loaded {script}")

setup_fonts()

# ------------------------------------------
# 4. SUBTITLE ENGINE (CJK FIX INCLUDED)
# ------------------------------------------
def detect_script(text):
    if not text or not text.strip(): return 'latin'
    counts = {'devanagari': 0, 'arabic': 0, 'cjk': 0, 'latin': 0}
    for char in text:
        code = ord(char)
        if 0x0900 <= code <= 0x0DFF: counts['devanagari'] += 1
        elif 0x0600 <= code <= 0x077F: counts['arabic'] += 1
        elif 0x4E00 <= code <= 0x9FFF: counts['cjk'] += 1
        elif 0x3040 <= code <= 0x30FF: counts['cjk'] += 1
        else: counts['latin'] += 1
    max_script = max(counts.items(), key=lambda x: x[1])
    return max_script[0] if max_script[1] > 0 else 'latin'

def get_font_for_text(text):
    script = detect_script(text)
    mapping = {'devanagari': 'devanagari', 'arabic': 'arabic', 'cjk': 'cjk'}
    key = mapping.get(script, 'latin')
    if key in SUBTITLE_FONTS:
        return SUBTITLE_FONTS[key], FONT_CONFIG[key], script
    return SUBTITLE_FONTS.get('latin'), FONT_CONFIG.get('latin'), 'latin'

def time_to_ass(seconds):
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    cs = int((seconds % 1) * 100)
    return f"{h}:{m:02d}:{s:02d}.{cs:02d}"

def calculate_text_width(text, font_path, font_size):
    try:
        font = ImageFont.truetype(font_path, font_size)
        dummy = ImageDraw.Draw(Image.new("RGBA", (1, 1)))
        bbox = dummy.textbbox((0, 0), text, font=font)
        return bbox[2] - bbox[0]
    except:
        return len(text) * font_size * 0.6

def create_ass_event_for_page(page_lines, font_size, events, is_cjk, is_latin):
    if not page_lines: return
    start_time = page_lines[0][0]['start']
    end_time = page_lines[-1][-1]['end']
    
    def process_word(word_obj):
        raw = word_obj.get('word', '').strip()
        return raw.upper() if is_latin else raw
    
    separator = '' if is_cjk else ' '
    full_text_lines = [separator.join([process_word(w) for w in line]) for line in page_lines]
    full_text = '\\N'.join(full_text_lines)
    
    events.append(f"Dialogue: 0,{time_to_ass(start_time)},{time_to_ass(end_time)},BG,,0,0,0,,{{\\bord15\\shad0\\1c&H000000&\\1a&H4D&}}{full_text}")
    
    for line_idx, line in enumerate(page_lines):
        for word_idx, word_obj in enumerate(line):
            w_start = word_obj.get('start', start_time)
            w_end = word_obj.get('end', end_time)
            
            highlight_lines = []
            for l_idx, l in enumerate(page_lines):
                line_words = []
                for w_idx, w in enumerate(l):
                    disp = process_word(w)
                    if l_idx == line_idx and w_idx == word_idx:
                        line_words.append(f"{{\\bord8\\3c&HD30093&\\3a&H4D&\\shad2\\4c&H000000&\\4a&H00&}}{disp}{{\\bord0\\shad0}}")
                    else:
                        line_words.append(disp)
                highlight_lines.append(separator.join(line_words))
            
            hl_text = '\\N'.join(highlight_lines)
            events.append(f"Dialogue: 1,{time_to_ass(w_start)},{time_to_ass(w_end)},PurpleBG,,0,0,0,,{hl_text}")
    
    events.append(f"Dialogue: 2,{time_to_ass(start_time)},{time_to_ass(end_time)},Default,,0,0,0,,{full_text}")
    
    for line_idx, line in enumerate(page_lines):
        for word_idx, word_obj in enumerate(line):
            w_start = word_obj.get('start', start_time)
            w_end = word_obj.get('end', end_time)
            top_lines = []
            for l_idx, l in enumerate(page_lines):
                line_words = []
                for w_idx, w in enumerate(l):
                    disp = process_word(w)
                    if l_idx == line_idx and w_idx == word_idx:
                        line_words.append(f"{{\\c&HFFFFFF&}}{disp}")
                    else:
                        line_words.append(disp)
                top_lines.append(separator.join(line_words))
            
            joined_top_lines = '\\N'.join(top_lines)
            events.append(f"Dialogue: 3,{time_to_ass(w_start)},{time_to_ass(w_end)},Default,,0,0,0,,{joined_top_lines}")

def generate_clip_ass_subtitles(subtitle_words, width, height, output_file):
    if not subtitle_words: return False
    
    all_text_raw = "".join([w.get('word', '') for w in subtitle_words])
    font_path, font_config, script = get_font_for_text(all_text_raw)
    font_family = font_config['family_name']
    
    is_cjk = script == 'cjk'

    processed_words = []
    if is_cjk:
        for w in subtitle_words:
            text = w.get('word', '').strip()
            start = w.get('start', 0)
            end = w.get('end', 0)
            duration = end - start
            length = len(text)
            
            if length > 1:
                char_duration = duration / length
                for i, char in enumerate(text):
                    char_start = start + (i * char_duration)
                    char_end = start + ((i + 1) * char_duration)
                    processed_words.append({'word': char, 'start': char_start, 'end': char_end})
            elif length == 1:
                processed_words.append(w)
    else:
        processed_words = subtitle_words

    aspect = width / height
    if aspect < 0.8: # 9:16
        ref_h, ref_f, ref_pos = 1280, 63, 0.75
        pos_offset = 0
    elif aspect > 1.5: # 16:9
        ref_h, ref_f, ref_pos = 720, 72, 0.85
        pos_offset = 38
    else: # 1:1 or 4:5
        ref_h, ref_f, ref_pos = 1080, 68, 0.80
        pos_offset = 0
        
    font_size = int(ref_f * (height / ref_h))
    ref_position = int(height * ref_pos) + pos_offset
    margin_v = height - ref_position
    
    header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {width}
PlayResY: {height}
WrapStyle: 0
ScaledBorderAndShadow: yes
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font_family},{font_size},&H00FFFFFF,&H000000FF,&H00000000,&HB3000000,-1,0,0,0,100,100,0,0,1,0,0,2,20,20,{margin_v},1
Style: BG,{font_family},{font_size},&H00FFFFFF,&H000000FF,&H00000000,&HB3000000,-1,0,0,0,100,100,0,0,1,0,0,2,20,20,{margin_v},1
Style: PurpleBG,{font_family},{font_size},&H00FFFFFF,&H000000FF,&H00000000,&HB4D30093,-1,0,0,0,100,100,0,0,1,3,2,2,20,20,{margin_v},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    events = []
    
    is_latin = script == 'latin'
    max_line_width = width * 0.90
    space_width = 0 if is_cjk else int(font_size * 0.25)
    
    current_page_lines = []
    current_line = []
    current_line_width = 0
    
    for word_obj in processed_words:
        raw = word_obj.get('word', '').strip()
        disp = raw.upper() if is_latin else raw
        w_width = calculate_text_width(disp, font_path, font_size) + 10
        
        if current_line_width + w_width + space_width > max_line_width:
            if current_line: current_page_lines.append(current_line)
            if len(current_page_lines) >= 2:
                create_ass_event_for_page(current_page_lines, font_size, events, is_cjk, is_latin)
                current_page_lines = []
            current_line = [word_obj]
            current_line_width = w_width + space_width
        else:
            current_line.append(word_obj)
            current_line_width += w_width + space_width
            
    if current_line: current_page_lines.append(current_line)
    if current_page_lines: create_ass_event_for_page(current_page_lines, font_size, events, is_cjk, is_latin)
    
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(header + "\n".join(events))
    return True

# ------------------------------------------
# 5. R2 CLIENT
# ------------------------------------------
print("üîß Setting up R2...")
s3_client = boto3.client(
    's3',
    endpoint_url=R2_ENDPOINT_URL,
    aws_access_key_id=R2_ACCESS_KEY_ID,
    aws_secret_access_key=R2_SECRET_ACCESS_KEY,
    config=Config(signature_version='s3v4'),
    region_name='auto'
)
print("‚úÖ R2 ready")

# ------------------------------------------
# 6. REDIS CLIENT & HEARTBEAT
# ------------------------------------------
print("üîß Setting up Redis...")

class UpstashRedis:
    def __init__(self, url, token):
        self.url = url.rstrip('/')
        self.token = token
        self.headers = {"Authorization": f"Bearer {token}"}
    
    def set(self, key, value, ex=None):
        try:
            cmd = ["SET", key, value]
            if ex:
                cmd.extend(["EX", str(ex)])
            response = http_requests.post(f"{self.url}/pipeline", headers=self.headers, json=[cmd], timeout=5)
            return response.status_code == 200
        except:
            return False
    
    def get(self, key):
        try:
            response = http_requests.post(f"{self.url}/pipeline", headers=self.headers, json=[["GET", key]], timeout=5)
            if response.status_code == 200:
                data = response.json()
                return data[0].get("result") if data else None
            return None
        except:
            return None
    
    def delete(self, key):
        try:
            response = http_requests.post(f"{self.url}/pipeline", headers=self.headers, json=[["DEL", key]], timeout=5)
            return response.status_code == 200
        except:
            return False
    
    def keys(self, pattern):
        try:
            response = http_requests.post(f"{self.url}/pipeline", headers=self.headers, json=[["KEYS", pattern]], timeout=5)
            if response.status_code == 200:
                data = response.json()
                return data[0].get("result", []) if data else []
            return []
        except:
            return []

redis_client = UpstashRedis(UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN)
print("‚úÖ Redis ready")

def heartbeat_worker():
    print("üíì Heartbeat monitor started...")
    while True:
        try:
            check = redis_client.get("health_check_request")
            if check:
                response_key = f"health_response:{SERVER_ID}"
                redis_client.set(response_key, "I am there", ex=10)
                time.sleep(2)
            time.sleep(0.5)
        except Exception as e:
            print(f"üíì Heartbeat error: {e}")
            time.sleep(5)

threading.Thread(target=heartbeat_worker, daemon=True).start()

# ------------------------------------------
# 7. WHISPER MODEL
# ------------------------------------------
print("‚è≥ Loading Whisper Model on GPU...")
WHISPER_MODEL = whisper.load_model("base", device="cuda")
print("‚úÖ Whisper model loaded on GPU")

# ------------------------------------------
# 8. TRANSCRIPTION FUNCTIONS
# ------------------------------------------
def format_time_ref(seconds):
    m = int(seconds // 60)
    s = int(seconds % 60)
    return f"{m:02d}:{s:02d}"

def create_prompts_txt(raw_result, duration):
    try:
        import math
        from unidecode import unidecode
        
        num_clips = math.ceil(duration / 6.0)
        bins = [""] * num_clips
        all_words = []
        
        INDIC_LANGS = {'hi', 'bn', 'gu', 'kn', 'ml', 'mr', 'pa', 'ta', 'te', 'ur', 'sd', 'ne'}
        detected_lang = raw_result.get('language', 'en')
        should_transliterate = detected_lang in INDIC_LANGS
        
        for segment in raw_result['segments']:
            if 'words' in segment:
                for w in segment['words']:
                    w_text = unidecode(w['word']) if should_transliterate else w['word']
                    all_words.append({'word': w_text, 'start': w['start']})
        
        if all_words:
            for w in all_words:
                start = w['start']
                idx = int(start // 6)
                if idx < num_clips:
                    bins[idx] += w['word'].strip() + " "
        else:
            for segment in raw_result['segments']:
                start = segment['start']
                idx = int(start // 6)
                txt = segment['text'].strip()
                if should_transliterate:
                    txt = unidecode(txt)
                if idx < num_clips:
                    bins[idx] += txt + " "
        
        lines = []
        for i in range(num_clips):
            start_str = format_time_ref(i * 6)
            end_str = format_time_ref((i + 1) * 6)
            text_content = bins[i].strip() if bins[i].strip() else "[No speech]"
            lines.append(f"[{i+1}] ({start_str} - {end_str})")
            lines.append(f"Text: {text_content}")
            lines.append("")
            lines.append("-" * 20)
            lines.append("")
        
        return "\n".join(lines)
        
    except Exception as e:
        print(f"‚ùå Error generating prompts: {e}")
        return None

def create_subtitles_ass(raw_result):
    try:
        from unidecode import unidecode
        INDIC_LANGS = {'hi', 'bn', 'gu', 'kn', 'ml', 'mr', 'pa', 'ta', 'te', 'ur', 'sd', 'ne'}
        detected_lang = raw_result.get('language', 'en')
        should_transliterate = detected_lang in INDIC_LANGS
        
        ass_lines = [
            "[Script Info]",
            "Title: Generated Subtitles",
            "ScriptType: v4.00+",
            "WrapStyle: 0",
            "ScaledBorderAndShadow: yes",
            "YCbCr Matrix: None",
            "",
            "[V4+ Styles]",
            "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
            "Style: Default,Arial,20,&H00FFFFFF,&H000000FF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,2,2,10,10,10,1",
            "",
            "[Events]",
            "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
            ""
        ]
        
        def format_ass_time(seconds):
            h = int(seconds // 3600)
            m = int((seconds % 3600) // 60)
            s = int(seconds % 60)
            cs = int((seconds % 1) * 100)
            return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
        
        for segment in raw_result['segments']:
            start_time = format_ass_time(segment['start'])
            end_time = format_ass_time(segment['end'])
            text = segment['text'].strip()
            
            if should_transliterate:
                text = unidecode(text)
            
            text = text.replace('\\', '\\\\').replace('\n', '\\N')
            ass_lines.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{text}")
        
        return "\n".join(ass_lines)
        
    except Exception as e:
        print(f"‚ùå Error generating subtitles: {e}")
        return None

def transcribe_audio_on_gpu(audio_path):
    try:
        print(f"üéôÔ∏è Transcribing with Whisper (GPU)...")
        result = WHISPER_MODEL.transcribe(audio_path, language=None, word_timestamps=True, verbose=False)
        print(f"‚úÖ Transcription complete: {len(result['segments'])} segments")
        return result
    except Exception as e:
        print(f"‚ùå Transcription error: {e}")
        return None

def process_transcription_job(job_key, job_data):
    try:
        job = json.loads(job_data)
        job_id = job['job_id']
        audio_r2_key = job['audio_r2_key']
        
        print(f"\nüéôÔ∏è [TRANSCRIPTION] Processing: {job_id}")
        local_audio = f"/tmp/audio_{job_id}.mp3"
        print(f"   ‚¨áÔ∏è Downloading audio from R2...")
        s3_client.download_file(R2_BUCKET_NAME, audio_r2_key, local_audio)
        
        from moviepy.editor import AudioFileClip
        clip = AudioFileClip(local_audio)
        duration = clip.duration
        clip.close()
        
        print(f"   üéôÔ∏è Transcribing on GPU...")
        start_time = time.time()
        result = transcribe_audio_on_gpu(local_audio)
        
        if not result:
            redis_client.set(f"transcription_failed:{job_id}", "Transcription failed", ex=3600)
            redis_client.delete(job_key)
            os.remove(local_audio)
            return False
        
        transcribe_time = time.time() - start_time
        
        print(f"   üìù Generating prompts TXT...")
        prompts_txt = create_prompts_txt(result, duration)
        
        print(f"   üìÑ Generating subtitles ASS...")
        subtitles_ass = create_subtitles_ass(result)
        
        result_json = json.dumps(result)
        result_r2_key = f"transcriptions/{job_id}.json"
        
        print(f"   ‚¨ÜÔ∏è Uploading transcription to R2...")
        s3_client.put_object(Bucket=R2_BUCKET_NAME, Key=result_r2_key, Body=result_json.encode('utf-8'), ContentType='application/json')
        
        prompts_r2_key = f"transcriptions/{job_id}_prompts.txt"
        print(f"   ‚¨ÜÔ∏è Uploading prompts TXT to R2...")
        s3_client.put_object(Bucket=R2_BUCKET_NAME, Key=prompts_r2_key, Body=prompts_txt.encode('utf-8'), ContentType='text/plain')
        
        subtitles_r2_key = f"transcriptions/{job_id}_subtitles.ass"
        print(f"   ‚¨ÜÔ∏è Uploading subtitles ASS to R2...")
        s3_client.put_object(Bucket=R2_BUCKET_NAME, Key=subtitles_r2_key, Body=subtitles_ass.encode('utf-8'), ContentType='text/plain')
        
        completion_data = json.dumps({
            "transcription_r2_key": result_r2_key,
            "prompts_r2_key": prompts_r2_key,
            "subtitles_r2_key": subtitles_r2_key,
            "duration": transcribe_time,
            "audio_duration": duration,
            "segments": len(result['segments']),
            "job_id": job_id
        })
        
        redis_client.set(f"transcription_complete:{job_id}", completion_data, ex=3600)
        redis_client.delete(job_key)
        os.remove(local_audio)
        print(f"‚úÖ [TRANSCRIPTION] Done! ‚è±Ô∏è {transcribe_time:.1f}s")
        return True
        
    except Exception as e:
        print(f"‚ùå [TRANSCRIPTION] Failed: {e}")
        redis_client.set(f"transcription_failed:{job_id}", str(e), ex=3600)
        redis_client.delete(job_key)
        return False

# ------------------------------------------
# 9. VIDEO GENERATION (UPDATED: PARALLEL DISPATCH)
# ------------------------------------------
STEPS = 4
GUIDANCE_SCALE = 2.0

GPU_COUNT = torch.cuda.device_count()
gpu_queues = [queue.Queue() for _ in range(GPU_COUNT)]
video_process_queue = queue.Queue()
results = {} # Still used for tracking within R2 upload context if needed, but primary state is Redis
counter_lock = threading.Lock()

def upload_to_r2(video_bytes, job_id, max_retries=3):
    for attempt in range(max_retries):
        try:
            video_key = f"videos/single/{job_id}.mp4"
            s3_client.put_object(Bucket=R2_BUCKET_NAME, Key=video_key, Body=video_bytes, ContentType='video/mp4')
            url = s3_client.generate_presigned_url('get_object', Params={'Bucket': R2_BUCKET_NAME, 'Key': video_key}, ExpiresIn=604800)
            return url
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
    return None

def image_to_video_clip(image, target_width, target_height, subtitle_path=None):
    unique_id = str(uuid.uuid4())
    img_path = f"temp_{unique_id}.png"
    vid_path = f"output_{unique_id}.mp4"

    try:
        image.save(img_path)
        w = (int(target_width) // 2) * 2
        h = (int(target_height) // 2) * 2
        
        zoom_curve = "1.1+0.1*cos(2*PI*on/180)"
        filter_chain = (
            f"scale={w}:{h}:flags=lanczos,"
            f"zoompan=z='{zoom_curve}':d=180:x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)':s={w}x{h},"
            f"vignette=PI/6"
        )
        
        if subtitle_path and os.path.exists(subtitle_path):
            abs_sub = os.path.abspath(subtitle_path).replace('\\', '/').replace(':', '\\:')
            abs_fonts = os.path.abspath(FONT_DIR).replace('\\', '/').replace(':', '\\:')
            filter_chain += f",subtitles='{abs_sub}':fontsdir='{abs_fonts}'"
        
        filter_chain += ",fps=30"
        
        cmd = [
            "ffmpeg", "-y", "-loop", "1", "-i", img_path, "-t", "6",
            "-vf", filter_chain, "-c:v", "libx264", "-preset", "ultrafast",
            "-crf", "23", "-maxrate", "10M", "-bufsize", "20M",
            "-pix_fmt", "yuv420p", "-threads", "8", vid_path
        ]

        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
        
        if os.path.exists(vid_path):
            with open(vid_path, "rb") as f:
                return f.read()
        return None
    except Exception as e:
        print(f"Error in image_to_video_clip: {e}")
        return None
    finally:
        if os.path.exists(img_path): os.remove(img_path)
        if os.path.exists(vid_path): os.remove(vid_path)

def load_engine(device_id):
    print(f"‚è≥ Loading SDXL on GPU {device_id}...")
    pipe = StableDiffusionXLPipeline.from_pretrained(
        "SG161222/RealVisXL_V4.0_Lightning",
        torch_dtype=torch.float16,
        variant="fp16"
    ).to(f"cuda:{device_id}")
    
    pipe.unet.to(memory_format=torch.channels_last)
    try:
        pipe.enable_xformers_memory_efficient_attention()
    except: pass
    
    pipe.vae = AutoencoderTiny.from_pretrained("madebyollin/taesdxl", torch_dtype=torch.float16).to(f"cuda:{device_id}")

    pipe.scheduler = EulerDiscreteScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing")
    pipe.set_progress_bar_config(disable=True)
    return pipe

pipelines = []
if GPU_COUNT > 0:
    for i in range(GPU_COUNT):
        pipelines.append(load_engine(i))

def get_safe_resolution(aspect_ratio_name):
    # EXTREME SPEED SETTINGS (6s)
    if aspect_ratio_name == "16:9": return (1152, 640, 1920, 1080)
    elif aspect_ratio_name == "9:16": return (640, 1152, 1080, 1920)
    elif aspect_ratio_name == "4:5": return (896, 1088, 1080, 1350)
    elif aspect_ratio_name == "16:7": return (1152, 512, 1920, 840)
    else: return (1024, 1024, 1024, 1024)

def gpu_worker(gpu_id, pipeline, task_queue):
    while True:
        try:
            task = task_queue.get(timeout=0.05)
            neg = "bad anatomy, bad hands, missing fingers, extra fingers, three hands, three legs, bad arms, missing legs, missing arms, poorly drawn face, bad face, fused face, cloned face, three crus, fused feet, fused thigh, extra crus, ugly, gross, sloppy, messy, blurry, low quality, duplicate, distortion, mutation, double head"
            image = pipeline(task['prompt'], num_inference_steps=STEPS, guidance_scale=GUIDANCE_SCALE, width=task['ai_width'], height=task['ai_height'], negative_prompt=neg).images[0]
            
            # Pass everything to post-processing queue
            video_process_queue.put({'image': image, 'task': task})
            task_queue.task_done()
        except queue.Empty:
            continue

def cpu_video_worker():
    while True:
        try:
            item = video_process_queue.get(timeout=0.05)
            image = item['image']
            task = item['task']
            job_id = task['job_id']
            start_time = task.get('start_time', time.time())
            
            # --- SUBTITLES ---
            subtitle_path = None
            if 'subtitle_data' in task and task['subtitle_data']:
                try:
                    unique_sub_id = str(uuid.uuid4())
                    temp_ass_path = f"temp_{unique_sub_id}.ass"
                    success = generate_clip_ass_subtitles(task['subtitle_data'], task['final_width'], task['final_height'], temp_ass_path)
                    if success:
                        subtitle_path = temp_ass_path
                except Exception as e:
                    print(f"Subtitle gen failed: {e}")
            
            # --- RENDER VIDEO ---
            video_bytes = image_to_video_clip(image, target_width=task['final_width'], target_height=task['final_height'], subtitle_path=subtitle_path)
            
            if subtitle_path and os.path.exists(subtitle_path):
                os.remove(subtitle_path)
            
            if video_bytes:
                size_mb = len(video_bytes) / (1024 * 1024)
                r2_url = upload_to_r2(video_bytes, job_id)
                
                gen_time = time.time() - start_time
                
                # --- UPDATE REDIS HERE (ASYNC FROM MAIN THREAD) ---
                if r2_url:
                    result_data = json.dumps({"r2_url": r2_url, "time": str(gen_time), "size_mb": str(size_mb), "job_id": job_id})
                    redis_client.set(f"completed:{job_id}", result_data, ex=3600)
                    print(f"‚úÖ [VIDEO] Done! ‚è±Ô∏è {gen_time:.1f}s | üíæ {size_mb:.1f}MB")
                else:
                    redis_client.set(f"failed:{job_id}", "Upload failed", ex=3600)
            
            video_process_queue.task_done()
        except queue.Empty:
            continue
        except Exception as e:
            print(f"CPU Worker Error: {e}")

for i, pipe in enumerate(pipelines):
    threading.Thread(target=gpu_worker, args=(i, pipe, gpu_queues[i]), daemon=True).start()

for _ in range(8):
    threading.Thread(target=cpu_video_worker, daemon=True).start()

def dispatch_video_job(job_key, job_data, gpu_id):
    """
    NON-BLOCKING dispatch function.
    Prepares the job and pushes it to the GPU queue.
    Returns True if dispatched, False if error.
    """
    try:
        job = json.loads(job_data)
        job_id = job['job_id']
        prompt = job['prompt']
        ratio = job['ratio']
        subtitle_data = job.get('subtitle_data', [])
        
        ai_w, ai_h, final_w, final_h = get_safe_resolution(ratio)
        
        # Smart Truncate
        safe_prompt = prompt[:180] 
        full_prompt = f"{safe_prompt}, 8k, masterpiece, highly detailed, cinematic lighting"
        
        task = {
            'job_id': job_id, 
            'prompt': full_prompt, 
            'ai_width': ai_w, 
            'ai_height': ai_h, 
            'final_width': final_w, 
            'final_height': final_h, 
            'subtitle_data': subtitle_data,
            'start_time': time.time() # Track start time here
        }
        
        # Fire and forget!
        gpu_queues[gpu_id].put(task)
        return True
        
    except Exception as e:
        print(f"‚ùå Dispatch Failed: {e}")
        redis_client.set(f"failed:{job.get('job_id', 'unknown')}", str(e), ex=3600)
        return False

# ------------------------------------------
# MAIN LOOP
# ------------------------------------------
def main():
    print("\n" + "="*60)
    print(f"üöÄ GPU SERVER {SERVER_ID} - READY (PARALLEL MODE)")
    print("="*60)
    print(f"‚úÖ GPUs: {GPU_COUNT}")
    print(f"‚úÖ R2: {R2_BUCKET_NAME}")
    print(f"‚úÖ Redis: {UPSTASH_REDIS_REST_URL.split('//')[1].split('.')[0]}")
    print(f"‚úÖ Whisper: Loaded on GPU")
    print(f"‚úÖ Fonts: Loaded for Multilingual Support")
    print(f"üî• TRANSCRIPTION HAS HIGHEST PRIORITY")
    print("="*60 + "\n")
    
    processed = 0
    gpu_rr = 0
    
    while True:
        # ‚úÖ DUAL AUTO-SHUTDOWN CHECK (3:30 PM & 11:00 PM IST)
        now_utc = datetime.now(timezone.utc)
        now_ist = now_utc + timedelta(hours=5, minutes=30)
        
        is_time1 = (now_ist.hour == 15 and now_ist.minute == 30)
        is_time2 = (now_ist.hour == 23 and now_ist.minute == 0)
        
        if is_time1 or is_time2:
            print(f"\nüõë Scheduled Shutdown: It is {now_ist.strftime('%I:%M %p')} IST. Terminating.")
            sys.exit(0)

        try:
            # 1. TRANSCRIPTION (Priority) - Still sequential usually best for stability
            transcribe_keys = redis_client.keys("transcribe_pending:*")
            if transcribe_keys:
                print(f"\nüî• PRIORITY: Found {len(transcribe_keys)} transcription job(s)")
                for job_key in transcribe_keys:
                    job_data = redis_client.get(job_key)
                    if job_data:
                        # We delete immediately to avoid re-reading, 
                        # logic inside process_transcription_job handles success/fail updates
                        # But process_transcription_job currently deletes at end. 
                        # Since transcription is fast/CPU/GPU hybrid and sequential here, we leave it blocking for safety.
                        process_transcription_job(job_key, job_data)
                        processed += 1
                continue
            
            # 2. VIDEO GENERATION (True Parallel)
            pending_keys = redis_client.keys("pending:*")
            if pending_keys:
                for job_key in pending_keys:
                    job_data = redis_client.get(job_key)
                    if job_data:
                        # Pick GPU
                        gpu_id = gpu_rr % len(pipelines)
                        gpu_rr += 1
                        
                        # Dispatch
                        if dispatch_video_job(job_key, job_data, gpu_id):
                            # IMPORTANT: Delete from 'pending' immediately so main loop 
                            # doesn't pick it up again while GPU is working.
                            redis_client.delete(job_key)
                            processed += 1
            
            time.sleep(0.1) # Faster loop for responsiveness
            
        except KeyboardInterrupt:
            print("\nüõë Shutting down...")
            break
        except Exception as e:
            print(f"‚ùå Error: {e}")
            time.sleep(5)

if __name__ == "__main__":
    main()