In [1]:
!pip uninstall -y langgraph langchain langchain-core langchain-community huggingface-hub

Found existing installation: langgraph 0.2.76
Uninstalling langgraph-0.2.76:
  Successfully uninstalled langgraph-0.2.76
Found existing installation: langchain 0.3.27
Uninstalling langchain-0.3.27:
  Successfully uninstalled langchain-0.3.27
Found existing installation: langchain-core 0.3.75
Uninstalling langchain-core-0.3.75:
  Successfully uninstalled langchain-core-0.3.75
Found existing installation: langchain-community 0.3.28
Uninstalling langchain-community-0.3.28:
  Successfully uninstalled langchain-community-0.3.28
Found existing installation: huggingface-hub 0.34.4
Uninstalling huggingface-hub-0.34.4:
  Successfully uninstalled huggingface-hub-0.34.4


In [2]:
# --- GPU stack (cu121) hard reset: pin Numpy 1.x + SciPy + scikit-learn, PyTorch 2.3.0, CUDA runtime ---

import sys, subprocess, os, site, textwrap

def pip(*args):
    print(">> pip", *args)
    return subprocess.check_call([sys.executable, "-m", "pip", *args])

# 0) Purge conflicting wheels (incl. sklearn which drags SciPy/Numpy)
pip("uninstall","-y",
    "torch","torchvision","torchaudio",
    "numpy","scipy","scikit-learn",
    "opencv-python","opencv-python-headless",
    "av","gradio","gradio-client","tokenizers",
    "nvidia-cuda-runtime-cu12","nvidia-cudnn-cu12","nvidia-cublas-cu12",
    "xformers","jax","jaxlib"
)

# 1) Tooling
pip("install","-U","pip","setuptools","wheel")

# 2) Foundation ABI pair (NUMPY 1.x!) + sklearn that supports Py3.12 and 1.x ABI
pip("install","--no-cache-dir","--force-reinstall","--no-deps","numpy==1.26.4")
pip("install","--no-cache-dir","--force-reinstall","--no-deps","scipy==1.11.4")
pip("install","--no-cache-dir","--force-reinstall","--no-deps","scikit-learn==1.4.2")

# 3) LangGraph/LangChain pins (don’t touch numpy/torch)
for dep in [
    "langgraph>=0.2.50,<0.3",
    "langchain>=0.2.16,<0.4",
    "langchain-core>=0.2.38,<0.4",
    "langchain-community>=0.2.10,<0.4",
]:
    pip("install","--no-cache-dir","--force-reinstall",dep)

# 4) PyTorch cu121 wheels (GPU)
pip("install","--no-cache-dir","--force-reinstall",
    "--index-url","https://download.pytorch.org/whl/cu121",
    "torch==2.3.0","torchvision==0.18.0","torchaudio==2.3.0")

# 5) Provide CUDA runtime libs (so libcudnn.so.8 etc. are present)
pip("install","--no-cache-dir","--force-reinstall",
    "nvidia-cuda-runtime-cu12==12.1.105",
    "nvidia-cudnn-cu12==8.9.5.30",
    "nvidia-cublas-cu12==12.1.3.1"
)

# 6) HF stack (tokenizers BEFORE transformers) w/ minimal deps to avoid numpy upgrades
pip("install","--no-cache-dir","--force-reinstall","--no-deps","tokenizers==0.21.0")
pip("install","--no-cache-dir","--force-reinstall","--no-deps","transformers==4.55.3")
pip("install","--no-cache-dir","--force-reinstall","--no-deps","accelerate==0.32.1")
pip("install","--no-cache-dir","--force-reinstall","--no-deps","huggingface_hub==0.34.4")

# 7) Other deps (no-deps so they cannot drag numpy/torch)
for dep in [
    "gradio>=5.44.0",
    "gradio-client>=1.4",
    "GPUtil","psutil",
    "bitsandbytes==0.43.1",  # requires CUDA; OK w/ GPU
    "av==12.0.0",
    "opencv-python-headless==4.9.0.80",
    "tiktoken==0.7.0",
    "python-dotenv==1.0.1",
    "mcp",
    "packaging>=23.2",
]:
    try:
        pip("install","--no-cache-dir","--force-reinstall","--no-deps",dep)
    except subprocess.CalledProcessError:
        pip("install","--no-cache-dir","--force-reinstall",dep)

# 8) Belt & suspenders: re-pin the critical ones WITHOUT deps
pip("install","--no-cache-dir","--force-reinstall","--no-deps","numpy==1.26.4","scipy==1.11.4","scikit-learn==1.4.2")
pip("install","--no-cache-dir","--force-reinstall","--no-deps","torch==2.3.0","torchvision==0.18.0","torchaudio==2.3.0")

# 9) sitecustomize: ensure CUDA libs on LD_LIBRARY_PATH every start
nvidia_lib_dirs = []
for root in site.getsitepackages():
    for sub in [("nvidia","cuda_runtime","lib"),("nvidia","cudnn","lib"),("nvidia","cublas","lib")]:
        d = os.path.join(root,*sub)
        if os.path.isdir(d): nvidia_lib_dirs.append(d)

sc_path = os.path.join(site.getsitepackages()[0],"sitecustomize.py")
with open(sc_path,"w") as f:
    f.write(textwrap.dedent(f"""
        import os
        _paths = {nvidia_lib_dirs!r}
        if _paths:
            _cur = os.environ.get("LD_LIBRARY_PATH","")
            _prefix = ":".join(p for p in _paths if os.path.isdir(p))
            if _prefix:
                os.environ["LD_LIBRARY_PATH"] = _prefix + (":" + _cur if _cur else "")
            print("[sitecustomize] LD_LIBRARY_PATH set:", os.environ.get("LD_LIBRARY_PATH",""))
    """))
print("✅ Wrote", sc_path)

# 10) Restart so new wheels & LD_LIBRARY_PATH take effect for the next process
import IPython; IPython.get_ipython().kernel.do_shutdown(restart=True)


>> pip uninstall -y torch torchvision torchaudio numpy scipy scikit-learn opencv-python opencv-python-headless av gradio gradio-client tokenizers nvidia-cuda-runtime-cu12 nvidia-cudnn-cu12 nvidia-cublas-cu12 xformers jax jaxlib
>> pip install -U pip setuptools wheel
>> pip install --no-cache-dir --force-reinstall --no-deps numpy==1.26.4
>> pip install --no-cache-dir --force-reinstall --no-deps scipy==1.11.4
>> pip install --no-cache-dir --force-reinstall --no-deps scikit-learn==1.4.2
>> pip install --no-cache-dir --force-reinstall langgraph>=0.2.50,<0.3
>> pip install --no-cache-dir --force-reinstall langchain>=0.2.16,<0.4
>> pip install --no-cache-dir --force-reinstall langchain-core>=0.2.38,<0.4
>> pip install --no-cache-dir --force-reinstall langchain-community>=0.2.10,<0.4
>> pip install --no-cache-dir --force-reinstall --index-url https://download.pytorch.org/whl/cu121 torch==2.3.0 torchvision==0.18.0 torchaudio==2.3.0
>> pip install --no-cache-dir --force-reinstall nvidia-cuda-ru

{'status': 'ok', 'restart': True}

In [1]:
import importlib, numpy as np
def v(m):
    try: mobj = importlib.import_module(m); return getattr(mobj,"__version__","n/a")
    except Exception as e: return f"ERR: {e}"

mods = ["numpy","scipy","scikit_learn","torch","torchvision","torchaudio","transformers","tokenizers","cv2","av"]
print("=== Versions ===")
for m in mods: print(f"{m:14s}", v(m))

import torch
print("\nCUDA available:", torch.cuda.is_available(), "| CUDA:", torch.version.cuda)
if torch.cuda.is_available(): print("GPU:", torch.cuda.get_device_name(0))


=== Versions ===
numpy          1.26.4
scipy          1.11.4
scikit_learn   ERR: No module named 'scikit_learn'
torch          2.3.0+cu121
torchvision    0.18.0+cu121
torchaudio     2.3.0+cu121
transformers   4.55.3
tokenizers     0.21.0
cv2            4.9.0
av             12.0.0

CUDA available: True | CUDA: 12.1
GPU: NVIDIA A100-SXM4-40GB


In [2]:
import torch
import gc
import psutil
import GPUtil

