# 🎬 VideoRobot — Full Automatic Pipeline (Colab Runner)
Clone → Install → Start Backend → Healthcheck → Smart Manifest → Render → Download → Copy to Drive

**Highlights**
- اجرا کاملاً داخل Colab
- دارایی‌ها از Google Drive یا `/content/Assets`
- پچ حداقلی ایمپورت‌ها (فقط در صورت نیاز) + لاگ شفاف
- دکمهٔ دانلود MP4 و کپی خودکار به Drive

سازگار با: `https://github.com/englishpodcasteasy-glitch/videorobot`

---

In [None]:
# -*- coding: utf-8 -*-
"""VideoRobot Full Pipeline — Colab Runner"""
import json
import os
import pathlib
import re
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time

import requests
from IPython.display import HTML, display

# === Constants ===
# Repository that is cloned, and where the checkout lives on the runtime.
REPO_URL = "https://github.com/englishpodcasteasy-glitch/videorobot.git"
CLONE_DIR = pathlib.Path("/content/videorobot")
BACKEND_DIR = CLONE_DIR / "backend"

# Files produced by this notebook: the render manifest and the backend log.
MANIFEST_PATH = pathlib.Path("/content/manifest.json")
LOG_PATH = pathlib.Path("/content/backend.log")

# Candidate backend ports (tried in order) and the endpoints probed to decide
# whether the backend is up.
DEFAULT_PORTS = [8000, 8001, 8002, 7861, 9000]
HEALTH_ENDPOINTS = ["/healthz", "/health", "/version", "/"]

# Render progress polling: seconds between polls and overall deadline in seconds.
POLL_INTERVAL = 1.0
POLL_TIMEOUT = 15 * 60

# Filled in by mount_drive_and_prepare(); they stay None until that cell runs.
DRIVE_ROOT = None
ASSETS = None
OUTPUT = None

## 1) 🧠 Mount Google Drive & prepare workspace

In [None]:
def mount_drive_and_prepare():
    """Mount Google Drive when possible and create the asset/output folders.

    On success the module globals ``DRIVE_ROOT``, ``ASSETS`` and ``OUTPUT``
    point into ``/content/drive/MyDrive/VideoRobot``.  If the Colab ``drive``
    module cannot be imported, the mount fails, or the Drive folders cannot be
    created, the function falls back to ``/content/Assets`` and
    ``/content/Output`` and resets ``DRIVE_ROOT`` to ``None`` so the globals
    never describe a Drive location that is not actually in use.

    In both cases a few environment variables are set for the processes that
    are started later from this notebook.
    """
    global DRIVE_ROOT, ASSETS, OUTPUT
    try:
        from google.colab import drive  # type: ignore

        drive.mount('/content/drive')
        root = pathlib.Path('/content/drive/MyDrive/VideoRobot')
        assets = root / 'Assets'
        output = root / 'Output'
        assets.mkdir(parents=True, exist_ok=True)
        output.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        # Deliberate best-effort: any failure (not on Colab, mount refused,
        # Drive not writable) switches to local folders instead of aborting.
        print('ℹ️ Drive mount failed:', e)
        DRIVE_ROOT = None
        ASSETS = pathlib.Path('/content/Assets')
        OUTPUT = pathlib.Path('/content/Output')
        # parents=True so the fallback also works when /content does not exist.
        ASSETS.mkdir(parents=True, exist_ok=True)
        OUTPUT.mkdir(parents=True, exist_ok=True)
        print('📂 Using local folders:', ASSETS, OUTPUT)
    else:
        # Publish the globals only once every Drive step has succeeded.
        DRIVE_ROOT, ASSETS, OUTPUT = root, assets, output
        print('✅ Drive mounted at:', DRIVE_ROOT)
        print('📂 Assets:', ASSETS)
        print('📂 Output:', OUTPUT)

    os.environ['PYTHONUNBUFFERED'] = '1'      # child Python output is not buffered
    os.environ['SDL_AUDIODRIVER'] = 'dummy'   # no real audio device on the runtime
    os.environ['XDG_RUNTIME_DIR'] = '/tmp'


mount_drive_and_prepare()

## 2) ⚙️ Clone repo + install dependencies

