# In [None]:
# Standard library
import asyncio
import hashlib
import json
import logging
import tempfile
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Tuple

# Third-party
import cv2
import numpy as np

# Core ML libraries
import torch
import torchvision.transforms as transforms
from transformers import BlipProcessor, BlipForConditionalGeneration
from transformers import CLIPProcessor, CLIPModel
import whisper
from ultralytics import YOLO
import mediapipe as mp
import pytesseract
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
import qdrant_client
from qdrant_client.models import Distance, VectorParams, PointStruct
from google.cloud import storage

# Configure logging: set up the root logger at INFO level and create the
# module-level logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class VideoMetadata:
    """Summary record describing one processed video.

    Instances are built by ``VideoProcessor.process_video`` from values
    reported by OpenCV for the video file.
    """

    # Identifier of the video (the batch processor uses the file stem).
    video_id: str
    # Base name of the video file.
    filename: str
    # Computed by the processor as frame count divided by fps.
    duration: float
    # Frame rate as reported by OpenCV.
    fps: float
    # (width, height) in pixels.
    resolution: Tuple[int, int]
    # NOTE(review): location/topic are never populated by the code in this
    # file; their intended format is not shown here.
    location: str = ""
    topic: str = ""
    # -1 is the default; presumably means "not assigned to a cluster yet" —
    # confirm against the clustering step once it is wired in.
    cluster_id: int = -1

@dataclass
class KeyframeData:
    """Analysis results for a single keyframe extracted from a video."""

    # Unique frame identifier; the processor builds it as
    # "<video_id>_frame_<index>".
    frame_id: str
    # Identifier of the video this frame was taken from.
    video_id: str
    # Position of the frame in the video (frame index divided by fps).
    timestamp: float
    # Location of the stored keyframe image (the processor passes the
    # "gs://..." URI returned by the storage manager).
    image_path: str
    # Caption generated for the frame.
    caption: str
    # Detected objects; the analyzer emits dicts with the keys
    # 'class', 'confidence', 'bbox' and 'center'.
    objects: List[Dict]
    # Detected poses; the analyzer emits dicts with a 'landmarks' list of
    # {'x', 'y', 'z', 'visibility'} entries.
    poses: List[Dict]
    # Recognised actions (currently always an empty list in the processor).
    actions: List[str]
    # Text recognised in the frame by OCR.
    ocr_text: str
    # CLIP image embedding of the frame.
    scene_embedding: np.ndarray
    # CLIP text embedding of the text associated with the frame.
    text_embedding: np.ndarray
    # Pose vector; the processor stores a 64-element zero array here.
    # NOTE(review): QdrantManager.index_keyframe rebuilds the pose vector
    # from `poses` and does not read this field.
    pose_embedding: np.ndarray

class GoogleStorageManager:
    """Convenience wrapper around a single Google Cloud Storage bucket."""

    def __init__(self, bucket_name: str, credentials_path: str = None):
        """Connect to ``bucket_name``.

        Args:
            bucket_name: Name of the bucket all operations act on.
            credentials_path: Optional path to a service-account JSON file.
                When omitted (or empty) the client is created with its
                default credential lookup.
        """
        self.bucket_name = bucket_name
        if credentials_path:
            self.client = storage.Client.from_service_account_json(credentials_path)
        else:
            self.client = storage.Client()
        self.bucket = self.client.bucket(bucket_name)

    def upload_file(self, local_path: str, remote_path: str):
        """Upload ``local_path`` to ``remote_path`` and return its gs:// URI."""
        target_blob = self.bucket.blob(remote_path)
        target_blob.upload_from_filename(local_path)
        return f"gs://{self.bucket_name}/{remote_path}"

    def download_file(self, remote_path: str, local_path: str):
        """Download the object at ``remote_path`` into the file ``local_path``."""
        source_blob = self.bucket.blob(remote_path)
        source_blob.download_to_filename(local_path)

    def list_files(self, prefix: str = "") -> List[str]:
        """Return the names of all objects whose name starts with ``prefix``."""
        object_names = []
        for entry in self.bucket.list_blobs(prefix=prefix):
            object_names.append(entry.name)
        return object_names

