In [None]:
!pip install pytube moviepy opencv-python-headless pydub transformers sentence-transformers langchain faiss-cpu langchain-community torch numpy openai-whisper
!pip install -U langchain-community
!pip install langchain-huggingface
!pip install gradio
!pip install SpeechRecognition
!pip install gtts
!pip install langchain-google-genai

In [None]:
# ===================================================================
# ENHANCED MULTI-AGENT SYSTEM WITH PROVEN ACCURACY PATTERNS
# Combines your successful architecture with multi-agent intelligence
# ===================================================================

import os
import base64
import tempfile
import pickle
import asyncio
import logging
import json
import uuid
import subprocess
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
from datetime import datetime

import cv2
import numpy as np
import whisper
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.schema import Document
from langchain_google_genai import GoogleGenerativeAI
from gtts import gTTS
import gradio as gr

# Configure the root logger at INFO level and create the module-level logger
# that every class and function below uses for its log messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ===================================================================
# ENHANCED DATA INGESTION - USING YOUR PROVEN PATTERN
# ===================================================================

class ProvenPatternIngestion:
    """Turn an uploaded video into a FAISS index of transcript and frame documents.

    The pipeline saves the upload under ``uploads/``, extracts a WAV track into
    ``audio/``, transcribes it with Whisper, samples frames into ``frames/`` and
    stores one transcript document plus one document per frame in a FAISS index
    saved to ``faiss_index/`` (next to a ``processing_metadata.json`` file).
    """

    # Seconds between two sampled frames; also used for the frame timestamps.
    FRAME_INTERVAL_SECONDS = 2
    # Frame rate assumed when OpenCV does not report a usable one.
    DEFAULT_FPS = 30
    # Confidence used when no per-segment ``avg_logprob`` is available.
    DEFAULT_AVG_LOGPROB = -1.0

    def __init__(self):
        """Load the embedding and Whisper models and create the working directories."""
        self.embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        self.whisper_model = whisper.load_model("base")

        self.setup_directories()
        logger.info("ProvenPatternIngestion initialized")

    def setup_directories(self):
        """Create the working directories (relative to the CWD) if they are missing."""
        for dir_name in ["uploads", "frames", "audio", "faiss_index"]:
            os.makedirs(dir_name, exist_ok=True)

    def save_uploaded_video(self, video_bytes):
        """Write ``video_bytes`` to a uniquely named ``.mp4`` file in ``uploads/``.

        Returns:
            The path of the written file, or ``None`` if writing failed.
        """
        try:
            video_path = os.path.join("uploads", f"{uuid.uuid4().hex}.mp4")
            with open(video_path, "wb") as f:
                f.write(video_bytes)
            return video_path
        except Exception as e:
            logger.error(f"Video save failed: {str(e)}")
            return None

    def extract_audio_improved(self, video_path):
        """Extract the audio track of ``video_path`` to a WAV file in ``audio/``.

        MoviePy is tried first; if it raises for any reason (including a failed
        import) the ffmpeg command line tool is used instead.

        Returns:
            The path of the WAV file, or ``None`` when the clip has no audio
            track or both extraction methods failed.
        """
        try:
            from moviepy.editor import VideoFileClip
            with VideoFileClip(video_path) as clip:
                if clip.audio is None:
                    return None

                audio_path = os.path.join("audio", f"{uuid.uuid4().hex}_audio.wav")
                clip.audio.write_audiofile(audio_path, logger=None, verbose=False)
                return audio_path
        except Exception as e:
            logger.warning(f"MoviePy extraction failed: {e}, trying ffmpeg...")

        return self._extract_audio_ffmpeg(video_path)

    def _extract_audio_ffmpeg(self, video_path):
        """Extract 16 kHz mono PCM audio with the ffmpeg CLI; return the path or ``None``."""
        try:
            audio_path = os.path.join("audio", f"{uuid.uuid4().hex}_audio.wav")
            cmd = [
                'ffmpeg', '-i', video_path,
                '-vn', '-acodec', 'pcm_s16le',
                '-ar', '16000', '-ac', '1',
                '-y', audio_path
            ]
            subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
            return audio_path
        except Exception as e2:
            logger.error(f"FFmpeg extraction also failed: {e2}")
            return None

    def _mean_confidence(self, segments):
        """Return the mean ``avg_logprob`` of ``segments`` as a plain float.

        An empty segment list yields ``DEFAULT_AVG_LOGPROB`` instead of the NaN
        (and RuntimeWarning) that ``np.mean([])`` would produce; NaN would also
        end up as an invalid token in the JSON metadata file.
        """
        if not segments:
            return self.DEFAULT_AVG_LOGPROB
        scores = [seg.get("avg_logprob", self.DEFAULT_AVG_LOGPROB) for seg in segments]
        return float(np.mean(scores))

    def _transcribe(self, audio_path):
        """Transcribe ``audio_path`` with Whisper; return ``(text, metadata)``."""
        transcription_result = self.whisper_model.transcribe(audio_path)
        transcription = transcription_result["text"].strip()
        transcription_metadata = {
            "full_result": transcription_result,
            "confidence": self._mean_confidence(transcription_result.get("segments", [])),
            "language": transcription_result.get("language", "en")
        }
        return transcription, transcription_metadata

    def _extract_frames(self, video_path):
        """Save one frame per sampling interval; return ``[(path, timestamp), ...]``.

        The capture is always released, even if reading or writing raises.
        Frames that OpenCV fails to write are skipped with a warning.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            if fps <= 0:
                fps = self.DEFAULT_FPS
            interval = fps * self.FRAME_INTERVAL_SECONDS

            frames = []
            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % interval == 0:
                    frame_path = os.path.join("frames", f"frame_{uuid.uuid4().hex}.jpg")
                    if cv2.imwrite(frame_path, frame):
                        # Approximate timestamp: sample number times the interval.
                        timestamp = (frame_count // interval) * self.FRAME_INTERVAL_SECONDS
                        frames.append((frame_path, timestamp))
                    else:
                        logger.warning(f"Could not write frame to {frame_path}; skipping it")
                frame_count += 1
            return frames
        finally:
            cap.release()

    def _build_documents(self, transcription, transcription_metadata, audio_path, frames):
        """Build the transcript document followed by one document per saved frame."""
        documents = [
            Document(
                page_content=transcription,
                metadata={
                    "type": "transcript",
                    "audio_path": audio_path,
                    "confidence": transcription_metadata["confidence"],
                    "language": transcription_metadata["language"],
                    "word_count": len(transcription.split()),
                    "content_priority": "high"  # For agent prioritization
                }
            )
        ]

        for i, (fp, timestamp) in enumerate(frames):
            # The file on disk already is the JPEG; base64 its bytes directly
            # instead of decoding and re-encoding the image.
            try:
                with open(fp, "rb") as f:
                    frame_b64 = base64.b64encode(f.read()).decode('utf-8')
            except OSError as e:
                logger.warning(f"Could not read frame {fp}: {e}; skipping it")
                continue

            documents.append(
                Document(
                    page_content=f"Visual content from frame {os.path.basename(fp)}",
                    metadata={
                        "type": "frame",
                        "path": fp,
                        "frame_b64": frame_b64,
                        "timestamp": timestamp,  # Approximate timestamp
                        "frame_index": i,
                        "content_priority": "medium"
                    }
                )
            )
        return documents

    def _build_index(self, documents, processing_metadata):
        """Create and save the FAISS index and write the JSON processing metadata."""
        vector_db = FAISS.from_documents(documents, self.embedding_model)
        vector_db.save_local("faiss_index")

        with open("faiss_index/processing_metadata.json", "w") as f:
            json.dump(processing_metadata, f, indent=2)

    def process_video_with_proven_pattern(self, video_bytes):
        """Run the full ingestion pipeline on the raw bytes of a video file.

        Args:
            video_bytes: Content of the uploaded video.

        Returns:
            A human-readable status string: a success summary, or a short
            message naming the step that failed. No exception is propagated.
        """
        if not video_bytes:
            return "Please upload a file first."

        try:
            # Step 1: Save video
            video_path = self.save_uploaded_video(video_bytes)
            if not video_path:
                return "Failed to save video"

            # Step 2: Extract audio
            audio_path = self.extract_audio_improved(video_path)
            if not audio_path:
                return "Audio extraction failed"

            # Step 3: Transcribe audio
            try:
                transcription, transcription_metadata = self._transcribe(audio_path)
            except Exception as e:
                logger.error(f"Transcription failed: {e}")
                return "Transcription failed"

            # Step 4: Extract frames
            try:
                frames = self._extract_frames(video_path)
            except Exception as e:
                logger.error(f"Frame extraction failed: {e}")
                return "Frame extraction failed"

            # Step 5: Create documents
            documents = self._build_documents(
                transcription, transcription_metadata, audio_path, frames
            )

            # Step 6: Create vector store and side-car metadata
            try:
                processing_metadata = {
                    "video_path": video_path,
                    "audio_path": audio_path,
                    "transcription": transcription,
                    "transcription_metadata": transcription_metadata,
                    "frame_count": len(frames),
                    "processing_timestamp": datetime.now().isoformat(),
                    "total_documents": len(documents)
                }
                self._build_index(documents, processing_metadata)
            except Exception as e:
                logger.error(f"FAISS index creation failed: {e}")
                return "Index creation failed"

            return f"Processing completed successfully! Transcription: '{transcription[:100]}...' | Frames: {len(frames)} | Documents: {len(documents)}"

        except Exception as e:
            logger.error(f"Critical processing failed: {e}")
            return f"Critical error: {str(e)}"

# ===================================================================
# ENHANCED RETRIEVAL AGENT - USING YOUR SUCCESSFUL PATTERNS
# ===================================================================

class EnhancedRetrievalAgent:
    """Retrieve documents from the vector store and turn them into LLM context."""

    # Maximum number of decoded frames returned for display.
    MAX_FRAMES = 3

    def __init__(self, embeddings):
        """Keep a reference to the embedding model and set the agent id."""
        self.embeddings = embeddings
        self.agent_id = "enhanced_retrieval_agent"
        logger.info("EnhancedRetrievalAgent initialized")

    def _decode_frame(self, frame_b64):
        """Decode a base64 JPEG into an RGB array; return ``None`` on failure."""
        try:
            img_bytes = base64.b64decode(frame_b64)
            frame_np = np.frombuffer(img_bytes, np.uint8)
            frame_image = cv2.imdecode(frame_np, cv2.IMREAD_COLOR)
            return cv2.cvtColor(frame_image, cv2.COLOR_BGR2RGB)
        except Exception as e:
            logger.warning(f"Frame processing failed: {e}")
            return None

    def retrieve_with_proven_pattern(self, vector_store: "FAISS", query: str, k: int = 5) -> Tuple[Any, Optional[str]]:
        """Run a similarity search and build the context block for the answer agent.

        Args:
            vector_store: Store queried through ``similarity_search``.
            query: The user's question.
            k: Number of documents to request.

        Returns:
            ``(result, None)`` on success, where ``result`` is a dict with the
            keys ``context``, ``retrieved_docs``, ``frames`` (at most
            ``MAX_FRAMES`` RGB arrays) and ``success``; or ``([], message)``
            when nothing was found or the search raised.
        """
        try:
            docs = vector_store.similarity_search(query, k=k)

            if not docs:
                return [], "No relevant information found for your query."

            context_lines = ["Retrieved Information:"]
            frames = []

            for doc in docs:
                # .get() so a document without a "type" is ignored instead of
                # failing the whole retrieval with a KeyError.
                doc_type = doc.metadata.get("type")
                if doc_type == "transcript":
                    context_lines.append(f"- Transcript snippet: '{doc.page_content}'")
                elif doc_type == "frame":
                    context_lines.append("- A visual frame was identified related to the query.")

                    # Only decode while more frames can still be returned.
                    if len(frames) < self.MAX_FRAMES and "frame_b64" in doc.metadata:
                        frame_image = self._decode_frame(doc.metadata["frame_b64"])
                        if frame_image is not None:
                            frames.append(frame_image)

            return {
                "context": "\n".join(context_lines) + "\n",
                "retrieved_docs": docs,
                "frames": frames,
                "success": True
            }, None

        except Exception as e:
            logger.error(f"Retrieval failed: {e}")
            return [], f"Failed to retrieve documents: {str(e)}"

# ===================================================================
# ENHANCED ANSWER AGENT - USING YOUR PROVEN LLM PATTERN
# ===================================================================

class EnhancedAnswerAgent:
    """Generate a natural-language answer from retrieved context with a Gemini LLM."""

    def __init__(self, api_key: str, model: str = "gemini-1.5-flash"):
        """Create the LLM client.

        Args:
            api_key: Google API key passed to ``GoogleGenerativeAI``.
            model: Model name passed to ``GoogleGenerativeAI``.
        """
        self.llm = GoogleGenerativeAI(model=model, google_api_key=api_key)
        self.agent_id = "enhanced_answer_agent"
        logger.info("EnhancedAnswerAgent initialized")

    async def generate_answer_with_proven_pattern(self, query: str, retrieval_result: Dict) -> Dict:
        """Ask the LLM to answer ``query`` from the retrieved context.

        Args:
            query: The user's question.
            retrieval_result: Dict with at least the keys ``context`` and
                ``retrieved_docs``.

        Returns:
            A dict with the keys ``answer``, ``success``, ``context_used`` and
            ``source_count``. On any failure ``success`` is ``False`` and
            ``answer`` carries the error text; no exception is propagated.
        """
        try:
            context = retrieval_result["context"]

            prompt = (
                "Based on the following retrieved information from a video, "
                "provide a concise and helpful answer to the user's query."
                f"\n\nUser Query: \"{query}\"\n\n{context}\n\nSynthesized Answer:"
            )

            logger.info("Invoking LLM to generate summary...")
            # llm.invoke is a blocking call; run it in a worker thread so this
            # coroutine does not stall the event loop while waiting for it.
            generated_text = await asyncio.to_thread(self.llm.invoke, prompt)

            if not generated_text:
                raise ValueError("LLM returned an empty response.")

            logger.info(f"LLM generated text: {generated_text}")

            return {
                "answer": generated_text,
                "success": True,
                "context_used": context,
                "source_count": len(retrieval_result["retrieved_docs"])
            }

        except Exception as e:
            logger.error(f"Answer generation failed: {e}")
            return {
                "answer": f"LLM failed to process retrieved results: {str(e)}",
                "success": False,
                "context_used": "",
                "source_count": 0
            }

# ===================================================================
# ENHANCED AUDIO AGENT - USING YOUR PROVEN TTS PATTERN
# ===================================================================

class EnhancedAudioAgent:
    """Convert answer text to an MP3 file with gTTS."""

    def __init__(self):
        """Set the agent id."""
        self.agent_id = "enhanced_audio_agent"
        logger.info("EnhancedAudioAgent initialized")

    def generate_audio_with_proven_pattern(self, text: str, lang: str = "en") -> Tuple[Optional[str], Optional[str]]:
        """Synthesize ``text`` into ``response_<uuid>.mp3`` in the working directory.

        Args:
            text: Text to speak.
            lang: Language code passed to gTTS.

        Returns:
            ``(audio_filename, None)`` on success, or ``(None, error_message)``
            when the text is blank or synthesis failed.
        """
        # Blank text cannot be spoken; report it without attempting synthesis.
        if not text or not text.strip():
            error_message = "Failed to generate audio file: No text to speak"
            logger.error(error_message)
            return None, error_message

        audio_filename = f"response_{uuid.uuid4().hex}.mp3"
        try:
            logger.info("Generating audio file with gTTS...")

            tts = gTTS(text=text, lang=lang)
            tts.save(audio_filename)

            logger.info(f"Audio file saved as {audio_filename}")
            return audio_filename, None

        except Exception as e:
            error_message = f"Failed to generate audio file: {str(e)}"
            logger.error(error_message)
            # A failed save may have left an incomplete file behind; remove it
            # if present so broken MP3s do not accumulate.
            try:
                os.remove(audio_filename)
            except OSError:
                pass
            return None, error_message

# ===================================================================
# ENHANCED ORCHESTRATION - COMBINING YOUR PATTERNS WITH AGENTS
# ===================================================================

class EnhancedOrchestrationAgent:
    """Coordinate the retrieval, answer and audio agents for one user query."""

    def __init__(self, api_key: str):
        """Create the embedding model and the three agents.

        Args:
            api_key: Google API key forwarded to the answer agent.
        """
        self.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

        self.retrieval_agent = EnhancedRetrievalAgent(self.embeddings)
        self.answer_agent = EnhancedAnswerAgent(api_key)
        self.audio_agent = EnhancedAudioAgent()

        # System state, filled by load_proven_pattern_data()
        self.vector_store = None
        self.processing_metadata = None

        logger.info("EnhancedOrchestrationAgent initialized")

    def load_proven_pattern_data(self) -> Tuple[bool, str]:
        """Load the FAISS index and, if present, the JSON processing metadata.

        Returns:
            ``(True, message)`` on success, ``(False, error_message)`` otherwise.
        """
        try:
            # NOTE(review): allow_dangerous_deserialization=True presumably lets
            # load_local unpickle stored data. That is only acceptable while the
            # "faiss_index" directory is written by this application itself;
            # confirm it can never hold user-supplied files.
            self.vector_store = FAISS.load_local("faiss_index", self.embeddings, allow_dangerous_deserialization=True)

            metadata_path = "faiss_index/processing_metadata.json"
            if os.path.exists(metadata_path):
                with open(metadata_path, "r") as f:
                    self.processing_metadata = json.load(f)

            return True, "Data loaded successfully using proven pattern"

        except Exception as e:
            logger.error(f"Failed to load data: {e}")
            return False, f"Failed to load data: {str(e)}"

    @staticmethod
    def _failure(message: str) -> Dict:
        """Build the result dict used for every early or failed return."""
        return {
            "frames": [],
            "answer": message,
            "audio_path": None,
            "success": False
        }

    async def query_with_proven_pattern(self, query: str) -> Dict:
        """Answer ``query``: retrieve context, generate text, then synthesize audio.

        Returns:
            A dict with the keys ``frames``, ``answer``, ``audio_path`` and
            ``success``; after a completed retrieval it also carries
            ``context_used`` and ``source_count``. ``success`` mirrors the
            answer agent's outcome, and no audio is synthesized for a failed
            answer. No exception is propagated.
        """
        if self.vector_store is None:
            return self._failure("No data loaded. Please process a video first.")

        if not query or not query.strip():
            return self._failure("Please enter a query.")

        try:
            # Step 1: Retrieval. It is synchronous, so run it in a worker
            # thread to keep the event loop responsive.
            retrieval_result, error = await asyncio.to_thread(
                self.retrieval_agent.retrieve_with_proven_pattern,
                self.vector_store,
                query,
            )

            if error:
                return self._failure(f"Error during retrieval: {error}")

            # Step 2: Answer generation
            answer_result = await self.answer_agent.generate_answer_with_proven_pattern(
                query, retrieval_result
            )
            final_answer = answer_result["answer"]
            answer_ok = bool(answer_result.get("success", False))

            # Step 3: Audio generation, only for a real answer. Speaking an
            # error message aloud is not useful to the user.
            audio_path = None
            if answer_ok:
                audio_path, audio_error = await asyncio.to_thread(
                    self.audio_agent.generate_audio_with_proven_pattern,
                    final_answer,
                )
                if audio_error:
                    final_answer += f"\n\n[WARNING: {audio_error}]"

            return {
                "frames": retrieval_result["frames"],
                "answer": final_answer,
                "audio_path": audio_path,
                "success": answer_ok,
                "context_used": answer_result.get("context_used", ""),
                "source_count": answer_result.get("source_count", 0)
            }

        except Exception as e:
            logger.error(f"Query processing failed: {e}")
            return self._failure(f"Query processing failed: {str(e)}")

# ===================================================================
# GRADIO INTERFACE - COMBINING YOUR SUCCESSFUL UI WITH ENHANCEMENTS
# ===================================================================

def create_enhanced_interface():
    """Build the Gradio Blocks app for setup, video processing and querying.

    The ingestion pipeline is created immediately; the orchestrator is created
    when the user submits an API key through the "Setup System" button.

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    ingestion = ProvenPatternIngestion()
    orchestrator = None

    def setup_system(api_key):
        """Create the orchestrator for ``api_key`` and load any existing index."""
        nonlocal orchestrator
        try:
            if not api_key:
                return "Please enter your API key"
            orchestrator = EnhancedOrchestrationAgent(api_key)
            success, message = orchestrator.load_proven_pattern_data()
            return message
        except Exception as e:
            return f"Setup failed: {str(e)}"

    def process_video_interface(video_file):
        """Ingest the uploaded file and refresh the orchestrator's index."""
        if not video_file:
            return "Please upload a video file"

        # NOTE(review): the File component presumably yields either a path
        # string or an object exposing the path as ``.name`` depending on the
        # Gradio version; both are accepted here, confirm against the version
        # in use.
        video_path = getattr(video_file, "name", video_file)
        try:
            with open(video_path, "rb") as f:
                video_bytes = f.read()
        except OSError as e:
            return f"Could not read uploaded file: {str(e)}"

        result = ingestion.process_video_with_proven_pattern(video_bytes)

        # If setup already ran, the orchestrator still holds the index state it
        # loaded then (possibly none at all). Reload it so queries see the
        # video that was just processed without another "Setup System" click.
        if orchestrator is not None:
            loaded, message = orchestrator.load_proven_pattern_data()
            if not loaded:
                result += f"\n[WARNING: {message}]"
        return result

    async def query_interface(query):
        """Run a query; return ``(frames, answer_text, audio_path)``."""
        if not orchestrator:
            return [], "Please setup the system first", None
        if not query or not query.strip():
            return [], "Please enter a query", None

        result = await orchestrator.query_with_proven_pattern(query)
        return result["frames"], result["answer"], result["audio_path"]

    with gr.Blocks(title="Enhanced MAVIS with Proven Patterns") as app:
        gr.Markdown("# Enhanced Multi-Agent System with Proven Accuracy Patterns")
        gr.Markdown("Combines your successful architecture with multi-agent intelligence")

        # Setup section
        with gr.Row():
            api_key_input = gr.Textbox(label="Google API Key", type="password")
            setup_btn = gr.Button("Setup System", variant="primary")
            setup_status = gr.Textbox(label="Setup Status")

        # Processing section
        with gr.Row():
            video_input = gr.File(label="Upload Video")
            process_btn = gr.Button("Process Video", variant="primary")
            process_status = gr.Textbox(label="Processing Status", lines=3)

        # Query section
        with gr.Row():
            query_input = gr.Textbox(label="Enter your query about the video")
            query_btn = gr.Button("Get Answer", variant="primary")

        # Results section
        with gr.Row():
            result_frames = gr.Gallery(label="Relevant Frames")

        with gr.Row():
            result_answer = gr.Textbox(label="LLM Generated Summary", lines=6)

        with gr.Row():
            result_audio = gr.Audio(label="Synthesized Audio Response")

        # Event handlers
        setup_btn.click(setup_system, inputs=[api_key_input], outputs=[setup_status])
        process_btn.click(process_video_interface, inputs=[video_input], outputs=[process_status])
        query_btn.click(query_interface, inputs=[query_input], outputs=[result_frames, result_answer, result_audio])

    return app

# Entry point when run as a script or notebook cell: build the app and serve it.
if __name__ == "__main__":
    app = create_enhanced_interface()
    # NOTE(review): share=True presumably requests a publicly reachable Gradio
    # link; confirm that exposing the app (and the API key field) is intended.
    app.launch(debug=True, share=True)