In [None]:
def clone_and_install():
    """Create a fresh shallow checkout of the repo and install its dependencies.

    Every external command runs with ``check=True``, so the first failing step
    raises ``subprocess.CalledProcessError`` and stops the cell.  The steps are:
    drop any previous checkout, ``git clone --depth 1``, install ffmpeg through
    apt-get, upgrade pip/setuptools/wheel, install the backend requirements (or
    a fixed fallback list when ``requirements.txt`` is absent) and finally
    constrain moviepy / imageio-ffmpeg to the version ranges given below.
    """
    pip_install = [sys.executable, '-m', 'pip', 'install', '-q']

    def sh(*argv):
        # Run one command and fail loudly on a non-zero exit status.
        subprocess.run(list(argv), check=True)

    # Start from a clean slate; a missing directory is not an error.
    shutil.rmtree(CLONE_DIR, ignore_errors=True)
    print('▶ git clone:', REPO_URL)
    sh('git', 'clone', '--depth', '1', REPO_URL, str(CLONE_DIR))
    print('✅ Repo cloned at', CLONE_DIR)

    print('▶ apt-get ffmpeg')
    sh('apt-get', '-qq', 'update')
    sh('apt-get', '-qq', '-y', 'install', 'ffmpeg')
    print('✅ ffmpeg installed')

    print('▶ Upgrade pip/setuptools/wheel')
    sh(*pip_install, '--upgrade', 'pip', 'setuptools', 'wheel')

    requirements = BACKEND_DIR / 'requirements.txt'
    if not requirements.exists():
        print('⚠️ requirements.txt missing; installing common deps')
        sh(*pip_install, 'flask', 'flask-cors', 'moviepy', 'imageio-ffmpeg', 'requests')
    else:
        print('▶ Install backend requirements')
        sh(*pip_install, '-r', str(requirements), '--no-cache-dir')

    # Whatever the step above installed, keep these two packages inside
    # the version ranges the pipeline is pinned to.
    sh(*pip_install, 'moviepy>=1.0.3,<3', 'imageio-ffmpeg>=0.4.9,<1')
    print('✅ Dependencies installed')


clone_and_install()

## 3) 🚀 Start backend (as package) & healthcheck

In [None]:
def _clean_py_caches(root: pathlib.Path):
    """Delete every ``__pycache__`` directory and ``*.pyc`` file below *root*."""
    # Collect the matches first so directories are not removed while rglob is
    # still walking the tree.
    for cache_dir in list(root.rglob('__pycache__')):
        shutil.rmtree(cache_dir, ignore_errors=True)
    for pyc in list(root.rglob('*.pyc')):
        pyc.unlink(missing_ok=True)


def _patch_imports_once():
    """Rewrite absolute intra-backend imports to package-relative imports.

    ``from X import ...`` becomes ``from .X import ...`` and a line-leading
    ``import X`` becomes ``from . import X`` for the module names listed below.
    Already-relative imports do not match, so running this again is a no-op.
    Files are rewritten only when their text actually changed; the names of
    the changed files are printed.
    """
    from_import_modules = ('config', 'scheduler', 'utils', 'renderer_service', 'renderer')
    bare_import_modules = ('config', 'utils', 'renderer')

    patched = []
    for py in BACKEND_DIR.rglob('*.py'):
        txt = py.read_text(encoding='utf-8', errors='ignore')
        orig = txt
        # from X import ...  -> from .X import ...
        for mod in from_import_modules:
            txt = re.sub(rf'\bfrom\s+{mod}\s+import\b', f'from .{mod} import', txt)
        # import X -> from . import X
        # The leading indentation is captured and re-emitted; matching it with
        # a plain \s* would also swallow blank lines and dedent imports that
        # live inside a function or an `if` block.
        for mod in bare_import_modules:
            txt = re.sub(rf'^([ \t]*)import\s+{mod}(\s|$)',
                         rf'\1from . import {mod}\2',
                         txt, flags=re.MULTILINE)
        if txt != orig:
            py.write_text(txt, encoding='utf-8')
            patched.append(py.name)
    print('🛠 patched imports:', patched if patched else '(none)')


def _port_is_free(port):
    """Return True when ``127.0.0.1:port`` can be bound right now."""
    # The context manager closes the probe socket on every path, including a
    # failed bind.
    with socket.socket() as probe:
        try:
            probe.bind(('127.0.0.1', port))
        except (OSError, OverflowError):  # in use, not permitted, or out of range
            return False
    return True