class MultiModalAnalyzer:
    """Collection of models used to analyse video keyframes and audio.

    Construction loads every model up front: BLIP (captioning), CLIP
    (image/text embeddings), YOLOv8 (object detection), MediaPipe Pose and
    Whisper (speech transcription).
    """

    # Frame rate assumed when the video container does not report a usable one.
    _FALLBACK_FPS = 30.0

    def __init__(self, device: str = "cuda" if torch.cuda.is_available() else "cpu"):
        """Store the target device and load all models.

        Args:
            device: Torch device string for BLIP and CLIP. The default is
                evaluated once, when the class is defined.
        """
        self.device = device
        self.load_models()

    def load_models(self):
        """Load all required models"""
        logger.info("Loading models...")

        # Image captioning
        self.blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        self.blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(self.device)

        # CLIP for embeddings
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(self.device)

        # Object detection
        self.yolo_model = YOLO('yolov8n.pt')

        # Pose estimation
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            static_image_mode=True,
            model_complexity=2,
            enable_segmentation=True,
            min_detection_confidence=0.5
        )

        # Speech recognition
        self.whisper_model = whisper.load_model("base")

        logger.info("All models loaded successfully!")

    @staticmethod
    def _frame_step(fps: float, interval: float) -> int:
        """Return how many frames separate two sampled keyframes (at least 1)."""
        return max(1, int(round(fps * interval)))

    def extract_keyframes(self, video_path: str, interval: int = 30) -> List[Tuple[float, np.ndarray]]:
        """Extract keyframes at regular intervals.

        Args:
            video_path: Path of the video to read with OpenCV.
            interval: Spacing between sampled frames, in seconds.

        Returns:
            A list of ``(timestamp_seconds, frame)`` tuples, starting with the
            first frame. Empty if no frame could be read.
        """
        cap = cv2.VideoCapture(video_path)
        keyframes = []
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:  # also catches NaN
                logger.warning(
                    "No valid FPS reported for %s (got %s); assuming %s",
                    video_path, fps, self._FALLBACK_FPS,
                )
                fps = self._FALLBACK_FPS

            # The step must be a whole number of frames: with a fractional
            # fps (e.g. 29.97) `frame_count % (fps * interval)` is only ever
            # zero for frame 0, so no later keyframe would be sampled.
            step = self._frame_step(fps, interval)

            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % step == 0:  # Every 'interval' seconds
                    keyframes.append((frame_count / fps, frame))

                frame_count += 1
        finally:
            # Release the capture even if reading fails part-way through.
            cap.release()
        return keyframes

    def generate_caption(self, image: np.ndarray) -> str:
        """Generate an image caption using BLIP.

        Args:
            image: Frame in OpenCV's BGR channel order.
        """
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        inputs = self.blip_processor(image_rgb, return_tensors="pt").to(self.device)

        with torch.no_grad():
            out = self.blip_model.generate(**inputs, max_length=50, num_beams=5)
            caption = self.blip_processor.decode(out[0], skip_special_tokens=True)

        return caption

    def detect_objects(self, image: np.ndarray) -> List[Dict]:
        """Detect objects using YOLO.

        Returns:
            One dict per detection with the keys ``class`` (label name),
            ``confidence``, ``bbox`` (``[x1, y1, x2, y2]``) and ``center``.
        """
        results = self.yolo_model(image)
        objects = []

        for r in results:
            boxes = r.boxes
            if boxes is None:
                continue
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                conf = box.conf[0].cpu().numpy()
                cls = int(box.cls[0].cpu().numpy())

                objects.append({
                    'class': self.yolo_model.names[cls],
                    'confidence': float(conf),
                    'bbox': [float(x1), float(y1), float(x2), float(y2)],
                    'center': [float((x1+x2)/2), float((y1+y2)/2)]
                })

        return objects

    def extract_poses(self, image: np.ndarray) -> List[Dict]:
        """Extract human poses using MediaPipe.

        Returns:
            A list with at most one entry, ``{'landmarks': [...]}``, where
            each landmark is a dict with ``x``, ``y``, ``z`` and
            ``visibility``. Empty when no pose was detected.
        """
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = self.pose.process(image_rgb)

        if not results.pose_landmarks:
            return []

        landmarks = [
            {
                'x': landmark.x,
                'y': landmark.y,
                'z': landmark.z,
                'visibility': landmark.visibility,
            }
            for landmark in results.pose_landmarks.landmark
        ]
        return [{'landmarks': landmarks}]

    def extract_ocr_text(self, image: np.ndarray) -> str:
        """Extract text from an image using OCR.

        OCR is best-effort: any failure is logged and an empty string is
        returned so a single bad frame does not abort the pipeline.
        """
        try:
            text = pytesseract.image_to_string(image)
        except Exception:
            # `except Exception` (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate.
            logger.warning("OCR failed; continuing without OCR text", exc_info=True)
            return ""
        return text.strip()

    def generate_embeddings(self, image: np.ndarray, text: str) -> Tuple[np.ndarray, np.ndarray]:
        """Generate CLIP embeddings for image and text.

        Args:
            image: Frame in OpenCV's BGR channel order.
            text: Text to embed; an empty string is embedded when falsy.

        Returns:
            ``(image_embedding, text_embedding)`` as 1-D arrays.
        """
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Callers pass caption + OCR + full transcript, which can be
        # arbitrarily long; truncate to the tokenizer's maximum length so the
        # text encoder never receives an over-long sequence.
        inputs = self.clip_processor(
            text=[text] if text else [""],
            images=image_rgb,
            return_tensors="pt",
            padding=True,
            truncation=True
        ).to(self.device)

        with torch.no_grad():
            outputs = self.clip_model(**inputs)
            image_embedding = outputs.image_embeds.cpu().numpy()[0]
            text_embedding = outputs.text_embeds.cpu().numpy()[0]

        return image_embedding, text_embedding

    def extract_audio_text(self, video_path: str) -> str:
        """Extract and transcribe audio from a video.

        Transcription is best-effort: any failure is logged and an empty
        string is returned.
        """
        try:
            result = self.whisper_model.transcribe(video_path)
            return result["text"]
        except Exception:
            logger.warning("Audio transcription failed for %s", video_path, exc_info=True)
            return ""

