# 🎬 UnQVision Video Generation Server

This notebook runs the UnQVision backend server on Google Colab, handling all resource-intensive video generation operations. By using Google Colab's free resources, you can process videos without requiring a dedicated server.

## 📋 Instructions

1. Enter your API keys in the form fields below
2. Run all cells (Runtime > Run all)
3. Follow the Cloudflare authentication process when prompted
4. Copy the Cloudflare URL to use with your UnQVision frontend

**Note**: The server will run as long as this Colab session is active. Google Colab sessions automatically disconnect after 90 minutes of inactivity.

## 🔑 API Keys

Enter your API keys below. These will be used for text generation, speech synthesis, and fetching stock videos.

In [None]:
# @title API Keys
google_api_keys = "YOUR_GOOGLE_API_KEY" # @param {type:"string"}
pexels_api_keys = "YOUR_PEXELS_API_KEY" # @param {type:"string"}

# Store API keys securely
import os
os.environ["GOOGLE_API_KEYS"] = google_api_keys
os.environ["PEXELS_API_KEYS"] = pexels_api_keys

# Validate API keys
if not google_api_keys or google_api_keys == "YOUR_GOOGLE_API_KEY" or len(google_api_keys) < 10:
    print("⚠️ Warning: Google API key is missing or invalid. You'll need this for text generation and speech synthesis.")
else:
    print("✅ Google API key configured")
    
if not pexels_api_keys or pexels_api_keys == "YOUR_PEXELS_API_KEY" or len(pexels_api_keys) < 10:
    print("⚠️ Warning: Pexels API key is missing or invalid. You'll need this for fetching stock videos.")
else:
    print("✅ Pexels API key configured")

## 📦 Install Dependencies

This section installs all the necessary Python packages for the video generation server.

In [None]:
# Install required Python packages
!pip install flask flask-cors requests ffmpeg-python --quiet

# Download and install the actual cloudflared binary
!wget https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64
!chmod +x cloudflared-linux-amd64
!mv cloudflared-linux-amd64 /usr/local/bin/cloudflared

# Install FFmpeg if not already available
!apt-get update -qq && apt-get install -y ffmpeg

# Create temp directory for storing files
!mkdir -p /content/temp

## 🖥️ Flask Server Implementation

This section implements the Flask server with endpoints for video generation.

In [None]:
import json
import os
import time
import uuid
from datetime import datetime
from threading import Thread

import ffmpeg
import requests
from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS

# Initialize Flask app
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes

# Configuration
TEMP_DIR = "/content/temp"
os.makedirs(TEMP_DIR, exist_ok=True)

# Global storage for progress updates
progress_updates = {}

# Helper function to split comma-separated API keys
def split_api_keys(api_keys_str):
    return [key.strip() for key in api_keys_str.split(',') if key.strip()]

# Helper function for retry with different keys
def retry_with_keys(operation, api_keys, operation_name):
    keys = split_api_keys(api_keys)
    if not keys:
        raise ValueError(f"No valid API keys provided for {operation_name}")
    
    last_error = None
    for i, key in enumerate(keys):
        try:
            print(f"Attempting {operation_name} with key #{i+1}...")
            return operation(key)
        except Exception as e:
            print(f"Key #{i+1} failed: {str(e)}")
            last_error = e
            # If this is not the last key, wait for 1 second before trying the next one.
            if i < len(keys) - 1:
                print("Waiting for 1 second before retrying with the next key...")
                time.sleep(1)
    
    raise ValueError(f"All API keys failed for {operation_name}: {str(last_error)}")

# Helper function to add a log message
def add_log(session_id, message, log_type='info'):
    log_entry = {
        'id': int(time.time() * 1000),
        'message': message,
        'timestamp': datetime.now().strftime('%H:%M:%S'),
        'type': log_type
    }
    
    if session_id not in progress_updates:
        progress_updates[session_id] = []
        
    progress_updates[session_id].append(log_entry)
    print(f"[{log_entry['timestamp']}] {message}")
    return log_entry

# Function to generate AI director plan
def generate_ai_director_plan(prompt, api_keys, session_id):
    add_log(session_id, "🧠 Generating cinematic script with Gemini...", "processing")
    
    director_prompt = f"""As an AI Director, create a detailed scene-by-scene plan for a video about: \"{prompt}\"\n\nReturn ONLY a JSON object with this exact structure:\n\n{{\n  \"scenes\": [\n    {{\n      \"description\": \"A detailed description of what happens in this scene\",\n      \"searchKeywords\": \"keywords for finding stock footage\",\n      \"duration\": 5\n    }}\n  ],\n  \"totalDuration\": 30\n}}\n\nCreate 6 scenes, each 5 seconds long. Focus on visual, cinematic storytelling."""

    def ai_director_operation(api_key):
        response = requests.post(
            f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent?key={api_key}",
            json={
                "contents": [{"parts": [{"text": director_prompt}]}]
            },
            headers={'Content-Type': 'application/json'}
        )
        
        if response.status_code != 200:
            raise ValueError(f"Failed to generate plan: {response.text}")
        
        result = response.json()
        plan_text = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")
        
        if not plan_text:
            raise ValueError("No plan generated")
        
        # Extract JSON from response
        import re
        json_match = re.search(r'\{[\s\S]*\}', plan_text)
        if not json_match:
            raise ValueError("Invalid plan format")
        
        return json.loads(json_match.group(0))
    
    return retry_with_keys(ai_director_operation, api_keys, "AI Director plan generation")