def _pick_port():
    """Choose the backend port.

    ``$BACKEND_PORT`` is tried first when it holds an integer, then each entry
    of ``DEFAULT_PORTS`` in order.  The first port that can be bound on
    127.0.0.1 wins; if none can, 8000 is returned as a last resort.
    """
    candidates = []
    env_p = os.environ.get('BACKEND_PORT')
    if env_p:
        try:
            candidates.append(int(env_p))
        except ValueError:
            pass  # not a number: ignore it and use the defaults
    candidates.extend(DEFAULT_PORTS)
    for port in candidates:
        if _port_is_free(port):
            return port
    return 8000


def _kill_stale_backends():
    """SIGKILL processes whose command line matches a previous backend run."""
    for pat in ('backend/main.py', 'backend.main'):
        try:
            # Argument list instead of a shell string: no quoting issues, and no
            # intermediate `sh -c` process whose own command line matches.
            found = subprocess.run(['pgrep', '-f', pat],
                                   capture_output=True, text=True, check=False)
        except FileNotFoundError:
            return  # pgrep is not available; nothing we can do here
        for token in found.stdout.split():
            try:
                pid = int(token)
                if pid != os.getpid():
                    os.kill(pid, signal.SIGKILL)
            except (ValueError, OSError):
                pass  # unparsable token, already gone, or not ours to kill


def _probe_health(base):
    """Return ``(endpoint, body_prefix)`` for the first healthy endpoint, else None."""
    for ep in HEALTH_ENDPOINTS:
        try:
            r = requests.get(base + ep, timeout=2)
        except requests.RequestException:
            continue  # not listening yet, or too slow: try the next endpoint
        if r.ok:
            return ep, (r.text or '')[:200]
    return None


def start_backend_and_healthcheck():
    """Launch ``python -m backend.main`` from the checkout and wait until it answers.

    Prepares the checkout (package marker, cache cleanup, import patch), kills
    leftovers of earlier runs, picks a port and exports it as ``BACKEND_PORT``,
    then starts the backend with its output redirected to ``LOG_PATH``.

    Returns:
        The base URL ``http://127.0.0.1:<port>`` of the running backend.

    Raises:
        AssertionError: the checkout or its ``backend`` folder is missing.
        SystemExit: no health endpoint answered; the log tail is printed first.
    """
    assert CLONE_DIR.exists(), 'Repo not found'
    assert BACKEND_DIR.exists(), 'backend folder missing'

    # Make package if missing
    init_py = BACKEND_DIR / '__init__.py'
    if not init_py.exists():
        init_py.write_text('', encoding='utf-8')

    # Clean caches + sys.modules to avoid stale state.  Only the `backend`
    # package itself is evicted, not every module whose name merely starts
    # with those letters.
    _clean_py_caches(CLONE_DIR)
    for name in [m for m in sys.modules if m == 'backend' or m.startswith('backend.')]:
        sys.modules.pop(name, None)

    # Ensure repo on sys.path
    if str(CLONE_DIR) not in sys.path:
        sys.path.insert(0, str(CLONE_DIR))

    # Minimal import patch
    _patch_imports_once()

    # Kill stale processes
    _kill_stale_backends()

    port = _pick_port()
    os.environ['BACKEND_PORT'] = str(port)
    base = f'http://127.0.0.1:{port}'
    print('🚪 Using BACKEND_PORT =', port)

    # Run backend as module (so relative imports work).  The child keeps its
    # own copy of the log file descriptor, so closing ours here is fine.
    with open(LOG_PATH, 'w', encoding='utf-8') as logf:
        proc = subprocess.Popen([sys.executable, '-m', 'backend.main'], cwd=str(CLONE_DIR),
                                stdout=logf, stderr=subprocess.STDOUT, text=True,
                                env=os.environ.copy())
    print('PID:', proc.pid, 'LOG:', LOG_PATH)

    # Healthcheck with backoff; always show tail if fail
    hit = None
    for delay in (1, 2, 4, 8, 8, 8, 8):
        hit = _probe_health(base)
        if hit or proc.poll() is not None:
            break  # healthy, or the process already exited: stop waiting
        print(f'⏳ Waiting {delay}s …')
        time.sleep(delay)
    else:
        # The last sleep would otherwise be wasted: look once more after it.
        hit = _probe_health(base)

    if not hit:
        if proc.poll() is None:
            # Do not leave a half-started backend behind holding the port.
            proc.kill()
            proc.wait(timeout=5)
        print('—— LOG TAIL ——')
        try:
            tail = LOG_PATH.read_text(encoding='utf-8', errors='replace').splitlines()[-200:]
        except OSError:
            tail = []
        print('\n'.join(tail))
        raise SystemExit('Backend failed')

    ep, body = hit
    print('✅ Health via', ep, '→', body)
    print('✅ Backend ready at', base)
    return base