class VideoClustering:
    """Cluster videos with K-Means on caption TF-IDF plus mean scene embeddings."""

    def __init__(self, n_clusters: int = 10):
        """Create the estimator and the caption vectorizer.

        Args:
            n_clusters: Upper bound on the number of clusters to form.
        """
        self.n_clusters = n_clusters
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        self.tfidf = TfidfVectorizer(max_features=1000, stop_words='english')

    @staticmethod
    def _joined_captions(video: Dict) -> str:
        """Concatenate the captions of all keyframes of one video."""
        return " ".join(kf['caption'] for kf in video['keyframes'])

    def cluster_videos(self, video_data: List[Dict]) -> Dict[str, int]:
        """Cluster videos based on content features.

        Args:
            video_data: One dict per video with the keys ``video_id`` and
                ``keyframes``; each keyframe is a dict providing ``caption``
                and ``scene_embedding``.

        Returns:
            Mapping of ``video_id`` to cluster index. Empty for empty input.
        """
        if not video_data:
            # Nothing to fit; the vectorizer and K-Means both reject empty input.
            return {}

        captions = [self._joined_captions(video) for video in video_data]

        # Average the keyframe embeddings per video (None when a video has
        # no keyframes at all).
        mean_embeddings = [
            np.mean([kf['scene_embedding'] for kf in video['keyframes']], axis=0)
            if video['keyframes'] else None
            for video in video_data
        ]
        # Size the zero placeholder like the real embeddings so hstack cannot
        # fail on a width mismatch; 512 is the fallback when no video has any
        # keyframe.
        embedding_dim = next(
            (np.shape(emb)[0] for emb in mean_embeddings if emb is not None),
            512,
        )
        embeddings = np.array([
            emb if emb is not None else np.zeros(embedding_dim)
            for emb in mean_embeddings
        ])

        # TF-IDF features from captions
        tfidf_features = self.tfidf.fit_transform(captions).toarray()

        # Combine TF-IDF and visual embeddings
        combined_features = np.hstack([tfidf_features, embeddings])

        # K-Means needs at least as many samples as clusters, so cap the
        # cluster count for small batches.
        self.kmeans.set_params(n_clusters=min(self.n_clusters, len(video_data)))
        clusters = self.kmeans.fit_predict(combined_features)

        return {
            video['video_id']: int(label)
            for video, label in zip(video_data, clusters)
        }

    def get_cluster_topics(self, video_data: List[Dict], cluster_mapping: Dict[str, int]) -> Dict[int, str]:
        """Extract topic keywords for each cluster.

        Must be called after ``cluster_videos`` so the TF-IDF vectorizer is
        fitted.

        Args:
            video_data: Same structure as for ``cluster_videos``.
            cluster_mapping: ``video_id`` to cluster index; must contain every
                video in ``video_data`` (a missing id raises ``KeyError``).

        Returns:
            For every cluster index in ``range(n_clusters)``: up to five
            comma-separated top TF-IDF terms, or ``"empty"`` when no video
            belongs to that cluster.
        """
        # Group captions per cluster in one pass instead of rescanning every
        # video for every cluster id.
        captions_by_cluster = {}
        for video in video_data:
            cluster_id = cluster_mapping[video['video_id']]
            captions_by_cluster.setdefault(cluster_id, []).append(
                self._joined_captions(video)
            )

        cluster_topics = {}
        feature_names = None  # fetched lazily; loop-invariant once known
        for cluster_id in range(self.n_clusters):
            cluster_captions = captions_by_cluster.get(cluster_id)
            if not cluster_captions:
                cluster_topics[cluster_id] = "empty"
                continue

            # Score the cluster's combined text against the fitted vocabulary.
            scores = self.tfidf.transform([" ".join(cluster_captions)]).toarray()[0]
            if feature_names is None:
                feature_names = self.tfidf.get_feature_names_out()

            # Top 5 terms, highest score first, skipping zero-score terms.
            top_indices = np.argsort(scores)[-5:][::-1]
            cluster_topics[cluster_id] = ", ".join(
                feature_names[i] for i in top_indices if scores[i] > 0
            )

        return cluster_topics