def setup_gpu_environment():
    """Configure GPU environment for optimal performance"""

    print("=== GPU Environment Setup ===")

    if torch.cuda.is_available():
        print(f"✅ GPU Available: {torch.cuda.get_device_name(0)}")

        # Get GPU info
        gpu_properties = torch.cuda.get_device_properties(0)
        total_memory = gpu_properties.total_memory / 1024**3
        print(f"📊 Total GPU Memory: {total_memory:.1f} GB")

        # Set memory fraction (leave some room for system)
        memory_fraction = 0.85 if total_memory > 15 else 0.75
        torch.cuda.set_per_process_memory_fraction(memory_fraction)
        print(f"🎯 Memory Fraction Set: {memory_fraction}")

        # Clear any existing cache
        torch.cuda.empty_cache()
        gc.collect()

        return True, total_memory
    else:
        print("❌ No GPU available - will use CPU (very slow)")
        return False, 0

# Check system resources
def check_system_resources():
    """Check available system resources"""
    print("\n=== System Resources ===")

    # CPU info
    print(f"💻 CPU Cores: {psutil.cpu_count()}")

    # RAM info
    ram = psutil.virtual_memory()
    print(f"🧠 RAM: {ram.total / 1024**3:.1f} GB total, {ram.available / 1024**3:.1f} GB available")

    # GPU info
    try:
        gpus = GPUtil.getGPUs()
        if gpus:
            for gpu in gpus:
                print(f"🎮 GPU: {gpu.name}, Memory: {gpu.memoryTotal} MB")
    except:
        print("🎮 GPU info unavailable")

gpu_available, gpu_memory = setup_gpu_environment()
check_system_resources()

=== GPU Environment Setup ===
✅ GPU Available: NVIDIA A100-SXM4-40GB
📊 Total GPU Memory: 39.6 GB
🎯 Memory Fraction Set: 0.85

=== System Resources ===
💻 CPU Cores: 12
🧠 RAM: 83.5 GB total, 80.2 GB available
🎮 GPU: NVIDIA A100-SXM4-40GB, Memory: 40960.0 MB


In [3]:
# VideoLlava Agent with Proper MCP Integration
# Improved LangGraph workflow, better error handling, and modular architecture

import os
import re
import json
import asyncio
import logging
from typing import Dict, List, Any, Optional, TypedDict, Annotated, Union
from dataclasses import dataclass, field
from enum import Enum
import torch
import av
import numpy as np
from PIL import Image

# LangGraph and LangChain imports
from langgraph.graph import StateGraph, END, START
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage, SystemMessage
from langchain_core.tools import tool
from langchain_core.runnables import RunnableConfig

# VideoLlava imports
from transformers import VideoLlavaProcessor, VideoLlavaForConditionalGeneration
from transformers import AutoTokenizer, AutoModelForCausalLM

# MCP imports (proper MCP client implementation)
try:
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client
    MCP_AVAILABLE = True
except ImportError:
    print("Warning: MCP not installed. Install with: pip install mcp")
    MCP_AVAILABLE = False

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration classes
@dataclass
class ModelConfig:
    """Configuration for models"""
    videollava_model: str = "LanguageBind/Video-LLaVA-7B-hf"
    router_model: str = "microsoft/Phi-3-mini-4k-instruct"
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    torch_dtype: torch.dtype = torch.float16
    max_frames: int = 32
    max_new_tokens: int = 256

@dataclass
class AgentConfig:
    """Configuration for agent behavior"""
    enable_reasoning: bool = True
    enable_memory: bool = True
    max_tool_calls: int = 3
    fallback_enabled: bool = True
    debug_mode: bool = False

class ToolType(Enum):
    """Enumeration of available tool types"""
    VISUAL_ANALYSIS = "visual_analysis"
    METADATA_EXTRACTION = "metadata_extraction"
    COMPREHENSIVE_SUMMARY = "comprehensive_summary"
    ANOMALY_DETECTION = "anomaly_detection"
    STEP_BY_STEP = "step_by_step"
    WEB_SEARCH = "web_search"
    RESPOND = "respond"
    LIST_TOOLS = "list_tools"

# Agent State
class AgentState(TypedDict):
    """state for VideoLlava agent with better type hints"""
    messages: Annotated[List[BaseMessage], "Chat messages"]
    video_path: Optional[str]
    current_task: Optional[str]
    tool_calls: List[Dict[str, Any]]
    context: Dict[str, Any]
    llm_classification: Optional[Dict[str, str]]
    error_context: Optional[Dict[str, str]]
    session_id: str
    reasoning_trace: List[str]

# Video processing utilities
class VideoProcessor:
    """Video processing utilities"""
    @staticmethod
    def uniform_sample_frames(video_path: str, num_frames: int = 32):
        """Uniformly sample frames across the whole video; returns list[PIL.Image]"""
        try:
            container = av.open(video_path)
            stream = container.streams.video[0]
            total = stream.frames or 0

            # Fall back: if no frame count, just decode and sample by index
            indices = (np.linspace(0, max(total - 1, 0), num_frames).astype(int).tolist()
                      if total > 0 else None)

            images = []
            i = 0
            target_set = set(indices) if indices is not None else None

            for frame in container.decode(video=0):
                take = (target_set is None) or (i in target_set)
                if take:
                    img = frame.to_ndarray(format="rgb24")
                    images.append(Image.fromarray(img))
                    if len(images) >= num_frames:
                        break
                i += 1

            container.close()
            return images
        except Exception as e:
            logger.error(f"Uniform sampling error: {e}")
            return []

    @staticmethod
    def extract_keyframes(video_path: str, max_frames: int = 32, threshold: float = 0.3):
        """Extract keyframes as a list of PIL.Image frames"""
        try:
            import cv2
            container = av.open(video_path)
            prev_hist = None
            selected = []

            for frame in container.decode(video=0):
                img = frame.to_ndarray(format="rgb24")        # (H, W, 3), RGB
                img_small = cv2.resize(img, (160, 90))
                hist = cv2.calcHist([img_small], [0, 1, 2], None,
                                    [8, 8, 8], [0, 256, 0, 256, 0, 256])
                hist = cv2.normalize(hist, hist).flatten()

                if prev_hist is None or cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA) > threshold:
                    selected.append(Image.fromarray(img))      # convert to PIL here
                    prev_hist = hist

                if len(selected) >= max_frames:
                    break

            container.close()
            return selected  # <-- list[ PIL.Image ]

        except Exception as e:
            logger.error(f"Error extracting keyframes: {e}")
            raise


    @staticmethod
    def get_video_metadata(video_path: str) -> Dict[str, Any]:
        """Extract comprehensive video metadata"""
        try:
            container = av.open(video_path)
            video_stream = container.streams.video[0]

            # CORRECT conversion: AV_TIME_BASE units -> seconds
            duration_seconds = float(container.duration * av.time_base) if container.duration is not None else 0.0

            # fps fallback
            if video_stream.average_rate:
                fps = float(video_stream.average_rate)
            elif getattr(video_stream, "base_rate", None):
                fps = float(video_stream.base_rate)
            else:
                fps = 0.0

            metadata = {
                "duration_seconds": duration_seconds,
                "fps": fps,
                "frames": video_stream.frames,
                "width": video_stream.width,
                "height": video_stream.height,
                "codec": video_stream.codec_context.name,
                "bit_rate": video_stream.bit_rate,
                "pixel_format": str(video_stream.pix_fmt)
            }

            container.close()
            return metadata

        except Exception as e:
            logger.error(f"Error extracting metadata: {e}")
            raise


