# Comprehensive Video Annotation with Generative AI

This notebook walks through an automated video annotation pipeline. It uses Generative AI models (such as Gemini) through the Dataloop platform to analyze videos of home activities and generate structured annotations.

## Overview

This guide will walk you through the following key stages:

1. **[Environment Setup](#environment-setup):** Importing dependencies and connecting to your Dataloop environment.
2. **[Project, Model, and Dataset Setup](#project-setup):** Configuring your Dataloop project, dataset, and the GenAI model for the annotation task.
3. **[VideoAnnotations Class Definition](#class-definition):** Exploring the comprehensive Python class that encapsulates the entire video processing logic.
4. **[Initialize Video Processor](#initialize-processor):** Creating an instance of the processor to run on our data.
5. **[Test Single Video Processing](#test-single-video):** Performing a dry run on a single video to verify the setup.
6. **[Process Test Video](#process-single-video):** Executing the full, comprehensive analysis on the test video and reviewing the results.
7. **[Batch Processing Function](#batch-function):** Defining a function to apply the annotation process to multiple videos efficiently.
8. **[Run Full Batch Processing (Optional)](#run-batch):** Executing the pipeline on a larger set of videos from your dataset.
9. **[Conclusion](#conclusion):** Summarizing the process and suggesting next steps.

## <a id='environment-setup'></a>1. Environment Setup

We begin by importing the required Python libraries: `dtlpy` for interacting with the Dataloop platform, `cv2` and `PIL` (Pillow) for handling video and image data, `numpy` for numerical operations, and standard-library modules (`os`, `json`, `tempfile`, `time`) for managing files and data structures.
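
The import cell assumes these packages are already installed. A minimal sketch for checking which ones are missing before running it, assuming the usual PyPI names (`dtlpy`, `opencv-python`, `numpy`, `Pillow`); the helper name `missing_packages` is hypothetical:

```python
import importlib.util

# Import name -> PyPI package name for the libraries this notebook imports.
REQUIRED = {
    "dtlpy": "dtlpy",
    "cv2": "opencv-python",
    "numpy": "numpy",
    "PIL": "Pillow",
}

def missing_packages(required):
    """Return the PyPI names of modules that cannot be found in this environment."""
    return [pip_name for module, pip_name in required.items()
            if importlib.util.find_spec(module) is None]

missing = missing_packages(REQUIRED)
if missing:
    print("Install with: pip install " + " ".join(missing))
else:
    print("All dependencies are available")
```

`importlib.util.find_spec` only locates a module without importing it, so the check is cheap and has no side effects.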

The cell then selects the production environment with `dl.setenv('prod')` and authenticates with the Dataloop platform. If your security token has expired, `dl.login()` opens a browser window for you to log in.

In [None]:
import time
import dtlpy as dl
import cv2
import numpy as np
import json
import os
import tempfile
from PIL import Image
from typing import List, Dict, Any

dl.setenv('prod')

# Login to Dataloop
if dl.token_expired():
    dl.login()
print('Successfully logged into Dataloop')

## <a id='project-setup'></a>2. Project, Model, and Dataset Setup

In this section, we'll connect to the required Dataloop entities. We will get the project and dataset, and then install and configure the Gemini model that will perform the analysis.

### 2.1. Connect to Project and Dataset

**Action Required:** You must replace the placeholder values in the cell below with your specific Dataloop project and dataset names.
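
A small guard can catch values that were left as placeholders before any API call is made. This is only a sketch: the helper `unreplaced_placeholders` and the dataset name `home-activities` are hypothetical.

```python
def unreplaced_placeholders(**settings):
    """Return names of settings that are empty or still look like '<placeholder>'."""
    return [name for name, value in settings.items()
            if not value or (value.startswith("<") and value.endswith(">"))]

# Example values; in the notebook these would be PROJECT_NAME and DATASET_NAME.
example = unreplaced_placeholders(
    PROJECT_NAME="<your-project-name>",
    DATASET_NAME="home-activities",  # hypothetical dataset name
)
print(example)  # → ['PROJECT_NAME']
```

Raising an error when the returned list is non-empty gives a clearer message than the lookup failure that would otherwise follow.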

In [None]:
PROJECT_NAME = '<your-project-name>'
DATASET_NAME = '<your-dataset-name>'
MODEL_NAME = 'gemini-2.5-flash'


# Get project and dataset (the model is retrieved in section 2.2)
project = dl.projects.get(project_name=PROJECT_NAME)
dataset = project.datasets.get(dataset_name=DATASET_NAME)

### 2.2. Install and Retrieve the Model

Now we install the generative model into our project. We fetch the `gemini-2.5` DPK (Dataloop Package), which contains the model definition, and install it as an App in the project. The `try/except` block treats a `BadRequest` error as a sign that the app is already installed and falls back to fetching the existing app. Finally, we retrieve the model entity associated with this app.
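
An alternative to catching the install error is to look the app up first and install only when it is missing. Below is a minimal sketch of that pattern using plain callables; the helper `get_or_install` and the fake functions are hypothetical. With dtlpy one would presumably pass `lambda: project.apps.get(app_name=model_dpk.display_name)` as the getter and `dl.exceptions.NotFound` as the exception, assuming `apps.get` raises it for a missing app.

```python
def get_or_install(get_app, install_app, not_found_exc):
    """Return (app, installed): look the app up first, install only if missing."""
    try:
        return get_app(), False
    except not_found_exc:
        return install_app(), True

# Stand-in demo with plain Python objects instead of Dataloop calls.
registry = {}

def fake_get():
    return registry["gemini"]  # raises KeyError while the app is not installed

def fake_install():
    registry["gemini"] = "gemini-app"
    return registry["gemini"]

first = get_or_install(fake_get, fake_install, KeyError)
second = get_or_install(fake_get, fake_install, KeyError)
print(first, second)  # → ('gemini-app', True) ('gemini-app', False)
```

Looking up first avoids treating unrelated request errors as "already installed".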

In [None]:
# Install model for video frame analysis
model_dpk = dl.dpks.get(dpk_name='gemini-2.5')

try:
    model_app = project.apps.install(
        app_name=model_dpk.display_name, 
        dpk=model_dpk, 
        custom_installation=model_dpk.to_json()
    )
    print(f"Installed {model_dpk.display_name} app: {model_app.name}")
except dl.exceptions.BadRequest:
    # Assumed to mean the app is already installed in this project
    print(f"{model_dpk.display_name} app already installed, getting existing app")
    model_app = project.apps.get(app_name=model_dpk.display_name)

# Get VLM model
filters = dl.Filters(resource=dl.FiltersResource.MODEL)
filters.add(field='app.id', values=model_app.id)
model = project.models.list(filters=filters)[0][0]

print(f"{model_dpk.display_name} Model: {model.name}")

### 2.3. Verify Model Deployment

Models in Dataloop must be deployed as a live service to be used for inference. The following cell checks the model's status and provides a warning and a direct link to the model's page in the web UI if it's not yet deployed.
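
If deployment was started just before running the check, polling for the status can save a manual re-run. The following is a generic sketch, not part of the Dataloop SDK: `wait_until` is a hypothetical helper, and the status strings in the demo are fake.

```python
import time

def wait_until(predicate, timeout=600.0, interval=10.0,
               sleep=time.sleep, clock=time.monotonic):
    """Poll predicate() until it returns True or `timeout` seconds have passed."""
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)

# Demo with a fake status sequence instead of a real model.
statuses = iter(["pending", "pending", "deployed"])
ready = wait_until(lambda: next(statuses) == "deployed", timeout=5, interval=0)
print(ready)  # → True
```

With Dataloop, the predicate would need to re-fetch the model on every call, for example `lambda: project.models.get(model_id=model.id).status == "deployed"`, since a locally cached entity does not refresh its own status.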

In [None]:
# Check if model is deployed
if model.status != "deployed":
    print("\nWARNING: Model is not deployed!")
    print("Please deploy the model through the platform interface before proceeding.")
    print("Opening the model page in the web UI:")
    model.open_in_web()
else:
    print("✓ Model is deployed and ready for processing")

### 2.4. Configure the Model

To tailor the model's responses to our task, we update its configuration. We set a `system_prompt` that instructs the model to act as an expert video analyst specializing in home activities, and we set both `max_tokens` and `max_output_tokens` to 4096 to leave room for detailed responses.
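
Before pushing a configuration change, it can help to see which keys actually differ. A minimal sketch using a plain dict in place of `model.configuration`; the helper `apply_overrides` and the `temperature` key are hypothetical:

```python
def apply_overrides(config, overrides):
    """Update config in place and return the keys whose values actually changed."""
    changed = [key for key, value in overrides.items()
               if key not in config or config[key] != value]
    config.update(overrides)
    return changed

# Plain dict standing in for model.configuration.
config = {"max_tokens": 1024, "temperature": 0.2}
changed = apply_overrides(config, {"max_tokens": 4096, "temperature": 0.2})
print(changed)  # → ['max_tokens']
```

Calling the update only when `changed` is non-empty would skip an unnecessary round trip to the platform.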

In [None]:
# Configure model for video frame analysis
model.configuration['system_prompt'] = """You are an expert video analyst specializing in home activity recognition. 
Your task is to analyze video frames showing home environments and activities. Focus on:
1. Room identification and layout
2. People and their activities
3. Objects and furniture
4. Spatial relationships and context

Provide detailed, accurate descriptions that capture the key elements visible in the frames. 
Be specific about room types, activities, and objects. If you're uncertain about any aspect, 
acknowledge your uncertainty rather than making assumptions."""

model.configuration['max_tokens'] = 4096
model.configuration['max_output_tokens'] = 4096
model.update(True)

print(f"{model.name} model configured for video analysis")

### 2.5. Verify Setup

Let's print the names of our configured project, dataset, and model to confirm everything is connected correctly before proceeding.

In [None]:
print(f'Connected to project: {project.name}')
print(f'Connected to dataset: {dataset.name}')
print(f'Using model: {model.name}')
print(f'Dataset contains {dataset.items_count} items')

## <a id='class-definition'></a>3. VideoAnnotations Class Definition

The core logic of our video processing pipeline is encapsulated in the `VideoAnnotations` class defined below. This comprehensive class handles every step of the process, from downloading the video to generating and uploading structured annotations.

Here is a brief overview of its key methods:
- **`process_video`**: The main orchestrator that calls all other methods in sequence for a single video.
- **`analyze_video_characteristics`**: Gathers basic video metadata like duration, FPS, and aspect ratio.
- **`extract_keyframes`**: Samples frames from evenly spaced segments of the video and skips frames that are too dark or too bright, falling back to simple linear sampling if too few frames pass the check.
- **`create_enhanced_montage`**: Stitches the selected keyframes into a single montage image, applying enhancements to improve clarity for the model.
- **`create_adaptive_summary`**: Creates different types of montages based on video length and orientation.
- **`analyze_with_model`**: Sends the final montage(s) to the configured Gemini model with a detailed prompt, requesting a comprehensive analysis.
- **`generate_structured_annotation`**: Takes the raw text analysis from the model and uses a series of targeted prompts to convert it into structured JSON for different annotation types (room, participant, actions, objects).
- **`create_dataloop_annotations`**: Converts the final JSON data into Dataloop Annotation objects, ready for upload.
- **`clean_malformed_json`**: A robust helper function to clean and fix common JSON formatting issues that can occur in model responses.
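
Two of the helpers listed above are easy to try in isolation: segment-midpoint frame sampling and trailing-comma cleanup. The sketch below reproduces only the index arithmetic and the regular expression, without OpenCV or Dataloop; the function names `segment_midpoints` and `strip_trailing_commas` are hypothetical and are not methods of the class.

```python
import json
import re

def segment_midpoints(total_frames, num_frames):
    """Pick one frame index from the middle of each equal-length segment."""
    num_frames = min(num_frames, total_frames)
    if num_frames <= 0:
        return []
    segment_size = total_frames // num_frames
    return [min(i * segment_size + segment_size // 2, total_frames - 1)
            for i in range(num_frames)]

def strip_trailing_commas(json_text):
    """Remove commas that directly precede a closing brace or bracket."""
    return re.sub(r',(\s*[}\]])', r'\1', json_text)

print(segment_midpoints(80, 8))  # → [5, 15, 25, 35, 45, 55, 65, 75]

raw = '{"objects": [{"object": "sofa",}, {"object": "lamp"},]}'
print(json.loads(strip_trailing_commas(raw)))
```

The regular expression does not distinguish commas inside string values, so a string that happens to contain `,}` would also be altered. That is an accepted limitation of this kind of cleanup.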

In [None]:
class VideoAnnotations:
    """
    Video annotation processor for home activity videos using comprehensive analysis.
    """
    
    def __init__(self, dataset, model):
        self.dataset = dataset
        self.model = model
        
    def process_video(self, video_item: dl.Item, progress_callback=None) -> Dict[str, Any]:
        """
        Process video with comprehensive analysis.
        
        Args:
            video_item: Video item to process
            progress_callback: Optional callback for progress updates
        """
        start_time = time.time()
        timing_data = {}
        
        results = {
            "video_id": video_item.id,
            "video_name": video_item.name,
            "status": "processing",
            "annotations_created": 0,
            "room_annotation": None,
            "participant_annotation": None,
            "action_annotation": None,
            "object_annotations": [],
            "errors": [],
            "timing": timing_data,
            "processing_mode": "comprehensive"
        }
        
        try:
            # 1. Download video
            download_start = time.time()
            if progress_callback:
                progress_callback(f"Downloading {video_item.name}...")
            
            video_path = video_item.download()
            timing_data["download_time"] = time.time() - download_start
            
            # 2. Advanced frame extraction and analysis
            extract_start = time.time()
            if progress_callback:
                progress_callback("Analyzing video characteristics...")
            
            # Analyze video characteristics
            characteristics = self.analyze_video_characteristics(video_path)
            timing_data["characteristics_analysis"] = time.time() - extract_start
            
            # 3. Create adaptive summaries
            summary_start = time.time()
            if progress_callback:
                progress_callback("Creating adaptive summaries...")
            
            summaries = self.create_adaptive_summary(video_path, characteristics)
            timing_data["summary_creation"] = time.time() - summary_start
            
            # 4. Multi-strategy model analysis
            model_start = time.time()
            if progress_callback:
                progress_callback("Comprehensive model analysis...")
            
            analysis = self.analyze_with_model(video_item, summaries)
            timing_data["model_analysis"] = time.time() - model_start
            
            # 5. Enhanced annotation generation
            annotation_start = time.time()
            if progress_callback:
                progress_callback("Generating enhanced annotations...")
            
            annotations_data = self.generate_annotations(analysis)
            timing_data["annotation_generation"] = time.time() - annotation_start
            
            # 6. Upload annotations
            upload_start = time.time()
            annotations_to_create = self.create_dataloop_annotations(annotations_data)
            
            if annotations_to_create:
                video_item.annotations.upload(annotations_to_create)
                results["annotations_created"] = len(annotations_to_create)
            
            timing_data["annotation_upload"] = time.time() - upload_start
            
            # 7. Update results
            results.update({
                "room_annotation": annotations_data.get("room"),
                "participant_annotation": annotations_data.get("participant"),
                "action_annotation": annotations_data.get("action"),
                "object_annotations": annotations_data.get("objects", []),
                "status": "completed",
                "video_characteristics": characteristics
            })
            
            # 8. Cleanup
            cleanup_start = time.time()
            if os.path.exists(video_path):
                os.remove(video_path)
            timing_data["cleanup"] = time.time() - cleanup_start
            
        except Exception as e:
            results["status"] = "error"
            results["errors"].append(f"Processing exception: {str(e)}")
        
        timing_data["total_time"] = time.time() - start_time
        return results
    
    def extract_keyframes(self, video_path: str, num_frames: int = 8) -> List[np.ndarray]:
        """
        Sample keyframes from evenly spaced segments of the video, skipping
        frames that are too dark or too bright.
        """
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        if total_frames <= 0:
            # Unreadable or empty video: nothing to sample
            cap.release()
            return []
        
        if total_frames < num_frames:
            num_frames = total_frames
        
        # Strategy 1: Extract frames from different segments of the video
        segment_size = total_frames // num_frames
        frame_indices = []
        
        for i in range(num_frames):
            # Take frame from middle of each segment
            segment_start = i * segment_size
            segment_middle = segment_start + segment_size // 2
            frame_indices.append(min(segment_middle, total_frames - 1))
        
        # Strategy 2: Add some variation within segments
        if num_frames > 4:
            # For segments 1 to 3, use the quarter point instead of the middle
            for i in range(1, min(num_frames-1, 4)):
                segment_start = i * segment_size
                quarter_point = segment_start + segment_size // 4
                frame_indices[i] = min(quarter_point, total_frames - 1)
        
        frames = []
        for frame_idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if ret:
                # Quality check - skip very dark or very bright frames
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                mean_brightness = np.mean(gray)
                
                # Skip frames that are too dark (<30) or too bright (>220)
                if 30 < mean_brightness < 220:
                    frames.append(frame)
        
        cap.release()
        
        # If we don't have enough good frames, fall back to simple linear sampling
        if len(frames) < 3:
            cap = cv2.VideoCapture(video_path)
            frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
            frames = []
            for frame_idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                ret, frame = cap.read()
                if ret:
                    frames.append(frame)
            cap.release()
        
        return frames
    
    def create_enhanced_montage(self, frames: List[np.ndarray], max_width: int = 1920, target_height: int = 400) -> np.ndarray:
        """
        Create enhanced montage with quality improvements.
        """
        if not frames:
            return None
        
        # Filter out very similar frames to improve diversity
        filtered_frames = []
        for i, frame in enumerate(frames):
            if i == 0:
                filtered_frames.append(frame)
            else:
                # Mean absolute pixel difference from the previous frame
                # (higher means more different)
                prev_frame = cv2.resize(frames[i-1], (100, 100))
                curr_frame = cv2.resize(frame, (100, 100))
                diff = cv2.absdiff(prev_frame, curr_frame)
                mean_diff = np.mean(diff)
                
                # Only add frame if it's sufficiently different
                if mean_diff > 10:
                    filtered_frames.append(frame)
        
        # Use filtered frames, but ensure we have at least 3 frames
        if len(filtered_frames) < 3:
            filtered_frames = frames[:min(len(frames), 5)]
        
        # Resize frames to consistent height with better interpolation
        resized_frames = []
        for frame in filtered_frames:
            h, w = frame.shape[:2]
            aspect_ratio = w / h
            new_width = int(target_height * aspect_ratio)
            
            # Use INTER_LANCZOS4 for better quality when upscaling
            if new_width > w:
                resized_frame = cv2.resize(frame, (new_width, target_height), interpolation=cv2.INTER_LANCZOS4)
            else:
                resized_frame = cv2.resize(frame, (new_width, target_height), interpolation=cv2.INTER_AREA)
            
            # Apply slight sharpening to improve clarity
            kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
            sharpened = cv2.filter2D(resized_frame, -1, kernel)
            # Blend original and sharpened (70% original, 30% sharpened)
            enhanced_frame = cv2.addWeighted(resized_frame, 0.7, sharpened, 0.3, 0)
            
            resized_frames.append(enhanced_frame)
        
        # Create montage with spacing between frames
        spacing = 10  # pixels between frames
        total_width = sum(frame.shape[1] for frame in resized_frames) + spacing * (len(resized_frames) - 1)
        
        # Create montage with white background
        montage = np.full((target_height, total_width, 3), 255, dtype=np.uint8)
        
        # Place frames with spacing
        x_offset = 0
        for frame in resized_frames:
            h, w = frame.shape[:2]
            montage[0:h, x_offset:x_offset+w] = frame
            x_offset += w + spacing
        
        # Resize if too wide, using high-quality interpolation
        if montage.shape[1] > max_width:
            scale = max_width / montage.shape[1]
            new_height = int(montage.shape[0] * scale)
            montage = cv2.resize(montage, (max_width, new_height), interpolation=cv2.INTER_LANCZOS4)
        
        # Apply final enhancement
        alpha = 1.1  # Contrast control
        beta = 5     # Brightness control
        montage = cv2.convertScaleAbs(montage, alpha=alpha, beta=beta)
        
        return montage
    
    def analyze_video_characteristics(self, video_path: str) -> Dict[str, Any]:
        """
        Analyze video characteristics for adaptive processing.
        """
        cap = cv2.VideoCapture(video_path)
        
        # Read properties once; guard against zero values from unreadable videos
        width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        
        characteristics = {
            'aspect_ratio': width / height if height else 0.0,
            'total_frames': total_frames,
            'fps': fps,
            'duration': total_frames / fps if fps else 0.0,
            'motion_level': 'unknown',
            'scene_changes': 0,
            'dominant_orientation': 'landscape'
        }
        
        # Determine orientation
        if characteristics['aspect_ratio'] < 1.0:
            characteristics['dominant_orientation'] = 'portrait'
        elif characteristics['aspect_ratio'] > 1.5:
            characteristics['dominant_orientation'] = 'wide_landscape'
        
        cap.release()
        return characteristics
    
    def create_adaptive_summary(self, video_path: str, characteristics: Dict[str, Any]) -> Dict[str, np.ndarray]:
        """
        Create adaptive summary based on video characteristics.
        """
        summaries = {}
        
        if characteristics['duration'] > 300:  # Long video (>5 minutes)
            # Long video: create hierarchical summary
            overview_frames = self.extract_keyframes(video_path, num_frames=6)
            summaries['overview'] = self.create_enhanced_montage(overview_frames, max_width=1920, target_height=300)
            
            detailed_frames = self.extract_keyframes(video_path, num_frames=12)
            summaries['detailed'] = self.create_enhanced_montage(detailed_frames, max_width=2400, target_height=200)
        else:
            # Standard video: enhanced montages
            frames = self.extract_keyframes(video_path, num_frames=8)
            
            # Choose layout based on orientation
            if characteristics['dominant_orientation'] == 'portrait':
                summaries['primary'] = self.create_enhanced_montage(frames, max_width=1200, target_height=500)
            else:
                summaries['primary'] = self.create_enhanced_montage(frames, max_width=1920, target_height=400)
        
        return summaries
    
    def analyze_with_model(self, video_item: dl.Item, summaries: Dict[str, np.ndarray]) -> str:
        """
        Enhanced video analysis using multiple summaries.
        """
        analyses = []
        
        # Process each summary type
        for summary_type, montage in summaries.items():
            if montage is not None:
                # Convert montage to PIL Image
                montage_rgb = cv2.cvtColor(montage, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(montage_rgb)
                
                # Save and upload montage
                with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_file:
                    pil_image.save(temp_file.name, format='JPEG', quality=95)
                    temp_file_path = temp_file.name
                
                try:
                    # Upload montage
                    montage_item = self.dataset.items.upload(
                        local_path=temp_file_path,
                        remote_path="/.dataloop/temp_montages/",
                        remote_name=f"{video_item.name}_{summary_type}_montage.jpg",
                        overwrite=True,
                        item_metadata={"user": {"source_video": video_item.id, "temp_file": True}}
                    )
                    
                    # Create analysis prompt
                    model_prompt = f"""Analyze this {summary_type} video montage from {video_item.name}.
                    
                                        This montage shows {'detailed frames' if 'detailed' in summary_type else 'key moments'} from the video.

                                        Please provide comprehensive analysis focusing on:
                                        1. Room identification and layout
                                        2. People and their activities  
                                        3. Objects and furniture visible
                                        4. Actions being performed
                                        5. Context and temporal progression
                                        6. Specific details for annotation

                                        Be thorough and specific in your observations."""
                    
                    # Process with model
                    analysis = self.process_single_montage_with_model(montage_item, model_prompt)
                    analyses.append(f"=== {summary_type.upper()} ANALYSIS ===\n{analysis}")
                    
                finally:
                    if os.path.exists(temp_file_path):
                        os.remove(temp_file_path)
        
        # Combine all analyses
        combined_analysis = "\n\n".join(analyses)
        
        return combined_analysis
    
    def process_single_montage_with_model(self, montage_item: dl.Item, model_prompt: str) -> str:
        """
        Process a single montage image with model.
        """
        try:
            # Create prompt item
            prompt_item = dl.PromptItem(name=f"model_montage_analysis_{montage_item.name}")
            prompt = dl.Prompt(key='1')
            prompt.add_element(mimetype=dl.PromptType.TEXT, value=model_prompt)
            prompt.add_element(mimetype=dl.PromptType.IMAGE, value=montage_item.id)
            prompt_item.prompts.append(prompt)
            
            # Upload and process
            temp_prompt_item = self.dataset.items.upload(
                prompt_item,
                overwrite=True,
                remote_path="/.dataloop/temp_model_analysis/",
                item_metadata={"user": {"source_montage": montage_item.id}}
            )
            
            # Get model analysis
            execution = self.model.predict(item_ids=[temp_prompt_item.id])
            execution.wait()
            
            # Extract response
            annotations = temp_prompt_item.annotations.list()
            if annotations:
                analysis = annotations[0].coordinates
            else:
                analysis = "No analysis generated"
            
            # Clean up temporary item
            temp_prompt_item.delete()
            
            return analysis
            
        except Exception as e:
            return f"Error processing montage: {str(e)}"
    
    def generate_annotations(self, analysis: str) -> Dict[str, Any]:
        """
        Generate annotations with comprehensive analysis approach.
        """
        results = {}
        
        # Enhanced prompts with more context
        annotation_types = [
            ("room_classification", "room"),
            ("participant_description", "participant"),
            ("action_description", "action"),
            ("object_detection", "objects")
        ]
        
        for annotation_type, result_key in annotation_types:
            try:
                data = self.generate_structured_annotation(analysis, annotation_type)
                if "error" not in data:
                    if result_key == "objects" and "objects" in data:
                        results[result_key] = data["objects"][:10]
                    else:
                        results[result_key] = data
                else:
                    results[result_key] = None if result_key != "objects" else []
            except Exception as e:
                results[result_key] = None if result_key != "objects" else []
        
        return results
    
    def clean_malformed_json(self, json_text: str) -> str:
        """
        Clean common JSON formatting issues in model responses (objects and arrays).
        """
        import re
        
        # Strip surrounding whitespace and markdown code-fence markers
        json_text = json_text.strip()
        json_text = re.sub(r'^`{3}(?:json)?\s*|\s*`{3}$', '', json_text).strip()
        
        # First, try to parse as-is - if it's already valid, don't fix it
        try:
            json.loads(json_text)
            return json_text  # Already valid JSON, return as-is
        except json.JSONDecodeError:
            pass  # Need to clean it
        
        is_array = json_text.startswith('[') and json_text.endswith(']')
        is_object = json_text.startswith('{') and json_text.endswith('}')
        if not (is_array or is_object):
            # Extract the outermost JSON object from mixed content
            start_idx = json_text.find('{')
            end_idx = json_text.rfind('}')
            if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
                json_text = json_text[start_idx:end_idx + 1]
        
        # Keep the structure as-is; only remove trailing commas before } or ]
        json_text = re.sub(r',(\s*[}\]])', r'\1', json_text)
        
        return json_text
    
    def generate_structured_annotation(self, analysis: str, annotation_type: str) -> dict:
        """
        Generate annotations with updated schemas.
        """
        # Get schema based on type
        schema_map = {
            "room_classification": {
                "type": "object",
                "properties": {
                    "reference_rooms": {"type": "array", "items": {"type": "string"}},
                    "room_description": {"type": "string"},
                    "participant_activity_description": {"type": "string"}
                }
            },
            "participant_description": {
                "type": "array",
                "items": {
                    "type": "object", 
                    "properties": {
                        "participant_description": {"type": "string"}
                    },
                    "required": ["participant_description"]
                },
                "description": "Array of participant objects, one per participant"
            },
            "action_description": {
                "type": "object",
                "properties": {
                    "action_description": {"type": "string"}
                }
            },
            "object_detection": {
                "type": "object",
                "properties": {
                    "objects": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "object": {"type": "string"},
                                "object_color": {
                                    "type": "string",
                                    "enum": [
                                        "Basic: white, black, brown, gray, beige",
                                        "Wood tones: mahogany, oak, walnut, cherry",
                                        "Metals: silver, bronze, golden, copper",
                                        "Other: cream, navy, burgundy, sage"
                                    ]
                                },
                                "object_size_dimension": {
                                    "type": "string",
                                    "enum": ["Tall", "Short", "Wide", "Narrow", "Deep", "Shallow"]
                                },
                                "object_size_overall": {
                                    "type": "string",
                                    "enum": ["Large", "Medium", "Small", "Compact", "Massive"]
                                },
                                "object_size_space_occupation": {
                                    "type": "string",
                                    "enum": ["Bulky", "Slim", "Spacious"]
                                },
                                "object_shape_basic_forms": {
                                    "type": "array",
                                    "items": {
                                        "type": "string",
                                        "enum": ["Rectangular", "Square", "Round", "Oval", "Circular"]
                                    }
                                },
                                "object_shape_structural": {
                                    "type": "string",
                                    "enum": ["Curved", "Straight", "Angular", "Cylindrical"]
                                },
                                "object_shape_design": {
                                    "type": "string",
                                    "enum": ["Symmetrical", "Irregular", "Geometric"]
                                },
                                "object_materials": {
                                    "type": "array",
                                    "items": {
                                        "type": "string",
                                        "enum": [
                                            "Natural: wooden, leather, cotton, wool, wicker",
                                            "Synthetic: plastic, polyester, nylon, vinyl",
                                            "Metals: metallic, steel, aluminum, brass",
                                            "Glass: transparent, frosted, tinted",
                                            "Stone: marble, granite, concrete"
                                        ]
                                    }
                                },
                                "object_surface_touch": {
                                    "type": "string",
                                    "enum": ["Smooth", "Rough", "Textured", "Bumpy"]
                                },
                                "object_surface_finish": {
                                    "type": "string",
                                    "enum": ["Glossy", "Matte", "Polish", "Rustic"]
                                },
                                "object_condition_quality": {
                                    "type": "string",
                                    "enum": ["New", "Used", "Vintage", "Modern"]
                                },
                                "object_condition_state": {
                                    "type": "string",
                                    "enum": ["Clean", "Worn", "Pristine", "Damaged"]
                                },
                                "object_condition_power": {
                                    "type": "string",
                                    "enum": ["On", "Off"]
                                }
                            },
                            "required": ["object"]
                        }
                    }
                },
                "required": ["objects"]
            }
        }
        
        schema = schema_map.get(annotation_type, {})
        
        # Enhanced prompt for complex schemas
        if annotation_type == "object_detection":
            model_prompt = f"""Based on the comprehensive video analysis, generate detailed object detection annotation as valid JSON.

                                Video Analysis:
                                {analysis}

                                Generate a JSON response with "objects" array. For each object, include:
                                - object: name of the object (required)
                                - object_color: choose from ["Basic: white, black, brown, gray, beige", "Wood tones: mahogany, oak, walnut, cherry", "Metals: silver, bronze, golden, copper", "Other: cream, navy, burgundy, sage"]
                                - object_size_dimension: choose from ["Tall", "Short", "Wide", "Narrow", "Deep", "Shallow"]
                                - object_size_overall: choose from ["Large", "Medium", "Small", "Compact", "Massive"]
                                - object_size_space_occupation: choose from ["Bulky", "Slim", "Spacious"]
                                - object_shape_basic_forms: array from ["Rectangular", "Square", "Round", "Oval", "Circular"]
                                - object_shape_structural: choose from ["Curved", "Straight", "Angular", "Cylindrical"]
                                - object_shape_design: choose from ["Symmetrical", "Irregular", "Geometric"]
                                - object_materials: array from ["Natural: wooden, leather, cotton, wool, wicker", "Synthetic: plastic, polyester, nylon, vinyl", "Metals: metallic, steel, aluminum, brass", "Glass: transparent, frosted, tinted", "Stone: marble, granite, concrete"]
                                - object_surface_touch: choose from ["Smooth", "Rough", "Textured", "Bumpy"]
                                - object_surface_finish: choose from ["Glossy", "Matte", "Polish", "Rustic"]
                                - object_condition_quality: choose from ["New", "Used", "Vintage", "Modern"]
                                - object_condition_state: choose from ["Clean", "Worn", "Pristine", "Damaged"]
                                - object_condition_power: choose from ["On", "Off"]

                                CRITICAL INSTRUCTIONS: 
                                - Use EXACT COMPLETE enum values from the lists above
                                - Use proper JSON syntax with double quotes
                                - Ensure all arrays and objects are properly closed
                                - Focus on the most prominent/important objects only
                                - Analyze each object carefully and provide detailed attributes

                                Example format:
                                {{"objects": [{{"object": "dining table", "object_color": "Wood tones: mahogany, oak, walnut, cherry", "object_size_overall": "Large", "object_materials": ["Natural: wooden, leather, cotton, wool, wicker"], "object_shape_basic_forms": ["Rectangular"], "object_condition_quality": "Used"}}]}}

                                Respond with ONLY valid JSON:"""
        

        # Enhanced prompt for participant descriptions
        elif annotation_type == "participant_description":
            model_prompt = f"""Based on the comprehensive video analysis, identify and describe each individual participant separately.

                                Video Analysis:
                                {analysis}

                                Generate a JSON array where each element is an object with "participant_description" field containing one participant description.

                                CRITICAL INSTRUCTIONS:
                                - Create ONE object per participant you can clearly see
                                - Each object MUST have a "participant_description" field with detailed description of that individual person
                                - Include their appearance, activity, and any notable characteristics
                                - If you can only see one person, provide only one object
                                - Use proper JSON syntax with double quotes
                                - ALWAYS return an array, even for a single participant

                                REQUIRED FORMAT - Array of objects:
                                [
                                {{"participant_description": "Adult woman wearing blue shirt sitting at kitchen table eating breakfast"}},
                                {{"participant_description": "Young child in red pajamas playing with toys on the living room floor"}}
                                ]

                                For a single participant, still use array format:
                                [
                                {{"participant_description": "Middle-aged man in casual clothes cooking at the stove"}}
                                ]

                                Respond with ONLY valid JSON array (not an object with participants key):"""
        else:
            model_prompt = f"""Based on the comprehensive video analysis, generate a detailed {annotation_type} annotation in JSON format.

                                Video Analysis:
                                {analysis}

                                Please generate a JSON response that follows this exact schema:
                                {json.dumps(schema, indent=2)}

                                Instructions:
                                - Follow the JSON schema exactly
                                - Use only the provided enum values where specified
                                - Be specific and accurate based on the analysis
                                - Include all required fields
                                - Respond with ONLY valid JSON, no additional text

                                Generate the {annotation_type} annotation:"""
        
        # Create and process prompt
        prompt_item = dl.PromptItem(name=f"enhanced_{annotation_type}")
        prompt = dl.Prompt(key='1')
        prompt.add_element(mimetype=dl.PromptType.TEXT, value=model_prompt)
        prompt_item.prompts.append(prompt)
        
        temp_item = self.dataset.items.upload(
            prompt_item,
            overwrite=True,
            remote_path="/.dataloop/temp_model_enhanced/"
        )
        
        try:
            # Get model response
            execution = self.model.predict(item_ids=[temp_item.id])
            execution.wait()
            
            # Extract and parse response
            annotations = temp_item.annotations.list()
            if annotations:
                response_text = annotations[0].coordinates.strip()
                
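                # Clean response: strip a Markdown code fence, with or without a language tag
                if response_text.startswith('```json'):
                    response_text = response_text[7:]
                elif response_text.startswith('```'):
                    response_text = response_text[3:]
                if response_text.endswith('```'):
                    response_text = response_text[:-3]
                response_text = response_text.strip()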
                
                # Enhanced JSON cleaning for malformed responses
                response_text = self.clean_malformed_json(response_text)
                
                parsed_response = json.loads(response_text)
                
                normalized_response = self.normalize_annotation_response(parsed_response, annotation_type)
                
                return normalized_response
            else:
                return {"error": "No response generated"}
                
        except json.JSONDecodeError as e:
            return {"error": f"Invalid JSON: {str(e)}"}
        except Exception as e:
            return {"error": f"Processing error: {str(e)}"}
        finally:
            temp_item.delete()

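    def normalize_annotation_response(self, response: Any, annotation_type: str) -> Any:
        """
        Normalize the response to ensure it matches the expected format.

        Returns a list of dicts for "participant_description" and a dict for the
        other known annotation types; unknown types are returned unchanged.
        """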
        if annotation_type == "participant_description":
            # Ensure we always return a properly formatted array
            if isinstance(response, list):
                # Validate each item in the list
                normalized_list = []
                for item in response:
                    if isinstance(item, dict) and "participant_description" in item:
                        normalized_list.append(item)
                    elif isinstance(item, str):
                        # Convert string to proper format
                        normalized_list.append({"participant_description": item})
                return normalized_list if normalized_list else [{"participant_description": "No participants detected"}]
            
            elif isinstance(response, dict):
                # Handle legacy format like {"participants": ["desc1", "desc2"]}
                if "participants" in response:
                    participants = response["participants"]
                    if isinstance(participants, list):
                        # Reuse the list branch so string and dict entries are both kept
                        return self.normalize_annotation_response(participants, annotation_type)
                # Handle single participant object
                elif "participant_description" in response:
                    return [response]
            
            # Fallback
            return [{"participant_description": "Error: Invalid participant data format"}]
        
        elif annotation_type == "object_detection":
            # Ensure we have the objects key and it's a list
            if isinstance(response, dict) and "objects" in response:
                objects = response["objects"]
                if isinstance(objects, list):
                    # Validate each object has required fields
                    validated_objects = []
                    for obj in objects:
                        if isinstance(obj, dict) and "object" in obj:
                            validated_objects.append(obj)
                    return {"objects": validated_objects}
            
            # Fallback
            return {"objects": []}
        
        elif annotation_type == "room_classification":
            # Ensure it's a dict with required fields
            if isinstance(response, dict):
                normalized = {}
                # Ensure all required fields exist
                normalized["reference_rooms"] = response.get("reference_rooms", [])
                normalized["room_description"] = response.get("room_description", "")
                normalized["participant_activity_description"] = response.get("participant_activity_description", "")
                return normalized
            
            # Fallback
            return {
                "reference_rooms": [],
                "room_description": "Error: Invalid room data format",
                "participant_activity_description": ""
            }
        
        elif annotation_type == "action_description":
            # Ensure it's a dict with action_description field
            if isinstance(response, dict) and "action_description" in response:
                return response
            elif isinstance(response, str):
                return {"action_description": response}
            
            # Fallback
            return {"action_description": "Error: Invalid action data format"}
        
        return response

    def create_dataloop_annotations(self, annotations_data: Dict[str, Any]) -> List[dl.Annotation]:
        """
        Create Dataloop annotations from the generated data.
        Now expects properly normalized data formats.
        """
        annotations_to_create = []
        
        # Room annotation
        if annotations_data.get("room"):
            try:
                # Ensure room data is a dictionary
                room_data = annotations_data["room"]
                if not isinstance(room_data, dict):
                    print(f"Warning: Room data is not a dict: {type(room_data)} - {room_data}")
                    room_data = {"room_description": str(room_data)}
                
                room_annotation = dl.Annotation.new(
                    annotation_definition=dl.Classification(
                        label="room",
                        attributes=room_data
                    ),
                    frame_num=0
                )
                annotations_to_create.append(room_annotation)
            except Exception as e:
                print(f"Error creating room annotation: {e}")
                print(f"Room data: {annotations_data['room']}")
        
        # Individual participant annotations (one per participant)
        if annotations_data.get("participant"):
            try:
                participant_list = annotations_data["participant"]
                if not isinstance(participant_list, list):
                    print(f"Warning: Participant data is not a list: {type(participant_list)} - {participant_list}")
                    participant_list = [participant_list] if isinstance(participant_list, dict) else []
                
                for i, participant in enumerate(participant_list):
                    if not isinstance(participant, dict):
                        print(f"Warning: Participant {i} is not a dict: {type(participant)} - {participant}")
                        participant = {"participant_description": str(participant)}
                    
                    participant_annotation = dl.Annotation.new(
                        annotation_definition=dl.Classification(
                            label="participant description",
                            attributes=participant
                        ),
                        frame_num=0
                    )
                    annotations_to_create.append(participant_annotation)
            except Exception as e:
                print(f"Error creating participant annotations: {e}")
                print(f"Participant data: {annotations_data['participant']}")
        
        # Action annotation
        if annotations_data.get("action"):
            try:
                # Ensure action data is a dictionary
                action_data = annotations_data["action"]
                if not isinstance(action_data, dict):
                    print(f"Warning: Action data is not a dict: {type(action_data)} - {action_data}")
                    action_data = {"action_description": str(action_data)}
                
                action_annotation = dl.Annotation.new(
                    annotation_definition=dl.Classification(
                        label="actions",
                        attributes=action_data
                    ),
                    frame_num=0
                )
                annotations_to_create.append(action_annotation)
            except Exception as e:
                print(f"Error creating action annotation: {e}")
                print(f"Action data: {annotations_data['action']}")
        
        # Object annotations
        for i, obj in enumerate(annotations_data.get("objects", [])):
            try:
                if not isinstance(obj, dict):
                    print(f"Warning: Object {i} is not a dict: {type(obj)} - {obj}")
                    obj = {"object": str(obj)}
                
                obj_annotation = dl.Annotation.new(
                    annotation_definition=dl.Classification(
                        label="objects",
                        attributes=obj
                    ),
                    frame_num=0
                )
                annotations_to_create.append(obj_annotation)
            except Exception as e:
                print(f"Error creating object annotation {i}: {e}")
                print(f"Object data: {obj}")
        
        return annotations_to_create

    def print_processing_summary(self, results: Dict[str, Any]):
        """
        Print processing summary.
        """
        timing = results.get("timing", {})
        total_time = timing.get("total_time", 0)
        mode = results.get("processing_mode", "unknown")
        
        print(f"\n=== PROCESSING SUMMARY FOR {results['video_name']} ===")
        print(f"Processing Mode: {mode.upper()}")
        print(f"Total Time: {total_time:.2f}s")
        print(f"Status: {results['status']}")
        print(f"Annotations Created: {results['annotations_created']}")
        
        if results.get("room_annotation"):
            print("✓ Room classification generated")
        participant_data = results.get("participant_annotation")
        if participant_data:
            # Accept either a {"participants": [...]} wrapper or the bare normalized list
            if isinstance(participant_data, dict):
                participants = participant_data.get("participants")
            else:
                participants = participant_data
            if isinstance(participants, list):
                print(f"✓ {len(participants)} individual participant annotation(s) generated")
        if results.get("action_annotation"):
            print("✓ Action description generated")
        if results.get("object_annotations"):
            print(f"✓ {len(results['object_annotations'])} object annotations generated")
        
        if results.get("errors"):
            print(f"⚠️ Errors: {len(results['errors'])}")
            for error in results["errors"]:
                print(f"  - {error}")

print("VideoAnnotations class defined successfully")
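
The response-cleaning step inside the class (strip a Markdown code fence, then parse JSON) can be tried in isolation. The following is a minimal standalone sketch, not the class's own implementation: `strip_code_fence` and `parse_model_json` are hypothetical helper names, and the error dictionary only mirrors the `{"error": ...}` convention used above.

```python
import json
import re

# Matches an opening fence with an optional language tag, or a closing fence
_FENCE = re.compile(r"^```[a-zA-Z]*\s*|\s*```$")


def strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```json) if present."""
    return _FENCE.sub("", text.strip()).strip()


def parse_model_json(text: str):
    """Parse a model reply as JSON, returning an error dict on failure."""
    try:
        return json.loads(strip_code_fence(text))
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON: {e}"}


raw = '```json\n{"objects": [{"object": "dining table"}]}\n```'
parsed = parse_model_json(raw)
print(parsed)
```

Unfenced replies pass through unchanged, so the same helper can be applied to every response regardless of how the model formats it.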

## <a id='initialize-processor'></a>4. Initialize Video Processor

With the class defined, we create an instance of it, passing in our Dataloop dataset and model objects. The resulting `processor` object is ready to annotate videos.

In [None]:
# Initialize the video annotation processor
processor = VideoAnnotations(dataset, model)
print("Video annotation processor initialized successfully")
print(f"Using dataset: {dataset.name}")
print(f"Using model: {model.name}")

## <a id='test-single-video'></a>5. Test Single Video Processing

Before running the pipeline on the entire dataset, it is good practice to test it on a single video. This confirms that every part of the process works correctly without consuming excessive time or resources.

**Action Required:** You must provide a valid video `item_id` from your dataset in the cell below. Replace the placeholder with an actual ID.

In [None]:
# Get a single test video from the dataset
print("Getting test video...")

test_video = dataset.items.get(item_id='<YOUR_VIDEO_ITEM_ID>')
    
print(f"Testing with video: {test_video.name} (ID: {test_video.id})")

## <a id='process-single-video'></a>6. Process Test Video

Now, we execute the comprehensive processing pipeline on the single test video. The output will show the step-by-step progress as the video is downloaded, analyzed, and annotated. At the end, it will print a detailed summary of the results, including the generated annotations and a breakdown of the time taken for each step.
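
A per-step timing breakdown like the one printed at the end can be collected with a small context manager. This is only a sketch of the general idea, not necessarily how `process_video` records its timings: `timed_step` and the step names `frame_extraction` and `model_inference` are hypothetical, and only the `total_time` key appears in the notebook's own code.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed_step(timing: Dict[str, float], step: str) -> Iterator[None]:
    """Record the wall-clock duration of the enclosed block under timing[step]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Stored even if the step raises, so partial runs still report timings
        timing[step] = time.perf_counter() - start


timing: Dict[str, float] = {}
with timed_step(timing, "frame_extraction"):
    time.sleep(0.01)  # stand-in for real work
with timed_step(timing, "model_inference"):
    time.sleep(0.02)  # stand-in for real work
timing["total_time"] = sum(timing.values())

for step, duration in timing.items():
    print(f"  {step}: {duration:.2f}s")
```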

In [None]:
# Process the test video
if test_video:
    print(f"\n=== PROCESSING {test_video.name} ===")
    
    # Define progress callback
    def progress_callback(message):
        print(f"  {message}")
    
    # Process video
    result = processor.process_video(test_video, progress_callback=progress_callback)
    
    # Print detailed results
    processor.print_processing_summary(result)
    
    print(f"\n=== DETAILED RESULTS ===")
    if result.get("room_annotation"):
        print(f"\nRoom Annotation:")
        print(json.dumps(result["room_annotation"], indent=2))
    
    if result.get("participant_annotation"):
        print(f"\nParticipant Annotation:")
        print(json.dumps(result["participant_annotation"], indent=2))
    
    if result.get("action_annotation"):
        print(f"\nAction Annotation:")
        print(json.dumps(result["action_annotation"], indent=2))
    
    if result.get("object_annotations"):
        print(f"\nObject Annotations ({len(result['object_annotations'])} objects):")
        for i, obj in enumerate(result["object_annotations"][:5]):  # Show first 5 objects
            print(f"  Object {i+1}: {obj.get('object', 'Unknown')}")
            if 'object_color' in obj:
                print(f"    Color: {obj['object_color']}")
            if 'object_size_overall' in obj:
                print(f"    Size: {obj['object_size_overall']}")
    
    if result.get("video_characteristics"):
        print(f"\nVideo Characteristics:")
        print(json.dumps(result["video_characteristics"], indent=2))
    
    # Show timing breakdown
    if result.get("timing"):
        print(f"\nTiming Breakdown:")
        for step, duration in result["timing"].items():
            print(f"  {step}: {duration:.2f}s")
else:
    print("No test video available")

## <a id='batch-function'></a>7. Batch Processing Function

To scale the operation, we define a function, `process_video_batch`. It iterates through the items in the dataset, applies the same processing pipeline to each one, and tracks overall progress. It prints real-time updates, including the success rate and an estimate of the time remaining (ETA), followed by a final summary of successful and failed videos.
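
The time-remaining estimate is a simple running average: the elapsed time divided by the number of videos finished, multiplied by the number still to go. The sketch below isolates that arithmetic; `estimate_remaining_seconds` is a hypothetical helper name used only for illustration.

```python
def estimate_remaining_seconds(elapsed: float, done: int, total: int) -> float:
    """Estimate time left, assuming remaining items take the average time so far."""
    if done <= 0:
        raise ValueError("at least one item must be finished to estimate")
    avg_time = elapsed / done
    return avg_time * (total - done)


# 4 of 10 videos done after 120 s: 30 s each on average, 6 left, so 180 s remain
eta = estimate_remaining_seconds(elapsed=120.0, done=4, total=10)
print(f"ETA: {eta / 60:.1f}min")  # prints "ETA: 3.0min"
```

Because the estimate assumes every remaining video costs the average so far, it can swing noticeably early in a batch when videos differ a lot in length.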

In [None]:
def process_video_batch(dataset, processor, max_videos=None, progress_interval=1):
    """
    Process multiple videos using the comprehensive processor.
    
    Args:
        dataset: Dataloop dataset
        processor: VideoAnnotations processor instance
        max_videos: Maximum number of videos to process (None for all)
        progress_interval: How often to print progress updates
    """
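    # Get video items only (a plain items.list() would also return non-video files)
    filters = dl.Filters()
    filters.add(field='metadata.system.mimetype', values='video/*')
    video_items = list(dataset.items.list(filters=filters).all())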
    
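    if max_videos is not None:
        video_items = video_items[:max_videos]
    
    # Guard against an empty batch, which would otherwise divide by zero in the summary
    if not video_items:
        print("No items found to process")
        return {'successful': [], 'failed': [], 'total_time': 0, 'start_time': time.time()}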
    
    print(f"\n=== BATCH PROCESSING {len(video_items)} VIDEOS ===")
    
    results = {
        'successful': [],
        'failed': [],
        'total_time': 0,
        'start_time': time.time()
    }
    
    for i, video_item in enumerate(video_items, 1):
        print(f"\n[{i}/{len(video_items)}] Processing: {video_item.name}")
        
        # # Skip if already annotated
        # if video_item.annotations.list():
        #     continue
        
        try:
            # Process video
            result = processor.process_video(
                video_item, 
                progress_callback=lambda msg: print(f"  {msg}")
            )
            
            if result['status'] == 'completed':
                results['successful'].append({
                    'name': video_item.name,
                    'id': video_item.id,
                    'annotations_created': result['annotations_created'],
                    'processing_time': result['timing'].get('total_time', 0)
                })
                print(f"  ✓ Success: {result['annotations_created']} annotations created")
            else:
                results['failed'].append({
                    'name': video_item.name,
                    'id': video_item.id,
                    'errors': result.get('errors', ['Unknown error'])
                })
                print(f"  ✗ Failed: {result.get('errors', ['Unknown error'])}")
                
        except Exception as e:
            results['failed'].append({
                'name': video_item.name,
                'id': video_item.id,
                'errors': [str(e)]
            })
            print(f"  ✗ Exception: {str(e)}")
        
        # Progress update
        if i % progress_interval == 0 or i == len(video_items):
            elapsed = time.time() - results['start_time']
            avg_time = elapsed / i
            eta = avg_time * (len(video_items) - i)
            success_rate = len(results['successful']) / i * 100
            
            print(f"\n  Progress: {i}/{len(video_items)} ({i/len(video_items)*100:.1f}%)")
            print(f"  Success rate: {success_rate:.1f}%")
            print(f"  Elapsed: {elapsed/60:.1f}min, ETA: {eta/60:.1f}min")
    
    # Final summary
    results['total_time'] = time.time() - results['start_time']
    
    print(f"\n=== BATCH PROCESSING COMPLETE ===")
    print(f"Total videos: {len(video_items)}")
    print(f"Successful: {len(results['successful'])}")
    print(f"Failed: {len(results['failed'])}")
    print(f"Success rate: {len(results['successful'])/len(video_items)*100:.1f}%")
    print(f"Total time: {results['total_time']/60:.1f} minutes")
    print(f"Average time per video: {results['total_time']/len(video_items):.1f} seconds")
    
    if results['successful']:
        total_annotations = sum(item['annotations_created'] for item in results['successful'])
        print(f"Total annotations created: {total_annotations}")
    
    if results['failed']:
        print(f"\nFailed videos:")
        for item in results['failed']:
            print(f"  - {item['name']}: {item['errors'][0] if item['errors'] else 'Unknown error'}")
    
    return results

print("Batch processing function defined")

## <a id='run-batch'></a>8. Run Full Batch Processing (Optional)

This final execution cell runs the batch processing function on your dataset.

> **Warning:** Running this cell may be time-consuming and could incur costs associated with GenAI model usage, as it will process multiple videos. 

For a trial run, you can uncomment the `max_videos` parameter and set it to a small number (e.g., `max_videos=5`) to limit the number of videos processed.

In [None]:
# Run batch processing on all videos
batch_results = process_video_batch(
    dataset=dataset,
    processor=processor,
    # max_videos=5, # Uncomment and set a number to limit the batch size for testing
    progress_interval=1
)

print("Batch processing finished")
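
Once `batch_results` is available, the `failed` entries can be turned into a retry list and grouped by error message. The sketch below uses a hand-written sample with the same keys that `process_video_batch` returns; the file names, item IDs, and helper names (`failed_item_ids`, `first_error_counts`) are hypothetical.

```python
from collections import Counter
from typing import Any, Dict, List

# Hypothetical sample with the same keys that process_video_batch returns
sample_results: Dict[str, Any] = {
    "successful": [
        {"name": "kitchen_01.mp4", "id": "item-a",
         "annotations_created": 7, "processing_time": 41.2},
    ],
    "failed": [
        {"name": "hall_02.mp4", "id": "item-b",
         "errors": ["Invalid JSON: Expecting value"]},
        {"name": "yard_03.mp4", "id": "item-c",
         "errors": ["No response generated"]},
    ],
}


def failed_item_ids(batch_results: Dict[str, Any]) -> List[str]:
    """Collect the IDs of all failed videos, in processing order."""
    return [entry["id"] for entry in batch_results.get("failed", [])]


def first_error_counts(batch_results: Dict[str, Any]) -> Counter:
    """Count how often each first error message occurs among the failures."""
    return Counter(
        entry["errors"][0] if entry.get("errors") else "Unknown error"
        for entry in batch_results.get("failed", [])
    )


retry_ids = failed_item_ids(sample_results)
print(retry_ids)
print(first_error_counts(sample_results).most_common())
```

Each ID in `retry_ids` could then be fetched with `dataset.items.get(item_id=...)` and passed to `processor.process_video` again.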

## <a id='conclusion'></a>9. Conclusion

Congratulations! You have successfully walked through a comprehensive, end-to-end video annotation pipeline. You have learned how to:

1.  Set up your Dataloop environment and configure a powerful Generative AI model.
2.  Implement a detailed Python class to manage a multi-step video analysis process, including intelligent frame extraction, montage creation, and model interaction.
3.  Test the pipeline on a single video and review the structured, multi-faceted annotations.
4.  Scale the process using a batch function to annotate an entire dataset.

### Next Steps

From here, you can expand on this foundation:
*   **Analyze the Results:** Review the generated annotations in the Dataloop platform. Use filters and queries to gain insights from your newly structured data.
*   **Customize the Ontology:** Modify the `generate_structured_annotation` method to create a different annotation schema that better fits your specific use case.
*   **Integrate into a Pipeline:** Convert this notebook into a Dataloop FaaS (Function as a Service) and build an automated pipeline that triggers this annotation process whenever a new video is uploaded to your dataset.
*   **Experiment with Different Models:** Swap out the Gemini model for other VLMs (Vision-Language Models) or specialized models available on the Dataloop Marketplace to compare performance and results.
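
When customizing the ontology, it can help to check model output against the allowed enum values before creating annotations, since the prompts ask for exact matches. The following is a minimal sketch under that assumption: `ALLOWED` copies a small subset of the object schema's enums from the class definition, while `find_enum_violations` and the `candidate` object are hypothetical.

```python
from typing import Any, Dict, List

# Subset of the object schema's enums, copied from the class definition
ALLOWED: Dict[str, List[str]] = {
    "object_surface_touch": ["Smooth", "Rough", "Textured", "Bumpy"],
    "object_condition_power": ["On", "Off"],
    "object_shape_basic_forms": ["Rectangular", "Square", "Round", "Oval", "Circular"],
}


def find_enum_violations(obj: Dict[str, Any], allowed: Dict[str, List[str]]) -> List[str]:
    """Return one message per attribute value that is not in its enum."""
    problems = []
    for field, choices in allowed.items():
        if field not in obj:
            continue  # optional attributes may be omitted
        value = obj[field]
        # Array-typed attributes are checked element by element
        values = value if isinstance(value, list) else [value]
        for v in values:
            if v not in choices:
                problems.append(f"{field}: {v!r} is not an allowed value")
    return problems


candidate = {
    "object": "lamp",
    "object_surface_touch": "Smooth",
    "object_condition_power": "on",  # wrong case
    "object_shape_basic_forms": ["Round", "Conical"],  # "Conical" is not in the enum
}
for problem in find_enum_violations(candidate, ALLOWED):
    print(problem)
```

Objects with violations could be dropped, corrected, or sent back to the model; which of these is appropriate depends on the use case.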