class QdrantManager:
    """Client wrapper that stores keyframes in the ``video_keyframes`` collection."""

    def __init__(self, host: str = "localhost", port: int = 6333):
        """Connect to the Qdrant server at ``host``:``port``."""
        self.client = qdrant_client.QdrantClient(host=host, port=port)
        self.collection_name = "video_keyframes"

    def create_collection(self):
        """Create the keyframe collection with its three named vectors.

        Any exception raised during creation is logged at INFO level and
        suppressed.
        """
        try:
            named_vectors = {
                "image": VectorParams(size=512, distance=Distance.COSINE),
                "text": VectorParams(size=512, distance=Distance.COSINE),
                "pose": VectorParams(size=64, distance=Distance.COSINE),
            }
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=named_vectors,
            )
            logger.info(f"Created collection: {self.collection_name}")
        except Exception as e:
            logger.info(f"Collection might already exist: {e}")

    @staticmethod
    def _encode_pose(poses: List[Dict]) -> np.ndarray:
        """Pack the first 16 landmarks of the first pose into a 64-value vector.

        Each landmark contributes ``x, y, z, visibility`` in that order;
        unused slots stay zero, and an empty ``poses`` yields all zeros.
        """
        encoded = np.zeros(64)
        if not poses:
            return encoded

        leading_landmarks = poses[0]['landmarks'][:16]  # First 16 landmarks
        for slot, lm in enumerate(leading_landmarks):
            offset = slot * 4
            if offset < 64:
                encoded[offset:offset + 4] = [lm['x'], lm['y'], lm['z'], lm['visibility']]
        return encoded

    def index_keyframe(self, keyframe: KeyframeData):
        """Upsert one keyframe (vectors plus payload) into the collection.

        The point id is the MD5 hex digest of ``keyframe.frame_id``. The pose
        vector is derived from ``keyframe.poses``.
        """
        pose_vector = self._encode_pose(keyframe.poses)

        point_id = hashlib.md5(keyframe.frame_id.encode()).hexdigest()
        vectors = {
            "image": keyframe.scene_embedding.tolist(),
            "text": keyframe.text_embedding.tolist(),
            "pose": pose_vector.tolist(),
        }
        payload = {
            "video_id": keyframe.video_id,
            "timestamp": keyframe.timestamp,
            "image_path": keyframe.image_path,
            "caption": keyframe.caption,
            "objects": keyframe.objects,
            "poses": keyframe.poses,
            "actions": keyframe.actions,
            "ocr_text": keyframe.ocr_text,
        }
        point = PointStruct(id=point_id, vector=vectors, payload=payload)

        self.client.upsert(collection_name=self.collection_name, points=[point])