# Function to translate script
def translate_script(script, language, api_keys, session_id):
    add_log(session_id, f"🌐 Translating script to {language}...", "processing")
    
    translate_prompt = f"""Translate the following script to {language}. Return ONLY the translated text, nothing else:\n\n{script}"""

    def translate_operation(api_key):
        response = requests.post(
            f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-pro-latest:generateContent?key={api_key}",
            json={
                "contents": [{"parts": [{"text": translate_prompt}]}]
            },
            headers={'Content-Type': 'application/json'}
        )
        
        if response.status_code != 200:
            raise ValueError(f"Translation failed: {response.text}")
        
        result = response.json()
        translated_text = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")
        
        return translated_text or script
    
    try:
        return retry_with_keys(translate_operation, api_keys, "Script translation")
    except Exception as e:
        print(f"Translation error: {str(e)}")
        return script  # Fall back to original script on error

# Function to generate speech
def generate_speech(text, model, api_keys, session_id):
    add_log(session_id, "🎤 Generating voiceover...", "processing")
    
    def tts_operation(api_key):
        response = requests.post(
            f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}",
            json={
                "contents": [{"parts": [{"text": text}]}],
                "generationConfig": {"temperature": 0.8}
            },
            headers={'Content-Type': 'application/json'}
        )
        
        if response.status_code != 200:
            raise ValueError(f"TTS generation failed: {response.text}")
        
        result = response.json()
        candidates = result.get("candidates", [])
        
        if not candidates or not candidates[0].get("content"):
            raise ValueError("No audio content generated")
        
        # Extract audio data
        audio_part = None
        for part in candidates[0]["content"]["parts"]:
            if "audio_data" in part:
                audio_part = part
                break
        
        if not audio_part:
            raise ValueError("No audio data in response")
        
        # Save to file
        audio_path = os.path.join(TEMP_DIR, f"tts_{time.time()}.mp3")
        import base64
        with open(audio_path, "wb") as f:
            f.write(base64.b64decode(audio_part["audio_data"]))
        
        return audio_path
    
    return retry_with_keys(tts_operation, api_keys, "Text-to-speech generation")

# Function to fetch video from Pexels
def fetch_pexels_video(keywords, api_keys, session_id):
    def pexels_operation(api_key):
        response = requests.get(
            f"https://api.pexels.com/videos/search?query={keywords}&per_page=1",
            headers={"Authorization": api_key}
        )
        
        if response.status_code != 200:
            raise ValueError(f"Failed to fetch video from Pexels: {response.text}")
        
        result = response.json()
        video_url = None
        
        if result.get("videos") and len(result["videos"]) > 0:
            # Find a medium-quality video file (not too large, not too small)
            video_files = result["videos"][0]["video_files"]
            # Sort by quality, prioritizing medium quality
            video_files.sort(key=lambda x: abs(x.get("height", 0) - 720))
            video_url = video_files[0]["link"]
        
        if not video_url:
            raise ValueError("No video found")
        
        # Download video
        video_path = os.path.join(TEMP_DIR, f"pexels_{time.time()}.mp4")
        video_data = requests.get(video_url).content
        with open(video_path, "wb") as f:
            f.write(video_data)
        
        return video_path
    
    return retry_with_keys(pexels_operation, api_keys, "Pexels video fetch")

# Function to concatenate videos with transitions
def concatenate_videos(video_paths, session_id):
    add_log(session_id, "✂️ Stitching videos with cinematic transitions...", "processing")
    
    if not video_paths:
        raise ValueError("No video paths provided")
    
    # If only one video, return it directly
    if len(video_paths) == 1:
        return video_paths[0]
    
    output_path = os.path.join(TEMP_DIR, f"concatenated_{time.time()}.mp4")
    
    # Create a complex filter for crossfade transitions
    filter_complex = ""
    inputs = []
    
    for i, video_path in enumerate(video_paths):
        inputs.append(ffmpeg.input(video_path))
    
    # Simple concatenation for now (ffmpeg-python's xfade is complex)
    concat_inputs = []
    for i, input_stream in enumerate(inputs):
        # Ensure consistent dimensions and framerate
        concat_inputs.append(input_stream.video
                            .filter('scale', 1280, 720)
                            .filter('fps', fps=30))
    
    # Concatenate all videos
    concat = ffmpeg.concat(*concat_inputs, v=1, a=0)
    
    # Output to file
    ffmpeg.output(concat, output_path, vcodec='libx264', preset='fast', crf=23).run()
    
    return output_path