# LLM Router with better error handling
class LLMRouter:
    """Improved LLM router with better classification and error handling"""

    def __init__(self, model_config: ModelConfig):
        self.model_config = model_config
        self.device = model_config.device
        self.model = None
        self.tokenizer = None

        self._initialize_model()

    def _initialize_model(self):
        """Initialize routing model with comprehensive error handling"""
        try:
            logger.info(f"Loading routing model: {self.model_config.router_model}")

            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_config.router_model,
                trust_remote_code=True,
                padding_side="left"
            )

            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_config.router_model,
                torch_dtype=self.model_config.torch_dtype,
                device_map="auto" if torch.cuda.is_available() else None,
                trust_remote_code=True,
                low_cpu_mem_usage=True,
                use_cache=False,
                attn_implementation="eager"
            )

            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            self.model.eval()
            logger.info("LLM Router initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize LLM Router: {e}")
            self.model = None
            self.tokenizer = None

    def classify_request(self, user_question: str, has_video: bool = True) -> Dict[str, str]:
        """classification with structured reasoning"""
        content = user_question.lower()

        def pack(tool: str, reason: str, conf="0.8"):
            return {
                "primary_tool": tool,
                "reasoning": reason,
                "custom_prompt": user_question,
                "confidence": conf,
            }

        if not has_video:
            # Route by intent even without video
            if any(k in content for k in ["duration", "fps", "frame rate", "resolution", "codec", "bitrate", "metadata"]):
                return pack("metadata_extraction", "Tech question without video; tool will handle missing video.")
            if any(k in content for k in ["step", "chronolog", "sequence", "timeline"]):
                return pack("step_by_step", "Sequential request without video; tool will handle missing video.")
            if any(k in content for k in ["unusual", "strange", "weird", "odd", "unexpected", "anomal"]):
                return pack("anomaly_detection", "Anomaly request without video; tool will handle missing video.")
            if any(k in content for k in ["tools", "capabilities", "help", "what can"]):
                return pack("list_tools", "User asked about capabilities.")
            # Default to visual_analysis so we try to answer from text context (if any)
            return pack("visual_analysis", "No video provided; attempt text-based answer if possible.", "0.7")

        try:
            prompt = self._create_classification_prompt(user_question, has_video)

            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=1024,
                padding=True
            )

            if torch.cuda.is_available():
                inputs = {k: v.to(self.device) for k, v in inputs.items()}

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=200,
                    do_sample=False,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id,
                    use_cache=False
                )

            response = self.tokenizer.decode(
                outputs[0][inputs['input_ids'].shape[1]:],
                skip_special_tokens=True
            ).strip()

            return self._parse_classification_response(response, user_question, has_video)

        except Exception as e:
            logger.warning(f"LLM classification failed: {e}, using fallback")
            return self._fallback_classification(user_question, has_video)

    def _create_classification_prompt(self, user_question: str, has_video: bool) -> str:
        """Create structured classification prompt"""
        video_status = "Available" if has_video else "Not available"

        return f"""Analyze this user question and classify it for a video analysis system.

Question: "{user_question}"
Video: {video_status}

Available tools:
1. visual_analysis - Describe content, count objects, identify elements
2. metadata_extraction - Technical specs (duration, fps, resolution)
3. comprehensive_summary - Multi-angle analysis
4. anomaly_detection - Find unusual elements
5. step_by_step - Chronological breakdown
6. web_search - External information lookup
7. list_tools - Show available capabilities
8. respond - General conversation

Classification rules:
- Technical questions → metadata_extraction
- Visual/content questions → visual_analysis
- Comprehensive requests → comprehensive_summary
- Unusual/anomaly requests → anomaly_detection
- Sequential requests → step_by_step
- Tool inquiries → list_tools
- No video + video questions → respond

Respond in JSON format:
{{"tool": "tool_name", "confidence": 0.9, "reasoning": "explanation"}}"""

    def _parse_classification_response(self, response: str, user_question: str, has_video: bool) -> Dict[str, str]:
        """Parse LLM classification response"""
        try:
            # Extract JSON from response
            start_idx = response.find('{')
            end_idx = response.rfind('}') + 1

            if start_idx != -1 and end_idx > start_idx:
                json_str = response[start_idx:end_idx]
                classification = json.loads(json_str)

                return {
                    "primary_tool": classification.get("tool", "visual_analysis"),
                    "confidence": str(classification.get("confidence", 0.8)),
                    "reasoning": classification.get("reasoning", "LLM classification"),
                    "custom_prompt": user_question
                }
        except:
            pass

        return self._fallback_classification(user_question, has_video)

    def _fallback_classification(self, user_question: str, has_video: bool) -> Dict[str, str]:
        """fallback classification with pattern matching"""
        content = user_question.lower()

        if not has_video:
            def pack(tool, reason, conf="0.8"):
                return {"primary_tool": tool, "reasoning": reason, "custom_prompt": user_question, "confidence": conf}
            if any(k in content for k in ["duration","fps","frame rate","resolution","codec","bitrate","metadata"]):
                return pack("metadata_extraction", "Tech question without video; tool will handle missing video.")
            if any(k in content for k in ["step","chronolog","sequence","timeline"]):
                return pack("step_by_step", "Sequential request without video; tool will handle missing video.")
            if any(k in content for k in ["unusual","strange","weird","odd","unexpected","anomal"]):
                return pack("anomaly_detection", "Anomaly request without video; tool will handle missing video.")
            if any(k in content for k in ["tools","capabilities","help","what can"]):
                return pack("list_tools", "User asked about capabilities.")
            return pack("visual_analysis", "No video provided; attempt text-based answer if possible.", "0.7")

        # pattern matching
        patterns = {
            "metadata_extraction": ["duration", "long", "fps", "frame", "resolution", "specs", "technical", "codec"],
            "comprehensive_summary": ["comprehensive", "full", "everything", "complete", "detailed"],
            "anomaly_detection": ["unusual", "strange", "weird", "anomal", "odd", "unexpected"],
            "step_by_step": ["step", "break down", "chronological", "sequence", "timeline"],
            "list_tools": ["tools", "available", "capabilities", "help", "what can"],
            "visual_analysis": ["what", "describe", "see", "happening", "count", "identify"]
        }

        for tool, keywords in patterns.items():
            if any(keyword in content for keyword in keywords):
                return {
                    "primary_tool": tool,
                    "reasoning": f"Pattern match: {tool}",
                    "custom_prompt": user_question,
                    "confidence": "0.7"
                }

        return {
            "primary_tool": "visual_analysis",
            "reasoning": "Default fallback",
            "custom_prompt": user_question,
            "confidence": "0.5"
        }