BASE = start_backend_and_healthcheck()

## 4) 🧾 Auto-build manifest (detects intro/outro/voice/BGM)

In [None]:
def build_manifest():
    """Detect assets in ``ASSETS``, build the render manifest and save it.

    Each role (intro, outro, CTA, b-roll, BGM, background video/image, voice)
    is resolved by trying its glob patterns in order.  Matches are sorted so
    the choice is reproducible, and a file claimed by one role is never handed
    to another: roles with specific name patterns are resolved first so the
    catch-all patterns (``*.mp4``, ``*.mp3`` …) cannot select, for example, the
    intro clip as background or the music file as voice.

    The manifest is written to ``MANIFEST_PATH`` as UTF-8 (it may contain
    non-ASCII file names because ``ensure_ascii=False`` is used).

    Returns:
        The manifest dictionary that was written.
    """
    claimed = set()

    def pick(patterns):
        # First unclaimed match of the first pattern that has one, else None.
        for pat in patterns:
            for candidate in sorted(ASSETS.glob(pat)):
                if candidate not in claimed:
                    claimed.add(candidate)
                    return candidate
        return None

    # Specifically named roles first …
    intro = pick(['*Intro*.mp4', 'Intro.mp4'])
    outro = pick(['*Outro*.mp4', 'Outro.mp4'])
    cta = pick(['*CTA*.mp4', 'CTA.mp4'])
    broll = pick(['*broll*.mp4', '*BRoll*.mp4'])
    bgm = pick(['*music*.mp3', '*BGM*.mp3'])
    # … then the roles that end in a catch-all pattern.
    bgvid = pick(['*Background*.mp4', '*BG*.mp4', '*.mp4'])
    bgimg = pick(['*Background*.png', '*Background*.jpg', '*.png', '*.jpg', '*.jpeg'])
    voice = pick(['*.mp3', '*.wav', '*.m4a'])

    tracks = []
    if intro:
        tracks.append({"type": "video", "src": str(intro), "start": 0, "fit": "cover"})
    if bgvid:
        tracks.append({"type": "video", "src": str(bgvid), "start": 0, "fit": "cover"})
    elif bgimg:
        # A still image is only used when there is no background video.
        tracks.append({"type": "image", "src": str(bgimg), "start": 0, "duration": 60,
                       "x": 0, "y": 0, "scale": 1.0})
    if voice:
        tracks.append({"type": "audio", "src": str(voice), "start": 0, "gain_db": 0})
    if broll:
        tracks.append({"type": "video", "src": str(broll), "start": 6, "duration": 4, "fit": "cover"})
    # The title card is always present, so the track list is never empty.  (The
    # former "Hello VideoRobot" fallback for an empty list could never run and
    # has been dropped.)
    tracks.append({"type": "text", "content": "Real Smart English", "start": 0.6, "duration": 3.5,
                   "x": 80, "y": 140, "size": 84, "color": "#FFD700"})
    if cta:
        tracks.append({"type": "video", "src": str(cta), "start": 12, "fit": "cover"})
    if outro:
        tracks.append({"type": "video", "src": str(outro), "start": 15, "fit": "cover"})

    manifest = {
        "seed": 11,
        "video": {"width": 1080, "height": 1920, "fps": 30, "bg_color": "#000000"},
        "tracks": tracks,
        "config": {
            "aspectRatio": "9:16",
            "audio": {"useVAD": True, "targetLufs": -16.0},
            "bgm": ({"path": str(bgm), "gain_db": -8, "ducking": True} if bgm else {}),
        },
    }
    # Explicit UTF-8: the manifest is read back as UTF-8 by the render step.
    MANIFEST_PATH.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding='utf-8')
    print('✅ Manifest saved →', MANIFEST_PATH)
    print('🔎 Picks:',
          '\n  voice :', voice,
          '\n  bgvid :', bgvid,
          '\n  bgimg :', bgimg,
          '\n  intro :', intro,
          '\n  outro :', outro,
          '\n  cta   :', cta,
          '\n  bgm   :', bgm,
          '\n  broll :', broll)
    return manifest