# Function to merge video with audio
def merge_video_audio(video_path, audio_path, session_id):
    add_log(session_id, "🎵 Merging video with voiceover...", "processing")
    
    output_path = os.path.join(TEMP_DIR, f"final_{time.time()}.mp4")
    
    # Merge video and audio using FFmpeg
    ffmpeg.input(video_path).input(audio_path).output(
        output_path,
        vcodec='copy',  # Don't re-encode video
        acodec='aac',   # Use AAC for audio
        shortest=None   # Match shortest stream duration
    ).run()
    
    return output_path

# Function to generate video
def generate_video(config, session_id):
    try:
        add_log(session_id, "✅ Validating inputs...", "success")
        
        # Validate inputs
        if not config.get("projectTitle") or not config.get("mainPrompt"):
            raise ValueError("Project title and prompt are required")
        if not config.get("googleApiKeys") or not config.get("pexelsApiKeys"):
            raise ValueError("API keys are required")
        if not config.get("languages") or len(config["languages"]) == 0:
            raise ValueError("At least one language must be selected")
        
        add_log(session_id, "🎬 Initializing video processing...", "processing")
        
        # Generate scene plan
        scene_plan = generate_ai_director_plan(
            config["mainPrompt"], 
            config["googleApiKeys"],
            session_id
        )
        
        # Create script from scenes
        script = " ".join([scene["description"] for scene in scene_plan["scenes"]])
        
        # Process the first language only (for now)
        language = config["languages"][0]
        
        # Translate the script
        translated_script = translate_script(
            script, 
            language, 
            config["googleApiKeys"],
            session_id
        )
        
        # Generate speech
        audio_path = generate_speech(
            translated_script,
            config["voiceModel"],
            config["googleApiKeys"],
            session_id
        )
        
        # Fetch videos
        add_log(session_id, "🎬 Fetching video footage from Pexels...", "processing")
        
        video_paths = []
        for i, scene in enumerate(scene_plan["scenes"]):
            add_log(
                session_id,
                f"🎬 Fetching video {i+1}/{len(scene_plan['scenes'])}: {scene['searchKeywords']}",
                "processing"
            )
            
            try:
                video_path = fetch_pexels_video(
                    scene["searchKeywords"],
                    config["pexelsApiKeys"],
                    session_id
                )
                video_paths.append(video_path)
            except Exception as e:
                # Fallback to main prompt
                add_log(
                    session_id,
                    "⚠️ Scene-specific video not found, using main prompt...",
                    "warning"
                )
                fallback_path = fetch_pexels_video(
                    config["mainPrompt"],
                    config["pexelsApiKeys"],
                    session_id
                )
                video_paths.append(fallback_path)
        
        # Concatenate videos
        silent_video_path = concatenate_videos(video_paths, session_id)
        
        # Merge video with audio
        final_video_path = merge_video_audio(silent_video_path, audio_path, session_id)
        
        # Generate final filename
        safe_title = config["projectTitle"].replace(" ", "_").replace("/", "_")
        final_filename = f"{safe_title}_{int(time.time())}.mp4"
        final_destination = os.path.join(TEMP_DIR, final_filename)
        
        # Copy to final destination with proper name
        if final_video_path != final_destination:
            import shutil
            shutil.copy(final_video_path, final_destination)
        
        add_log(session_id, f"✅ Video for {language} is ready!", "success")
        
        # Schedule cleanup of temporary files after 1 hour
        def cleanup_files():
            time.sleep(60 * 60)  # 1 hour
            try:
                for path in video_paths + [silent_video_path, audio_path, final_video_path]:
                    if os.path.exists(path) and path != final_destination:
                        os.unlink(path)
            except Exception as e:
                print(f"Error during cleanup: {str(e)}")
        
        Thread(target=cleanup_files).start()
        
        return final_filename
        
    except Exception as e:
        error_message = str(e) or "undefined"
        user_friendly_message = "Video generation failed due to an unknown error" if error_message == "undefined" else error_message
        
        add_log(session_id, f"❌ Error: {user_friendly_message}", "warning")
        raise ValueError(user_friendly_message)