class VideoProcessor:
    """Run videos through keyframe analysis, image upload and vector indexing."""

    def __init__(self, storage_manager: GoogleStorageManager, analyzer: MultiModalAnalyzer, 
                 qdrant_manager: QdrantManager):
        """Wire together the storage, analysis and indexing components."""
        self.storage = storage_manager
        self.analyzer = analyzer
        self.qdrant = qdrant_manager
        self.clustering = VideoClustering()

    @staticmethod
    def _probe_video(video_path: str) -> Tuple[float, float, int, int]:
        """Read ``(fps, duration_seconds, width, height)`` from the video file.

        Raises:
            OSError: If OpenCV cannot open ``video_path``.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                # Fail with a clear message instead of a ZeroDivisionError
                # from dividing a zero frame count by a zero fps.
                raise OSError(f"Could not open video: {video_path}")
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            cap.release()

        # Some containers report no frame rate; duration is unknown then.
        duration = frame_count / fps if fps > 0 else 0.0
        return fps, duration, width, height

    async def process_video(self, video_path: str, video_id: str) -> VideoMetadata:
        """Process a single video through the entire pipeline.

        Each keyframe is written to a temporary JPEG, uploaded, analysed
        (caption, objects, poses, OCR, embeddings) and indexed in Qdrant.

        NOTE: declared ``async`` for the caller's convenience, but the body
        contains no awaits; all work runs synchronously.

        Args:
            video_path: Local path of the video file.
            video_id: Identifier used in frame ids and remote keyframe paths.

        Returns:
            Metadata describing the processed video.

        Raises:
            OSError: If the video cannot be opened or a keyframe image cannot
                be written.
        """
        logger.info(f"Processing video: {video_id}")

        # Extract video metadata
        fps, duration, width, height = self._probe_video(video_path)

        # Extract keyframes
        keyframes = self.analyzer.extract_keyframes(video_path)
        logger.info(f"Extracted {len(keyframes)} keyframes")

        # Extract audio transcription
        audio_text = self.analyzer.extract_audio_text(video_path)

        # Keyframe JPEGs are only needed until they are uploaded; a temporary
        # directory guarantees they are removed afterwards, even on error.
        with tempfile.TemporaryDirectory() as scratch_dir:
            for i, (timestamp, frame) in enumerate(keyframes):
                frame_id = f"{video_id}_frame_{i:04d}"

                # Save keyframe image
                keyframe_path = f"keyframes/{video_id}/{frame_id}.jpg"
                local_keyframe_path = str(Path(scratch_dir) / f"frame_{i:04d}.jpg")
                if not cv2.imwrite(local_keyframe_path, frame):
                    raise OSError(f"Could not write keyframe image: {local_keyframe_path}")

                # Upload to storage
                remote_path = self.storage.upload_file(local_keyframe_path, keyframe_path)

                # Analyze keyframe
                caption = self.analyzer.generate_caption(frame)
                objects = self.analyzer.detect_objects(frame)
                poses = self.analyzer.extract_poses(frame)
                ocr_text = self.analyzer.extract_ocr_text(frame)

                # Generate embeddings
                combined_text = f"{caption} {ocr_text} {audio_text}"
                scene_embedding, text_embedding = self.analyzer.generate_embeddings(frame, combined_text)

                # Create keyframe data
                keyframe_data = KeyframeData(
                    frame_id=frame_id,
                    video_id=video_id,
                    timestamp=timestamp,
                    image_path=remote_path,
                    caption=caption,
                    objects=objects,
                    poses=poses,
                    actions=[],  # TODO: Add action recognition
                    ocr_text=ocr_text,
                    scene_embedding=scene_embedding,
                    text_embedding=text_embedding,
                    pose_embedding=np.zeros(64)  # Simplified
                )

                # Index in Qdrant
                self.qdrant.index_keyframe(keyframe_data)

        return VideoMetadata(
            video_id=video_id,
            filename=Path(video_path).name,
            duration=duration,
            fps=fps,
            resolution=(width, height)
        )

    async def process_batch(self, video_paths: List[str]) -> Dict[str, Any]:
        """Process multiple videos one after another.

        The video id of each entry is the stem of its file name.

        Returns:
            ``{"processed_count": <int>, "videos": [VideoMetadata, ...]}``.
        """
        processed_videos = []

        for video_path in video_paths:
            video_id = Path(video_path).stem
            metadata = await self.process_video(video_path, video_id)
            processed_videos.append(metadata)

        # TODO: Perform clustering after processing all videos
        # This would require collecting all keyframe data first

        return {
            "processed_count": len(processed_videos),
            "videos": processed_videos
        }

# Main processing function
async def main():
    """Index the first few ``.mp4`` videos found under ``Videos_L21_a/``.

    Videos are downloaded from the bucket into a temporary directory first,
    because the processing pipeline reads them as local files; the directory
    is removed once the batch has finished.
    """
    # Initialize components
    storage_manager = GoogleStorageManager("your-bucket-name")
    analyzer = MultiModalAnalyzer()
    qdrant_manager = QdrantManager()

    # Create collection
    qdrant_manager.create_collection()

    # Initialize processor
    processor = VideoProcessor(storage_manager, analyzer, qdrant_manager)

    # Get video list from storage
    video_files = storage_manager.list_files("Videos_L21_a/")
    remote_videos = [file for file in video_files if file.endswith('.mp4')]

    with tempfile.TemporaryDirectory() as download_dir:
        # Download first so the processor receives real local paths rather
        # than gs:// URIs.
        # NOTE: objects sharing a base name would overwrite each other here.
        video_paths = []
        for remote_name in remote_videos[:5]:  # Process first 5 videos
            local_path = str(Path(download_dir) / Path(remote_name).name)
            storage_manager.download_file(remote_name, local_path)
            video_paths.append(local_path)

        # Process videos
        results = await processor.process_batch(video_paths)

    logger.info(f"Processing complete: {results}")

# Script entry point: run the async pipeline to completion.
# NOTE(review): asyncio.run() cannot be called while another event loop is
# already running in the same thread (e.g. inside a notebook kernel) —
# confirm how this file is meant to be executed.
if __name__ == "__main__":
    asyncio.run(main())