In [None]:
%pip install videodb
%pip install llama-index
%pip install llama-index-retrievers-videodb

In [None]:
import os
from typing import List, Dict, Optional, Union
from dataclasses import dataclass
from videodb import connect, SceneExtractionType, SearchType, IndexType
from llama_index.core import get_response_synthesizer
from llama_index.retrievers.videodb import VideoDBRetriever
from llama_index.llms.openai import OpenAI
from videodb.timeline import Timeline
from videodb.asset import VideoAsset

@dataclass
class SceneInfo:
    """One retrieved scene (or a merged run of overlapping scenes) in a video.

    Instances are built in ``MovieSceneAnalyzer.search_scenes`` from retriever
    nodes and may later be combined by ``_merge_overlapping_scenes``.
    """
    video_id: str  # ID of the video this scene belongs to (node metadata "video_id")
    start_time: float  # Scene start from node metadata "start"; main() prints it as seconds
    end_time: float  # Scene end from node metadata "end"; main() prints it as seconds
    description: str  # Text of the retrieved node (newline-joined after a merge)
    emotions: List[str]  # Labels parsed from the LLM reply for this scene's text
    characters: List[str]  # Names parsed from the LLM reply for this scene's text
    dialogue: str  # Node text when the node's metadata type is "spoken", otherwise ""

class MovieSceneAnalyzer:
    def __init__(self, videodb_api_key: str, openai_api_key: str):
        """Export both API keys and create the clients the analyzer relies on.

        Args:
            videodb_api_key: Stored in the ``VIDEO_DB_API_KEY`` environment
                variable before ``connect()`` is called without arguments.
            openai_api_key: Stored in the ``OPENAI_API_KEY`` environment
                variable before the ``OpenAI`` LLM wrapper is created.

        Note:
            Every instantiation creates a new "Movie Scene Analysis"
            collection on the connection.
        """
        # Keys are published through the environment (VideoDB key first) rather
        # than passed to the clients directly.
        os.environ.update(
            {
                "VIDEO_DB_API_KEY": videodb_api_key,
                "OPENAI_API_KEY": openai_api_key,
            }
        )

        connection = connect()
        self.conn = connection
        self.collection = connection.create_collection(
            name="Movie Scene Analysis",
            description="Analysis of movie and TV scenes",
        )
        self.llm = OpenAI(model="gpt-4o-mini")

    def process_video(self, video) -> dict:
        """Process a video by indexing speech and scenes."""
        try:
            # Index spoken content
            print("Indexing spoken content...")
            video.index_spoken_words()

            # Index visual scenes
            print("Indexing visual content...")
            scene_index_id = video.index_scenes(
                extraction_type=SceneExtractionType.shot_based,
                extraction_config={"frame_count": 5},
                prompt="Describe the scene in detail including: setting, characters, actions, emotions, and cinematography"
            )

            return {"success": True, "scene_index_id": scene_index_id}
        except Exception as e:
            print(f"Error processing video: {str(e)}")
            return {"success": False, "error": str(e)}

    def add_video(self, url: str, title: str) -> dict:
        """Add a new video to the collection and process it."""
        try:
            # Upload video
            print(f"Uploading video: {title}")
            video = self.collection.upload(url=url)

            # Process the video
            process_result = self.process_video(video)

            if process_result["success"]:
                return {
                    "success": True,
                    "video_id": video.id,
                    "scene_index_id": process_result["scene_index_id"],
                    "title": title
                }
            else:
                return {
                    "success": False,
                    "error": process_result["error"]
                }

        except Exception as e:
            print(f"Error adding video: {str(e)}")
            return {"success": False, "error": str(e)}

    def search_scenes(self, query: str, video_id: Optional[str] = None) -> List[SceneInfo]:
        """Search the spoken-word and scene indexes for ``query``.

        Args:
            query: Natural-language search text.
            video_id: Restrict the search to this video; ``None`` is passed
                through to the retrievers unchanged.

        Returns:
            Matching scenes after ``_merge_overlapping_scenes``; spoken-word
            hits come before scene hits prior to merging. Returns ``[]`` if
            the search fails as a whole. Errors are printed, never raised.
        """
        try:
            # (index to query, whether its hits are dialogue, label for logs)
            sources = (
                (IndexType.spoken_word, True, "spoken-word"),
                (IndexType.scene, False, "scene"),
            )

            scenes = []
            for index_type, is_spoken, label in sources:
                retriever = VideoDBRetriever(
                    collection=self.collection.id,
                    video=video_id,
                    search_type=SearchType.semantic,
                    index_type=index_type,
                    score_threshold=0.2,
                )

                # A failure in one index must not throw away the hits that
                # the other index produced.
                try:
                    nodes = retriever.retrieve(query)
                except Exception as e:
                    print(f"Error searching {label} index: {str(e)}")
                    continue

                for node in nodes:
                    scenes.append(
                        SceneInfo(
                            video_id=node.metadata["video_id"],
                            start_time=node.metadata["start"],
                            end_time=node.metadata["end"],
                            description=node.text,
                            emotions=self._extract_emotions(node.text),
                            characters=self._extract_characters(node.text),
                            # Dialogue is decided by which retriever returned
                            # the node, not by a magic "spoken" string in the
                            # node metadata that may not match the label the
                            # retriever actually writes.
                            dialogue=node.text if is_spoken else "",
                        )
                    )

            return self._merge_overlapping_scenes(scenes)

        except Exception as e:
            print(f"Error searching scenes: {str(e)}")
            return []

    def create_character_compilation(self,
                                  character_name: str,
                                  video_id: Optional[str] = None) -> Union[str, None]:
        """Create a compilation of scenes featuring a specific character."""
        try:
            # Search for character scenes
            scenes = self.search_scenes(f"scenes with {character_name}", video_id)

            if not scenes:
                print(f"No scenes found for character: {character_name}")
                return None

            # Create timeline
            timeline = Timeline(self.conn)

            # Add scenes to timeline
            for scene in scenes:
                try:
                    asset = VideoAsset(
                        asset_id=scene.video_id,
                        start=scene.start_time,
                        end=scene.end_time
                    )
                    timeline.add_inline(asset)
                except Exception as e:
                    print(f"Error adding scene to timeline: {str(e)}")
                    continue

            # Generate stream
            try:
                return timeline.generate_stream()
            except Exception as e:
                print(f"Error generating stream: {str(e)}")
                return None

        except Exception as e:
            print(f"Error creating character compilation: {str(e)}")
            return None

    def _extract_emotions(self, text: str) -> List[str]:
        """Extract emotional content from scene description."""
        try:
            prompt = "Extract emotions from: " + text
            response = self.llm.complete(prompt)
            return [emotion.strip() for emotion in response.text.split(",")]
        except Exception as e:
            print(f"Error extracting emotions: {str(e)}")
            return []

    def _extract_characters(self, text: str) -> List[str]:
        """Extract character names from scene description."""
        try:
            prompt = "Extract character names from: " + text
            response = self.llm.complete(prompt)
            return [name.strip() for name in response.text.split(",")]
        except Exception as e:
            print(f"Error extracting characters: {str(e)}")
            return []

    def _merge_overlapping_scenes(self, scenes: List[SceneInfo]) -> List[SceneInfo]:
        """Merge overlapping scene segments."""
        if not scenes:
            return []

        # Sort scenes by start time
        sorted_scenes = sorted(scenes, key=lambda x: x.start_time)
        merged = [sorted_scenes[0]]

        for current in sorted_scenes[1:]:
            previous = merged[-1]

            # Only merge scenes from the same video
            if (current.video_id == previous.video_id and
                current.start_time <= previous.end_time):
                # Merge scenes
                previous.end_time = max(previous.end_time, current.end_time)
                previous.description += f"\n{current.description}"
                previous.emotions = list(set(previous.emotions + current.emotions))
                previous.characters = list(set(previous.characters + current.characters))
                previous.dialogue += f"\n{current.dialogue}"
            else:
                merged.append(current)

        return merged