build_manifest()

## 5) 🎥 Render + progress + download + copy

In [None]:
def render_and_retrieve():
    """Submit the manifest, wait for the render, then fetch and hand over the MP4.

    Steps: POST the saved manifest to ``/render``, poll ``/progress/<job>``
    every ``POLL_INTERVAL`` seconds until the job reports ``success``, download
    the result to ``/content/<job>.mp4``, copy it to ``OUTPUT`` (best effort)
    and offer it to the user.  On Colab the file is offered through
    ``google.colab.files.download`` because a link to ``127.0.0.1`` is not
    reachable from the user's browser; elsewhere the HTML download button that
    points at the backend is shown.

    Returns:
        ``pathlib.Path`` of the downloaded file.

    Raises:
        AssertionError: the backend response contained no job id.
        requests.HTTPError: ``/render`` or ``/download`` answered with an error status.
        SystemExit: the job reported ``error`` or did not finish within
            ``POLL_TIMEOUT`` seconds.
    """
    def field(doc, key):
        # The value may be top-level or nested under "data"; `data` can be null,
        # and falsy-but-present values such as pct == 0 must be kept.
        value = doc.get(key)
        if value is None:
            value = (doc.get('data') or {}).get(key)
        return value

    payload = json.loads(MANIFEST_PATH.read_text(encoding='utf-8'))
    r = requests.post(BASE + '/render', json=payload, timeout=180)
    print('POST /render:', r.status_code, (r.text or '')[:300])
    r.raise_for_status()
    js = r.json()
    job = js.get('job_id') or (js.get('data') or {}).get('job_id') or js.get('jobId')
    assert job, 'No job_id returned'
    print('🎬 job_id:', job)

    deadline = time.time() + POLL_TIMEOUT
    tick = 0
    while time.time() < deadline:
        try:
            g = requests.get(BASE + f'/progress/{job}', timeout=10)
        except requests.Timeout as e:
            # A slow answer while the backend is busy rendering is not fatal;
            # connection errors still propagate so a dead backend fails fast.
            print('progress timeout:', e)
        else:
            if g.ok:
                pj = g.json()
                st = field(pj, 'state')
                pct = field(pj, 'pct')
                msg = field(pj, 'message')
                print(f'tick {tick:03d}:', st, pct, msg or '')
                if st == 'success':
                    break
                if st == 'error':
                    raise SystemExit(pj)
            else:
                print('progress http', g.status_code, (g.text or '')[:200])
        tick += 1
        time.sleep(POLL_INTERVAL)
    else:
        # Deadline reached without a success state: do not try to download a
        # file that was never produced.
        raise SystemExit(f'Render timed out after {POLL_TIMEOUT}s (job {job})')

    # Download into a sibling ".part" file and rename on success, so a failed
    # transfer never leaves a truncated .mp4 or an orphaned temp file behind.
    out = pathlib.Path(f'/content/{job}.mp4')
    url = BASE + f'/download?jobId={job}'
    part = out.with_name(out.name + '.part')
    try:
        with requests.get(url, stream=True, timeout=120) as d:
            d.raise_for_status()
            with open(part, 'wb') as f:
                for chunk in d.iter_content(8192):
                    if chunk:
                        f.write(chunk)
        part.replace(out)
    finally:
        part.unlink(missing_ok=True)  # no-op after a successful rename
    print('📦 Downloaded:', out)

    # Copy to Drive (deliberately best effort: report the problem and carry on)
    try:
        dst = OUTPUT / out.name
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(str(out), str(dst))
        print('✅ Copied to Drive:', dst)
    except Exception as e:
        print('ℹ️ Could not copy to Drive:', e)

    # Hand the file to the user
    try:
        from google.colab import files  # type: ignore
    except ImportError:
        # Not on Colab: the backend URL is reachable from the local browser.
        import html
        display(HTML('<a href="{u}" download target="_blank"><button style="font-size:16px; padding:10px; background:#4CAF50; color:white; border:none; border-radius:6px; padding:10px 16px;">🔗 Download MP4</button></a>'.format(u=html.escape(url, quote=True))))
    else:
        files.download(str(out))
    print('🎉 Done! File:', out)
    return out


render_and_retrieve()