# MCP Tool Integration
class MCPToolManager:
    """Manager for MCP (Model Context Protocol) tools"""

    def __init__(self):
        self.sessions = {}
        self.available_servers = []
        self.mcp_enabled = MCP_AVAILABLE

    async def initialize_mcp_server(self, server_name: str, command: List[str]) -> bool:
        """Initialize an MCP server connection"""
        if not self.mcp_enabled:
            logger.warning("MCP not available")
            return False

        try:
            server_params = StdioServerParameters(
                command=command,
                env=None
            )

            session = await stdio_client(server_params).__aenter__()
            self.sessions[server_name] = session

            # Initialize the session
            init_result = await session.initialize()
            logger.info(f"MCP server {server_name} initialized: {init_result}")

            # List available tools
            tools_result = await session.list_tools()
            logger.info(f"Available tools from {server_name}: {[tool.name for tool in tools_result.tools]}")

            return True

        except Exception as e:
            logger.error(f"Failed to initialize MCP server {server_name}: {e}")
            return False

    async def call_mcp_tool(self, server_name: str, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Call an MCP tool"""
        if server_name not in self.sessions:
            raise ValueError(f"MCP server {server_name} not initialized")

        try:
            session = self.sessions[server_name]
            result = await session.call_tool(tool_name, arguments)
            return result
        except Exception as e:
            logger.error(f"MCP tool call failed: {e}")
            raise

# Video Analysis Tools
class VideoAnalysisTools:
    """Collection of video analysis tools"""

    def __init__(self, model_config: ModelConfig, mcp_manager: MCPToolManager):
        self.model_config = model_config
        self.mcp_manager = mcp_manager
        self.processor = None
        self.model = None
        self._initialize_models()
        self._frame_cache = {}

    import re as _re

    def _extract_count(self, text: str) -> str:
        """Return a strict integer or short range from model text."""
        t = text.strip()
        # exact integer
        m = _re.search(r"\b(\d+)\b", t)
        if m:
            return m.group(1)
        # short range like 2-3 or 2–3
        m = _re.search(r"\b(\d+)\s*[-–]\s*(\d+)\b", t)
        if m:
            a, b = int(m.group(1)), int(m.group(2))
            return f"{a}-{b}" if a <= b else f"{b}-{a}"
        # words fallback
        words = {"one":"1","two":"2","three":"3","four":"4","five":"5","six":"6","seven":"7","eight":"8","nine":"9","ten":"10"}
        for w,n in words.items():
            if _re.search(rf"\b{w}\b", t.lower()):
                return n
        return t

    def _initialize_models(self):
        """Initialize VideoLlava models"""
        try:
            logger.info(f"Loading VideoLlava: {self.model_config.videollava_model}")

            self.processor = VideoLlavaProcessor.from_pretrained(self.model_config.videollava_model)
            self.model = VideoLlavaForConditionalGeneration.from_pretrained(
                self.model_config.videollava_model,
                torch_dtype=self.model_config.torch_dtype,
                device_map="auto",
                low_cpu_mem_usage=True
            )

            logger.info("VideoLlava models loaded successfully")

        except Exception as e:
            logger.error(f"Failed to load VideoLlava: {e}")
            raise

    def _get_reference_frames(self, video_path: str) -> List[Image.Image]:
        """Cache keyframes once per video for consistent answers across turns."""
        if video_path in self._frame_cache:
            return self._frame_cache[video_path]
        frames = VideoProcessor.extract_keyframes(video_path, max_frames=self.model_config.max_frames)
        self._frame_cache[video_path] = frames
        return frames

    def _build_chat_prompt(self, question: str, preface: str = "") -> str:
        """
        Video-LLaVA prefers chat-style prompts. Always include exactly one <video>.
        """
        system = "You are a helpful assistant that watches the video and answers concisely and factually."
        pre = (preface.strip() + "\n") if preface else ""
        return f"{system}\nUSER: <video>\n{pre}{question}\nASSISTANT:"

    def _build_prompt_and_frames(self, video_path: str, user_q: str):
        """
        Decide which frames to use and return a brief 'preface' for the prompt.
        Returns (frames: List[PIL.Image], preface: str, mode: str)
        mode in {"count","timeline","default","tech"}
        """
        q = (user_q or "").strip()
        ql = q.lower()

        # Expanded intent detection
        is_timeline = bool(re.search(r"\b(step[- ]?by[- ]?step|timeline|sequence|chronolog|beginning|middle|end|conclude|conclusion)\b", ql))
        is_count    = bool(re.search(r"\b(how many|count|number of)\b", ql))
        is_tech     = bool(re.search(r"\b(duration|fps|frame rate|resolution|codec|bit[- ]?rate|metadata)\b", ql))

        if is_tech:
            return None, "__TECH__", "tech"

        if is_timeline:
            # For timeline, use uniform sampling to preserve order
            frames = VideoProcessor.uniform_sample_frames(
                video_path, num_frames=min(48, self.model_config.max_frames * 2)
            )
            preface = (
                "Provide a concise step-by-step timeline of the video.\n"
                "• Use 5–8 bullet points, one sentence each.\n"
                "• If timing is inferable, prefix with [MM:SS]; otherwise omit.\n"
                "• Be specific about actions; avoid generic descriptions."
            )
            return frames, preface, "timeline"

        # For all other tasks, use the cached reference pack for consistency
        frames = self._get_reference_frames(video_path)

        if is_count:
            preface = (
                "Count the number of distinct people visible in the video frames.\n"
                "Return ONLY a single integer; if uncertain, return a short range like '2-3'.\n"
                "Do NOT add words or explanations."
            )
            return frames, preface, "count"

        preface = (
            "Answer the user's question based ONLY on the video frames.\n"
            "Be concise and specific. If information is not visible, say 'not visible'.\n"
            "Do NOT provide a generic caption or unrelated details."
        )
        return frames, preface, "default"



    def analyze_video_content(self, video_path: str, user_q: str) -> str:
        try:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            frames, preface, mode = self._build_prompt_and_frames(video_path, user_q)

            if mode == "tech":
                return self.extract_metadata(video_path)

            if not frames:
                return "Error analyzing video: no frames extracted (unsupported/empty video)."

            prompt = self._build_chat_prompt(question=user_q, preface=preface)

            inputs = self.processor(text=prompt, videos=[frames], return_tensors="pt")
            if torch.cuda.is_available():
                inputs = {k: (v.to(self.model_config.device) if isinstance(v, torch.Tensor) else v)
                          for k, v in inputs.items()}

            gen_kwargs = dict(
                max_new_tokens=max(96, self.model_config.max_new_tokens),
                pad_token_id=self.processor.tokenizer.eos_token_id,
                eos_token_id=self.processor.tokenizer.eos_token_id,
                use_cache=True,
            )

            if mode == "count":
                # Deterministic, short output
                gen_kwargs.update(dict(do_sample=False, temperature=0.0, top_p=1.0, repetition_penalty=1.0, max_new_tokens=8))
            else:
                # Gentle sampling to avoid empty outputs
                gen_kwargs.update(dict(do_sample=True, temperature=0.2, top_p=0.9, repetition_penalty=1.05))

            with torch.no_grad():
                ids = self.model.generate(**inputs, **gen_kwargs)

            # Decode only continuation
            if "input_ids" in inputs:
                gen_only = ids[0, inputs["input_ids"].shape[1]:]
            else:
                gen_only = ids[0]
            text = self.processor.batch_decode(gen_only.unsqueeze(0), skip_special_tokens=True, clean_up_tokenization_spaces=False)[0].strip()

            if mode == "count":
                return self._extract_count(text)
            return text if text else "No answer produced."
        except Exception as e:
            logger.error(f"Video analysis error: {e}")
            return f"Error analyzing video: {str(e)}"


    def extract_metadata(self, video_path: str) -> str:
        """Extract comprehensive video metadata"""
        try:
            metadata = VideoProcessor.get_video_metadata(video_path)

            # Format duration
            duration = metadata['duration_seconds']
            if duration > 0:
                minutes = int(duration // 60)
                seconds = duration % 60
                duration_str = f"{minutes}m {seconds:.1f}s" if minutes > 0 else f"{seconds:.1f}s"
            else:
                duration_str = "Unknown"

            # Create formatted output
            output = f"""
**Video Metadata Analysis**

**Duration**: {duration_str} ({duration:.2f} seconds)
**Frame Count**: {metadata['frames']} frames
**Frame Rate**: {metadata['fps']:.2f} fps
**Resolution**: {metadata['width']}x{metadata['height']}
**Codec**: {metadata['codec']}
**Bit Rate**: {metadata.get('bit_rate', 'Unknown')}
**Pixel Format**: {metadata.get('pixel_format', 'Unknown')}

**Technical Summary**: This is a {duration_str} video with {metadata['frames']} frames at {metadata['fps']:.1f} fps, recorded in {metadata['width']}x{metadata['height']} resolution using {metadata['codec']} codec.
            """.strip()

            return output

        except Exception as e:
            logger.error(f"Metadata extraction error: {e}")
            return f"Error extracting metadata: {str(e)}"

# Agent Nodes
class AgentNodes:
    """agent nodes with better error handling and reasoning"""

    def __init__(self, tools: VideoAnalysisTools, router: LLMRouter, config: AgentConfig):
        self.tools = tools
        self.router = router
        self.config = config

    def route_request(self, state: AgentState) -> str:
        """routing with better decision making"""
        try:
            last_message = state["messages"][-1]

            if not isinstance(last_message, HumanMessage):
                return "respond"

            has_video = bool(state.get("video_path"))
            classification = self.router.classify_request(last_message.content, has_video)

            # Store classification for later use
            state["llm_classification"] = classification

            # Add reasoning trace
            if self.config.enable_reasoning:
                reasoning = f"Router decision: {classification['primary_tool']} - {classification['reasoning']}"
                state.setdefault("reasoning_trace", []).append(reasoning)

            # Map tools to routes
            tool_routes = {
                "visual_analysis": "analyze_video",
                "metadata_extraction": "analyze_video",
                "comprehensive_summary": "analyze_video",
                "anomaly_detection": "analyze_video",
                "step_by_step": "analyze_video",
                "web_search": "analyze_video",
                "list_tools": "list_tools",
                "respond": "respond"
            }

            route = tool_routes.get(classification["primary_tool"], "respond")

            if self.config.debug_mode:
                logger.info(f"Routing decision: {route} (tool: {classification['primary_tool']})")

            return route

        except Exception as e:
            logger.error(f"Routing error: {e}")
            state["error_context"] = {"error": str(e), "stage": "routing"}
            return "respond"

    def analyze_video_node(self, state: AgentState) -> AgentState:
        """video analysis node with tool execution"""
        try:
            video_path = state.get("video_path")
            if not video_path:
                return {
                    **state,
                    "messages": [AIMessage(content="Please upload a video first.")]
                }

            classification = state.get("llm_classification") or {}
            tool_type = classification.get("primary_tool", "visual_analysis")
            custom_prompt = classification.get("custom_prompt", "Describe this video.")

            # Execute appropriate analysis
            if tool_type == "metadata_extraction":
                result = self.tools.extract_metadata(video_path)
            elif tool_type == "comprehensive_summary":
                result = self._comprehensive_analysis(video_path)
            elif tool_type == "anomaly_detection":
                result = self.tools.analyze_video_content(
                    video_path,
                    "Identify any unusual, strange, or unexpected elements in this video."
                )
            elif tool_type == "step_by_step":
                result = self._step_by_step_analysis(video_path)
            else:  # visual_analysis or default
                result = self.tools.analyze_video_content(video_path, custom_prompt)

            # Add reasoning if enabled
            if self.config.enable_reasoning and classification.get("reasoning"):
                result += f"\n\n**Analysis Method**: {classification['reasoning']}"

            return {
                **state,
                "messages": [AIMessage(content=result)],
                "tool_calls": [{"tool": tool_type, "result": "success"}]
            }

        except Exception as e:
            logger.error(f"Video analysis error: {e}")
            error_msg = f"Error during video analysis: {str(e)}"
            return {
                **state,
                "messages": [AIMessage(content=error_msg)],
                "error_context": {"error": str(e), "stage": "video_analysis"}
            }

    def _comprehensive_analysis(self, video_path: str) -> str:
        """Perform comprehensive video analysis with multiple passes"""
        analyses = []

        prompts = [
            "What is the main subject or content of this video?",
            "Describe the visual elements, setting, and environment.",
            "What actions or activities are taking place?",
            "What is the overall mood or atmosphere?",
            "Provide any additional notable observations."
        ]

        for i, prompt in enumerate(prompts, 1):
            try:
                result = self.tools.analyze_video_content(video_path, prompt)
                analyses.append(f"**Analysis {i}**: {result}")
            except Exception as e:
                analyses.append(f"**Analysis {i}**: Error - {str(e)}")

        return "**Comprehensive Video Analysis**\n\n" + "\n\n".join(analyses)

    # def _step_by_step_analysis(self, video_path: str) -> str:
    #     """Perform step-by-step chronological analysis"""
    #     analyses = []

    #     prompts = [
    #         "Describe what happens at the beginning of this video.",
    #         "What occurs in the middle section of the video?",
    #         "How does the video conclude?",
    #         "Summarize the overall sequence of events."
    #     ]

    #     for i, prompt in enumerate(prompts, 1):
    #         try:
    #             result = self.tools.analyze_video_content(video_path, prompt)
    #             analyses.append(f"**Step {i}**: {result}")
    #         except Exception as e:
    #             analyses.append(f"**Step {i}**: Error - {str(e)}")

    #     return "**Step-by-Step Video Analysis**\n\n" + "\n\n".join(analyses)

    def _step_by_step_analysis(self, video_path: str) -> str:
        """Single-pass timeline using uniform sampling for ordered events."""
        q = "Give me a concise step-by-step timeline of what happens in the video."
        return self.analyze_video_content(video_path, q)


    def list_tools_node(self, state: AgentState) -> AgentState:
        """tools listing with capabilities"""
        tools_info = """
**VideoLlava Agent Capabilities**

**Core Analysis Tools:**
- **Visual Analysis**: Describe content, count objects, identify elements
- **Metadata Extraction**: Technical specifications (duration, fps, resolution)
- **Comprehensive Summary**: Multi-angle analysis with multiple perspectives

**Advanced Analysis:**
- **Anomaly Detection**: Identify unusual or unexpected elements
- **Step-by-Step Analysis**: Chronological breakdown of video events
- **Web Search**: External context and information lookup (when enabled)

**System Features:**
- **Intelligent Routing**: LLM-powered tool selection
- **Memory Persistence**: Video remains loaded between questions
- **Error Recovery**: Robust fallback mechanisms
- **Reasoning Traces**: See why each tool was selected

**How to Use:**
Simply upload a video and ask natural questions. The system will automatically select the most appropriate analysis method.

Ready to analyze your video!
        """.strip()

        return {
            **state,
            "messages": [AIMessage(content=tools_info)]
        }

    def respond_node(self, state: AgentState) -> AgentState:
        """response node with context awareness (uses router LLM if available)"""
        last_msg = state["messages"][-1] if state.get("messages") else None
        question = getattr(last_msg, "content", "") if last_msg else ""
        error_context = state.get("error_context")

        # If we have an error, keep your troubleshooting block
        if error_context:
            response = (
                f"I encountered an issue: {error_context['error']}\n\n"
                "**Troubleshooting Steps:**\n"
                "1. Ensure your video file is properly uploaded\n"
                "2. Check that the video format is supported (MP4, AVI, MOV, etc.)\n"
                "3. Try asking a different question about the video\n"
                "4. If issues persist, try re-uploading the video\n\n"
                "**What I can help with:**\n"
                "- Video content analysis and description\n"
                "- Technical metadata extraction\n"
                "- Comprehensive video summaries\n"
                "- Anomaly detection in videos\n"
                "- Step-by-step video breakdowns\n"
                "\nPlease try again with a video upload and your question."
            )
            return {**state, "messages": [AIMessage(content=response)]}

        # Try to answer with the router LLM (Phi-3) using prior context
        answer = None
        try:
            if self.router and self.router.model is not None and self.router.tokenizer is not None:
                # Gather brief context (previous user/assistant turns)
                ctx = []
                for m in state.get("messages", [])[:-1]:
                    c = getattr(m, "content", "")
                    if c:
                        ctx.append(c)
                context_text = "\n".join(ctx)[-2000:]  # keep prompt small

                prompt = (
                    "You are a helpful assistant. Using the conversation context (which may include a "
                    "user-provided scene description), answer the latest question concisely and directly.\n\n"
                    f"Context:\n{context_text}\n\n"
                    f"Question: {question}\nAnswer:"
                )
                toks = self.router.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024, padding=True)
                if torch.cuda.is_available():
                    toks = {k: v.to(self.router.device) for k, v in toks.items()}

                with torch.no_grad():
                    out = self.router.model.generate(
                        **toks,
                        max_new_tokens=180,
                        do_sample=False,
                        pad_token_id=self.router.tokenizer.eos_token_id,
                        eos_token_id=self.router.tokenizer.eos_token_id,
                        use_cache=True,
                    )
                gen = out[0, toks["input_ids"].shape[1]:]
                answer = self.router.tokenizer.decode(gen, skip_special_tokens=True).strip()
        except Exception:
            answer = None

        if not answer:
            # question-aware fallback (better than static help)
            ql = (question or "").lower()
            if any(k in ql for k in ["how many", "count", "number of"]):
                answer = "If you upload the video, I can count precisely. If you describe the scene, I can estimate from your text."
            elif any(k in ql for k in ["how long", "duration", "length"]):
                answer = "I need the video or its metadata to give the exact duration."
            else:
                answer = "Tell me what’s in the scene (or upload the video), and ask a specific question—I’ll answer directly."

        return {**state, "messages": [AIMessage(content=answer)]}

# Main Agent Class
class VideoLlavaAgent:
    """Main VideoLlava agent with MCP integration"""

    def __init__(self, model_config: ModelConfig = None, agent_config: AgentConfig = None):
        self.model_config = model_config or ModelConfig()
        self.agent_config = agent_config or AgentConfig()

        # Initialize components
        logger.info("Initializing VideoLlava Agent...")

        self.mcp_manager = MCPToolManager()
        self.router = LLMRouter(self.model_config)
        self.tools = VideoAnalysisTools(self.model_config, self.mcp_manager)
        self.nodes = AgentNodes(self.tools, self.router, self.agent_config)

        # Create workflow
        self.workflow = self._create_workflow()

        logger.info("VideoLlava Agent initialized successfully")

    def _create_workflow(self) -> StateGraph:
        """Create LangGraph workflow"""
        workflow = StateGraph(AgentState)

        # Add nodes
        workflow.add_node("route", lambda state: state)  # Routing happens in conditional edges
        workflow.add_node("analyze_video", self.nodes.analyze_video_node)
        workflow.add_node("list_tools", self.nodes.list_tools_node)
        workflow.add_node("respond", self.nodes.respond_node)

        # Set up routing
        workflow.add_conditional_edges(
            START,
            self.nodes.route_request,
            {
                "analyze_video": "analyze_video",
                "list_tools": "list_tools",
                "respond": "respond"
            }
        )

        # Add endings
        workflow.add_edge("analyze_video", END)
        workflow.add_edge("list_tools", END)
        workflow.add_edge("respond", END)

        # Add memory if enabled
        if self.agent_config.enable_memory:
            memory = MemorySaver()
            return workflow.compile(checkpointer=memory)
        else:
            return workflow.compile()

    async def initialize_mcp_servers(self, server_configs: List[Dict[str, Any]]):
        """Initialize MCP servers for external tool integration"""
        for config in server_configs:
            success = await self.mcp_manager.initialize_mcp_server(
                config["name"],
                config["command"]
            )
            if success:
                logger.info(f"MCP server {config['name']} initialized")
            else:
                logger.warning(f"Failed to initialize MCP server {config['name']}")

    def process_request(self, video_path: str, message: str, session_id: str = "default") -> Dict[str, Any]:
        """Process a user request with error handling"""
        try:
            # Prepare state
            state = {
                "messages": [HumanMessage(content=message)],
                "video_path": video_path,
                "current_task": message,
                "tool_calls": [],
                "context": {},
                "llm_classification": None,
                "error_context": None,
                "session_id": session_id,
                "reasoning_trace": []
            }

            # Run workflow
            config = RunnableConfig({"thread_id": session_id})
            result = self.workflow.invoke(state, config)

            # Extract response
            response = None
            for msg in reversed(result["messages"]):
                if isinstance(msg, AIMessage):
                    response = msg.content
                    break

            return {
                "response": response or "No response generated",
                "tool_calls": result.get("tool_calls", []),
                "reasoning_trace": result.get("reasoning_trace", []),
                "error_context": result.get("error_context"),
                "classification": result.get("llm_classification")
            }

        except Exception as e:
            logger.error(f"Request processing error: {e}")
            return {
                "response": f"Error processing request: {str(e)}",
                "tool_calls": [],
                "reasoning_trace": [],
                "error_context": {"error": str(e), "stage": "processing"},
                "classification": None
            }


In [4]:
import time
from datetime import datetime

def load_models_with_progress():
    """Load models with detailed progress tracking"""

    print("=== Model Loading Phase ===")
    start_time = time.time()

    # Determine optimal configuration based on available resources
    if gpu_memory >= 20:
        config_name = "High-End"
        max_frames = 32
        router_model = "microsoft/Phi-3-mini-4k-instruct"
        max_tokens = 256
    elif gpu_memory >= 12:
        config_name = "Mid-Range"
        max_frames = 16
        router_model = "microsoft/Phi-3-mini-4k-instruct"
        max_tokens = 200
    else:
        config_name = "Low-Memory"
        max_frames = 8
        router_model = "microsoft/DialoGPT-medium"
        max_tokens = 150

    print(f"🎯 Using {config_name} Configuration")
    print(f"   Max Frames: {max_frames}")
    print(f"   Router Model: {router_model}")
    print(f"   Max Tokens: {max_tokens}")

    # Create configurations
    model_config = ModelConfig(
        videollava_model="LanguageBind/Video-LLaVA-7B-hf",
        router_model=router_model,
        device="cuda" if gpu_available else "cpu",
        torch_dtype=torch.float16 if gpu_available else torch.float32,
        max_frames=max_frames,
        max_new_tokens=max_tokens
    )

    agent_config = AgentConfig(
        enable_reasoning=True,
        enable_memory=True,
        max_tool_calls=3 if gpu_memory >= 16 else 2,
        fallback_enabled=True,
        debug_mode=True
    )

    print(f"⏳ Starting model loading at {datetime.now().strftime('%H:%M:%S')}")

    try:
        # Create agent with progress tracking
        print("🔄 Creating VideoLlava Agent...")
        agent = VideoLlavaAgent(model_config, agent_config)

        load_time = time.time() - start_time
        print(f"✅ Models loaded successfully in {load_time:.1f} seconds")

        # Memory status after loading
        if gpu_available:
            allocated = torch.cuda.memory_allocated(0) / 1024**3
            cached = torch.cuda.memory_reserved(0) / 1024**3
            print(f"📊 GPU Memory - Allocated: {allocated:.1f}GB, Cached: {cached:.1f}GB")

        return agent, model_config, agent_config

    except Exception as e:
        print(f"❌ Error loading models: {e}")

        # Try fallback configuration
        print("🔄 Attempting fallback configuration...")

        fallback_config = ModelConfig(
            videollava_model="LanguageBind/Video-LLaVA-7B-hf",
            router_model="microsoft/DialoGPT-medium",
            max_frames=6,
            max_new_tokens=128
        )

        fallback_agent_config = AgentConfig(
            enable_reasoning=False,
            enable_memory=False,
            max_tool_calls=1,
            debug_mode=False
        )

        try:
            agent = VideoLlavaAgent(fallback_config, fallback_agent_config)
            print("✅ Fallback configuration loaded")
            return agent, fallback_config, fallback_agent_config
        except Exception as fallback_error:
            print(f"❌ Fallback also failed: {fallback_error}")
            raise

# Load the models
agent, model_config, agent_config = load_models_with_progress()

=== Model Loading Phase ===
🎯 Using High-End Configuration
   Max Frames: 32
   Router Model: microsoft/Phi-3-mini-4k-instruct
   Max Tokens: 256
⏳ Starting model loading at 04:39:31
🔄 Creating VideoLlava Agent...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

You have video processor config saved in `preprocessor.json` file which is deprecated. Video processor configs should be saved in their own `video_preprocessor.json` file. You can rename the file or load and save the processor back which renames it automatically. Loading from `preprocessor.json` will be removed in v5.0.


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

✅ Models loaded successfully in 14.0 seconds
📊 GPU Memory - Allocated: 20.8GB, Cached: 21.4GB


In [5]:
# Gradio Interface for VideoLlava Agent
# Optimized for Google Colab deployment with better memory management

import gradio as gr
import torch
import logging
from typing import List, Tuple, Optional, Dict, Any, Union
import json
import time
from datetime import datetime
import os
import gc

# Setup logging for Gradio interface
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class VideoLlavaInterface:
    """Gradio interface with memory management and session handling"""

    def __init__(self, agent: 'VideoLlavaAgent'):
        self.agent = agent
        self.current_video_path = None
        self.session_history = {}
        self.performance_stats = {"total_requests": 0, "avg_response_time": 0}

    def process_video_message(
        self,
        video,
        message: str,
        history: Optional[List[Dict[str, str]]] = None,
        session_id: str = "default",
    ) -> Tuple[List[Dict[str, str]], str]:
        start_time = time.time()
        history = history or []

        if not message.strip():
            return history, ""

        # update video path if new upload
        if video and video != self.current_video_path:
            self.current_video_path = video
            logger.info(f"New video uploaded: {os.path.basename(video) if video else 'None'}")
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                gc.collect()

        # add user message
        history.append({"role": "user", "content": message})

        try:
            result = self.agent.process_request(
                video_path=self.current_video_path,
                message=message,
                session_id=session_id
            )

            response = result["response"]

            # optional metadata
            if result.get("classification"):
                classification = result["classification"]
                confidence = classification.get("confidence", "Unknown")
                tool_used = classification.get("primary_tool", "Unknown")
                metadata = "\n\n**Analysis Details:**\n"
                metadata += f"- **Tool Selected**: {tool_used.replace('_', ' ').title()}\n"
                metadata += f"- **Confidence**: {confidence}\n"
                if result.get("reasoning_trace"):
                    metadata += f"- **Reasoning**: {result['reasoning_trace'][-1]}\n"
                response += metadata

            if self.current_video_path:
                response += f"\n\n**Current Video**: {os.path.basename(self.current_video_path)}"

            # add assistant message
            history.append({"role": "assistant", "content": response})

            # perf stats
            self._update_performance_stats(time.time() - start_time)
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            return history, ""

        except Exception as e:
            logger.error(f"Error processing message: {e}")
            error_response = f"**Error**: {str(e)}\n\n"
            if self.current_video_path:
                error_response += f"Video loaded: {os.path.basename(self.current_video_path)}\n"
            else:
                error_response += "No video currently loaded. Please upload a video first.\n"
            error_response += "\n**Troubleshooting:**\n- Ensure video file is properly uploaded\n- Check video format (MP4, AVI, MOV supported)\n- Try restarting if memory issues persist\n"

            history.append({"role": "assistant", "content": error_response})
            return history, ""

    def clear_session(self) -> Tuple[List, str]:
        """Clear session and reset video state"""
        logger.info("Clearing session and resetting state")

        self.current_video_path = None

        # Force garbage collection and GPU memory cleanup
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

        return [], ""

    def get_system_status(self) -> str:
        """Get current system status and performance info"""
        status = "**System Status**\n\n"

        # GPU Information
        if torch.cuda.is_available():
            gpu_name = torch.cuda.get_device_name(0)
            total_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
            allocated_memory = torch.cuda.memory_allocated(0) / 1024**3
            cached_memory = torch.cuda.memory_reserved(0) / 1024**3

            status += f"**GPU**: {gpu_name}\n"
            status += f"**Memory**: {allocated_memory:.1f}GB / {total_memory:.1f}GB allocated\n"
            status += f"**Cached**: {cached_memory:.1f}GB\n"
        else:
            status += "**Device**: CPU only\n"

        # Agent Status
        status += f"\n**Agent Status**:\n"
        status += f"- **LLM Router**: {'Active' if self.agent.router.model else 'Fallback Mode'}\n"
        status += f"- **VideoLlava**: {'Loaded' if self.agent.tools.model else 'Not Loaded'}\n"
        status += f"- **MCP Integration**: {'Available' if self.agent.mcp_manager.mcp_enabled else 'Not Available'}\n"

        # Current Session
        status += f"\n**Current Session**:\n"
        status += f"- **Video Loaded**: {'Yes' if self.current_video_path else 'No'}\n"
        if self.current_video_path:
            status += f"- **Video File**: {os.path.basename(self.current_video_path)}\n"

        # Performance Stats
        status += f"\n**Performance**:\n"
        status += f"- **Total Requests**: {self.performance_stats['total_requests']}\n"
        status += f"- **Avg Response Time**: {self.performance_stats['avg_response_time']:.2f}s\n"

        return status

    def _update_performance_stats(self, response_time: float):
        """Update performance statistics"""
        current_total = self.performance_stats["total_requests"]
        current_avg = self.performance_stats["avg_response_time"]

        new_total = current_total + 1
        new_avg = (current_avg * current_total + response_time) / new_total

        self.performance_stats["total_requests"] = new_total
        self.performance_stats["avg_response_time"] = new_avg

def create_gradio_interface(agent: 'VideoLlavaAgent') -> gr.Blocks:
    """Create comprehensive Gradio interface"""

    interface = VideoLlavaInterface(agent)

    # Custom CSS for better styling
    custom_css = """
    .gradio-container {
        max-width: 1200px !important;
    }
    .video-upload {
        border: 2px dashed #4A90E2;
        border-radius: 10px;
        padding: 20px;
    }
    .status-box {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        padding: 15px;
        border-radius: 10px;
        color: white;
        margin: 10px 0;
    }
    .tool-info {
        background-color: #f8f9fa;
        padding: 15px;
        border-radius: 8px;
        border-left: 4px solid #4A90E2;
    }
    """

    with gr.Blocks(
        title="VideoLlava Agent",
        theme=gr.themes.Soft(),
        css=custom_css
    ) as demo:

        # Header
        gr.HTML("""
        <div style='text-align: center; margin-bottom: 20px;'>
            <h1 style='color: #4A90E2; margin-bottom: 10px;'>VideoLlava Agent</h1>
            <p style='font-size: 18px; color: #666;'>
                Intelligent Video Analysis with LLM Routing + LangGraph + VideoLlava
            </p>
        </div>
        """)

        # System Status
        with gr.Row():
            with gr.Column(scale=2):
                system_status = gr.HTML(
                    value=interface.get_system_status()
                )

            with gr.Column(scale=1):
                refresh_status_btn = gr.Button("Refresh Status", variant="secondary")
                clear_memory_btn = gr.Button("Clear Memory", variant="secondary")

        gr.HTML("<hr>")

        # Main Interface
        with gr.Row():
            # Left Column - Video Upload and Instructions
            with gr.Column(scale=1):
                video_input = gr.Video(
                    label="Upload Video for Analysis",
                    height=350
                )

                # Instructions
                gr.HTML("""
                <div class="tool-info">
                    <h4>How to Use</h4>
                    <ol>
                        <li><strong>Upload Video</strong>: Select any video file (MP4, AVI, MOV, etc.)</li>
                        <li><strong>Ask Questions</strong>: Use natural language to ask about the video</li>
                        <li><strong>Get Analysis</strong>: The AI will automatically select the best tool</li>
                    </ol>
                </div>
                """)

                # Example Questions
                with gr.Accordion("Example Questions", open=False):
                    gr.HTML("""
                    <div style="padding: 10px;">
                        <p><strong>Content Analysis:</strong></p>
                        <ul>
                            <li>"What's happening in this video?"</li>
                            <li>"How many people are there?"</li>
                            <li>"Describe what you see"</li>
                        </ul>

                        <p><strong>Technical Analysis:</strong></p>
                        <ul>
                            <li>"How long is this video?"</li>
                            <li>"What's the resolution and frame rate?"</li>
                            <li>"Give me technical details"</li>
                        </ul>

                        <p><strong>Advanced Analysis:</strong></p>
                        <ul>
                            <li>"Give me a comprehensive analysis"</li>
                            <li>"Is anything unusual happening?"</li>
                            <li>"Break this down step by step"</li>
                        </ul>
                    </div>
                    """)

                # Performance Monitor
                with gr.Accordion("Performance Monitor", open=False):
                    performance_display = gr.JSON(
                        value=interface.performance_stats,
                        label="Performance Statistics"
                    )

                    perf_timer = gr.Timer(30.0)
                    perf_timer.tick(lambda: interface.performance_stats, None, performance_display)

            # Right Column - Chat Interface
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(
                    label="VideoLlava Agent Chat",
                    height=500,
                    show_copy_button=True,
                    type="messages"
                )

                with gr.Row():
                    msg_input = gr.Textbox(
                        label="Ask about your video",
                        placeholder="Upload a video and ask any question - the AI will select the best analysis method!",
                        lines=2,
                        scale=4
                    )

                with gr.Row():
                    send_btn = gr.Button("Send", variant="primary", scale=1)
                    clear_btn = gr.Button("Clear Chat", variant="secondary", scale=1)

        # Advanced Settings
        with gr.Accordion("Advanced Settings", open=False):
            with gr.Row():
                with gr.Column():
                    session_id_input = gr.Textbox(
                        value="default",
                        label="Session ID",
                        placeholder="default"
                    )

                with gr.Column():
                    debug_mode = gr.Checkbox(
                        value=agent.agent_config.debug_mode,
                        label="Debug Mode"
                    )

                with gr.Column():
                    max_frames = gr.Slider(
                        minimum=8,
                        maximum=64,
                        step=8,
                        value=agent.model_config.max_frames,
                        label="Max Frames for Analysis"
                    )

        # Event Handlers
        def send_message(video, message, history, session_id):
            return interface.process_video_message(video, message, history, session_id)[:2]

        def update_system_status():
            return interface.get_system_status()

        def clear_memory():
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
            return "Memory cleared successfully"

        # Connect events
        send_btn.click(
            fn=send_message,
            inputs=[video_input, msg_input, chatbot, session_id_input],
            outputs=[chatbot, msg_input],
            show_progress=True
        )

        msg_input.submit(
            fn=send_message,
            inputs=[video_input, msg_input, chatbot, session_id_input],
            outputs=[chatbot, msg_input],
            show_progress=True
        )

        clear_btn.click(
            fn=interface.clear_session,
            outputs=[chatbot, msg_input]
        )

        refresh_status_btn.click(
            fn=update_system_status,
            outputs=system_status
        )

        clear_memory_btn.click(
            fn=clear_memory,
            outputs=gr.Textbox(visible=False)  # Hidden output
        )

        # Auto-refresh performance stats
        demo.load(
            fn=lambda: interface.performance_stats,
            outputs=performance_display
        )


    return demo

# Colab-specific setup and deployment
def setup_for_colab():
    """Setup optimized for Google Colab environment"""

    print("Setting up VideoLlava Agent for Google Colab...")

    # Memory optimization
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        # Set memory fraction to prevent OOM
        torch.cuda.set_per_process_memory_fraction(0.85)

    # Configure for Colab
    os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")  # Avoid tokenizer warnings

    # Model configuration optimized for Colab
    model_config = ModelConfig(
        videollava_model="LanguageBind/Video-LLaVA-7B-hf",
        router_model="microsoft/Phi-3-mini-4k-instruct",
        device="cuda" if torch.cuda.is_available() else "cpu",
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
        max_frames=16,  # Reduced for Colab
        max_new_tokens=200  # Reduced for faster generation
    )

    # Agent configuration
    agent_config = AgentConfig(
        enable_reasoning=True,
        enable_memory=True,
        max_tool_calls=2,  # Reduced for Colab
        fallback_enabled=True,
        debug_mode=True
    )

    return model_config, agent_config

def deploy_in_colab():
    """Complete deployment function for Colab"""

    try:
        # Setup configurations
        model_config, agent_config = setup_for_colab()

        print("Creating VideoLlava Agent...")
        agent = VideoLlavaAgent(model_config, agent_config)

        print("Creating Gradio interface...")
        demo = create_gradio_interface(agent)

        print("Launching interface...")
        demo.launch(
            share=True,
            server_port=7860,
            server_name="0.0.0.0",
            show_error=True,
            debug=True,
            quiet=False
        )

        return demo, agent

    except Exception as e:
        print(f"Error during deployment: {e}")

        # Fallback: Try with reduced configuration
        print("Attempting fallback configuration...")

        model_config = ModelConfig(
            videollava_model="LanguageBind/Video-LLaVA-7B-hf",
            router_model="microsoft/DialoGPT-medium",  # Smaller fallback
            max_frames=8,
            max_new_tokens=150
        )

        agent_config = AgentConfig(
            enable_reasoning=False,  # Disable for simplicity
            enable_memory=False,
            max_tool_calls=1
        )

        try:
            agent = VideoLlavaAgent(model_config, agent_config)
            demo = create_gradio_interface(agent)
            demo.launch(share=True, server_port=7860, server_name="0.0.0.0")

            return demo, agent

        except Exception as fallback_error:
            print(f"Fallback also failed: {fallback_error}")
            raise


In [None]:
def create_optimized_interface(agent):
    """Create interface optimized for Colab"""

    print("🎨 Creating Gradio interface...")

    # Import the interface creation function and modify for Colab
    interface = VideoLlavaInterface(agent)

    # Create custom Gradio interface for Colab
    custom_css = """
    .gradio-container {
        max-width: 100% !important;
        font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    }
    .video-upload {
        border: 3px dashed #4CAF50;
        border-radius: 15px;
        padding: 20px;
        background: linear-gradient(145deg, #f0f8ff, #e6f3ff);
    }
    .status-display {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 15px;
        border-radius: 10px;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    }
    .chat-container {
        border-radius: 10px;
        box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
    }
    """

    with gr.Blocks(
        title="VideoLlava Agent - Colab Edition",
        theme=gr.themes.Soft(primary_hue="blue", secondary_hue="purple"),
        css=custom_css
    ) as demo:

        # Header with system info
        gpu_status = "🟢 GPU Ready" if gpu_available else "🔴 CPU Mode"
        router_status = "🧠 LLM Router Active" if agent.router.model else "⚠️ Pattern Matching"

        gr.HTML(f"""
        <div style='text-align: center; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
                    color: white; border-radius: 15px; margin-bottom: 20px; box-shadow: 0 4px 15px rgba(0,0,0,0.2);'>
            <h1 style='margin: 0; font-size: 2.5em; font-weight: bold;'>VideoLlava Agent</h1>
            <p style='margin: 10px 0; font-size: 1.2em;'>Intelligent Video Analysis • LangGraph Orchestration • Colab Optimized</p>
            <div style='margin-top: 15px;'>
                <span style='margin: 0 15px; padding: 5px 15px; background: rgba(255,255,255,0.2);
                           border-radius: 20px; font-weight: bold;'>{gpu_status}</span>
                <span style='margin: 0 15px; padding: 5px 15px; background: rgba(255,255,255,0.2);
                           border-radius: 20px; font-weight: bold;'>{router_status}</span>
                <span style='margin: 0 15px; padding: 5px 15px; background: rgba(255,255,255,0.2);
                           border-radius: 20px; font-weight: bold;'>🛠️ 6 Analysis Tools</span>
            </div>
        </div>
        """)

        # Quick Stats Row
        with gr.Row():
            with gr.Column(scale=1):
                gr.HTML(f"""
                <div class="status-display">
                    <h4>⚡ Performance Config</h4>
                    <p><strong>Max Frames:</strong> {model_config.max_frames}</p>
                    <p><strong>Max Tokens:</strong> {model_config.max_new_tokens}</p>
                    <p><strong>Memory Mode:</strong> {'Optimized' if gpu_available else 'Conservation'}</p>
                </div>
                """)

            with gr.Column(scale=1):
                system_monitor = gr.HTML(
                    value=interface.get_system_status(),
                )

                refresh_btn = gr.Button("🔄 Refresh Status")

        # Main Interface
        with gr.Row():
            with gr.Column(scale=1):
                # Video Upload
                video_input = gr.Video(
                    label="📹 Upload Video for AI Analysis",
                    height=350
                )

                # Quick Action Buttons
                gr.HTML("<h4>🚀 Quick Actions</h4>")
                with gr.Row():
                    analyze_btn = gr.Button("🔍 Analyze Content", variant="primary")
                    metadata_btn = gr.Button("📊 Get Metadata", variant="secondary")
                    summary_btn = gr.Button("📝 Full Summary", variant="secondary")


                # Example Questions
                with gr.Accordion("💡 Example Questions", open=True):
                    example_questions = [
                        "What's happening in this video?",
                        "How many people are there?",
                        "How long is this video?",
                        "Give me technical details",
                        "Is anything unusual happening?",
                        "Break this down step by step"
                    ]

                    for question in example_questions:
                        gr.Button(question, variant="secondary")

            with gr.Column(scale=2):
                # Chat Interface
                chatbot = gr.Chatbot(
                  label="🤖 VideoLlava Chat Assistant",
                  height=500,
                  show_copy_button=True,
                  type="messages"
                )

                # Message Input
                with gr.Row():
                    msg_input = gr.Textbox(
                        label="💬 Ask about your video",
                        placeholder="Upload a video and ask any question - I'll automatically select the best analysis method!",
                        lines=2,
                        scale=5
                    )

                # Action Buttons
                with gr.Row():
                    send_btn = gr.Button("📤 Send", variant="primary", scale=2)
                    clear_btn = gr.Button("🗑️ Clear Chat", variant="secondary", scale=1)
                    memory_btn = gr.Button("💾 Clear Memory", variant="secondary", scale=1)

        # Performance Monitor
        with gr.Accordion("📈 Performance Monitor", open=False):
            with gr.Row():
                performance_json = gr.JSON(label="Performance Stats")
                error_log = gr.Textbox(label="System Log", lines=5, max_lines=10)
                # Auto-refresh performance
                perf_timer = gr.Timer(30.0)
                perf_timer.tick(lambda: interface.performance_stats, None, performance_json)

        # Event Handlers
        def send_message(video, message, history):
            return interface.process_video_message(video, message, history)[:2]

        def quick_analyze(video, history):
            if video:
                return send_message(video, "What's happening in this video?", history)
            return history, ""

        def quick_metadata(video, history):
            if video:
                return send_message(video, "Give me the technical details and metadata", history)
            return history, ""

        def quick_summary(video, history):
            if video:
                return send_message(video, "Provide a comprehensive analysis of this video", history)
            return history, ""

        # Connect events
        send_btn.click(send_message, [video_input, msg_input, chatbot], [chatbot, msg_input])
        msg_input.submit(send_message, [video_input, msg_input, chatbot], [chatbot, msg_input])

        analyze_btn.click(quick_analyze, [video_input, chatbot], [chatbot, msg_input])
        metadata_btn.click(quick_metadata, [video_input, chatbot], [chatbot, msg_input])
        summary_btn.click(quick_summary, [video_input, chatbot], [chatbot, msg_input])

        clear_btn.click(interface.clear_session, outputs=[chatbot, msg_input])
        refresh_btn.click(lambda: interface.get_system_status(), outputs=system_monitor)



    return demo

# Create and launch interface
print("🚀 Creating optimized interface...")
demo = create_optimized_interface(agent)

print("🌐 Launching Gradio interface...")
demo.launch(
    share=True,           # Create public link
    debug=True,          # Enable debug mode
    server_name="0.0.0.0",  # Allow external access
    server_port=7860,    # Standard port
    show_error=True,     # Show detailed errors
    inbrowser=True,      # Open in browser automatically
    height=800,          # Set height
    favicon_path=None,   # No custom favicon
    auth=None,           # No authentication
    max_threads=40       # Handle multiple users
)

🚀 Creating optimized interface...
🎨 Creating Gradio interface...
🌐 Launching Gradio interface...
Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://759cd6d142d5ab79d6.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