# Example usage
def main():
    """Run the end-to-end example: upload, search, print, and compile.

    API keys are taken from the ``VIDEO_DB_API_KEY`` and ``OPENAI_API_KEY``
    environment variables when they are set and non-empty; otherwise the
    placeholder strings are used. Every error is caught and printed.
    """
    try:
        # Prefer credentials from the environment so real keys never have to
        # be pasted into the source; the placeholders remain the fallback.
        analyzer = MovieSceneAnalyzer(
            videodb_api_key=os.environ.get("VIDEO_DB_API_KEY") or "your_api_key_here",
            openai_api_key=os.environ.get("OPENAI_API_KEY") or "your_api_key_here",
        )

        # Add and process a video
        video_info = analyzer.add_video(
            url="https://www.youtube.com/watch?v=bTN3q_NjuWs",  # Example video URL
            title="Test Movie"
        )

        if not video_info["success"]:
            print(f"Failed to add video: {video_info.get('error')}")
            return

        print("Video added successfully!")
        print(f"Video ID: {video_info['video_id']}")

        # Search for scenes
        scenes = analyzer.search_scenes(
            "dramatic scenes with dialogue",
            video_info["video_id"]
        )

        if not scenes:
            print("\nNo scenes found matching the criteria")
            return

        print(f"\nFound {len(scenes)} scenes:")
        for i, scene in enumerate(scenes, 1):
            print(f"\nScene {i}:")
            print(f"Time: {scene.start_time:.2f}s - {scene.end_time:.2f}s")
            print(f"Description: {scene.description}")
            print(f"Characters: {', '.join(scene.characters)}")
            print(f"Emotions: {', '.join(scene.emotions)}")

        # Use the first non-empty character name of the first scene; a blank
        # entry would otherwise produce the query "scenes with ".
        character = next(
            (name for name in scenes[0].characters if name),
            "main character",
        )
        compilation = analyzer.create_character_compilation(
            character,
            video_info["video_id"]
        )

        if compilation:
            print(f"\nCharacter compilation created: {compilation}")
        else:
            print("\nFailed to create character compilation")

    except Exception as e:
        print(f"Error in main: {str(e)}")

# Run the example only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()