# API endpoints
@app.route('/generate-video', methods=['POST'])
def api_generate_video():
    try:
        config = request.json
        session_id = str(uuid.uuid4())
        
        # Initialize progress updates for this session
        progress_updates[session_id] = []
        
        # Start video generation in a separate thread
        def generate_video_thread():
            try:
                filename = generate_video(config, session_id)
                # Progress updates are collected during generation
            except Exception as e:
                add_log(session_id, f"❌ Error: {str(e)}", "warning")
        
        thread = Thread(target=generate_video_thread)
        thread.start()
        
        # Return the session ID for polling status
        return jsonify({
            'success': True,
            'sessionId': session_id,
            'message': 'Video generation started'
        })
        
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

# Endpoint to check generation status
@app.route('/generation-status/<session_id>', methods=['GET'])
def api_generation_status(session_id):
    if session_id not in progress_updates:
        return jsonify({
            'success': False,
            'error': 'Session not found'
        }), 404
    
    updates = progress_updates[session_id]
    
    # Check if generation is complete
    is_complete = False
    is_error = False
    video_filename = None
    
    for update in updates:
        if update['type'] == 'success' and 'ready' in update['message'].lower():
            is_complete = True
            # Extract filename from the logs if possible
            for u in updates:
                if '.mp4' in u['message']:
                    parts = u['message'].split()
                    for part in parts:
                        if '.mp4' in part:
                            video_filename = part
                            break
                    if video_filename:
                        break
        elif update['type'] == 'warning' and 'error' in update['message'].lower():
            is_error = True
    
    # Find the latest video file if we couldn't determine it from logs
    if is_complete and not video_filename:
        video_files = [f for f in os.listdir(TEMP_DIR) if f.endswith('.mp4') and f.startswith('final_')]
        if video_files:
            video_files.sort(key=lambda f: os.path.getmtime(os.path.join(TEMP_DIR, f)), reverse=True)
            video_filename = video_files[0]
    
    return jsonify({
        'success': True,
        'progressUpdates': updates,
        'isComplete': is_complete,
        'isError': is_error,
        'videoUrl': f'/temp/{video_filename}' if video_filename else None
    })

# Serve static files
@app.route('/temp/<path:filename>')
def serve_temp_file(filename):
    return send_from_directory(TEMP_DIR, filename)

# Status endpoint
@app.route('/status', methods=['GET'])
def api_status():
    return jsonify({
        'status': 'Server is running'
    })

print("Flask server code loaded successfully")

## 🚀 Launch Server with Cloudflare Tunnel

This section starts the Flask server and creates a Cloudflare Tunnel to expose it to the internet.

In [None]:
# Import required packages
import subprocess
import threading
import time
import re

# Function to run the Flask server in the background
def run_flask_server():
    from flask import Flask, request
    app.run(host='localhost', port=8000)

# Start Flask server in a separate thread
flask_thread = threading.Thread(target=run_flask_server)
flask_thread.daemon = True
flask_thread.start()

print("Starting Flask server...")
time.sleep(2)  # Give Flask server time to start

# Function to extract the Cloudflare Tunnel URL from output
def extract_cloudflare_url(output):
    # Look for URLs in the output
    url_pattern = r'https://[\w.-]+\.trycloudflare\.com'
    match = re.search(url_pattern, output)
    if match:
        return match.group(0)
    return None

# Start Cloudflare Tunnel
print("Starting Cloudflare Tunnel...")
cloudflared_process = subprocess.Popen(
    ['cloudflared', 'tunnel', '--url', 'http://localhost:8000'],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    bufsize=1
)

# Wait for the tunnel URL
tunnel_url = None
start_time = time.time()
timeout = 30  # 30 seconds timeout

print("Waiting for Cloudflare Tunnel to establish...")
while time.time() - start_time < timeout and tunnel_url is None:
    output_line = cloudflared_process.stdout.readline().strip()
    if output_line:
        print(output_line)
        tunnel_url = extract_cloudflare_url(output_line)
    time.sleep(0.1)

if tunnel_url:
    print(f"\n\n🚀 Server is running! Your Cloudflare Tunnel URL is:\n\n{tunnel_url}\n\n")
    print("Use this URL to connect your UnQVision frontend.")
    print("\nTo stop the server, interrupt the notebook execution (Runtime > Interrupt execution)")
    
    # Keep this cell running to maintain the tunnel
    try:
        while True:
            # Check if cloudflared is still running
            if cloudflared_process.poll() is not None:
                print("⚠️ Cloudflare Tunnel stopped unexpectedly. Restarting...")
                cloudflared_process = subprocess.Popen(
                    ['cloudflared', 'tunnel', '--url', 'http://localhost:8000'],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1
                )
            # Print status every 10 minutes to keep the notebook from timing out
            print(f"✅ Server is still running at {tunnel_url} ({time.strftime('%H:%M:%S')})")
            time.sleep(600)  # 10 minutes
    except KeyboardInterrupt:
        print("Shutting down server...")
        if cloudflared_process:
            cloudflared_process.terminate()
else:
    print("❌ Failed to start Cloudflare Tunnel within the timeout period.")