In [1]:
%load_ext autoreload
from google import genai
from google.genai import types
import os
from dotenv import load_dotenv

# Populate the process environment via python-dotenv before reading the key
load_dotenv()

# Build the google-genai client from the API key found in the environment
# (the lookup yields None when the variable is unset).
client = genai.Client(api_key=os.environ.get("GOOGLE_GENERATIVE_AI_API_KEY"))

import os
import json

from google.genai import errors

video_json_path = "myfile.json"
video_source_path = "videos/output_long_again_2.mp4"

# Reuse a previously uploaded video when its cached metadata still points at a
# live remote file; otherwise upload the video and cache the new metadata.
myfile = None
if os.path.exists(video_json_path):
    with open(video_json_path, "r") as f:
        myfile_dict = json.load(f)
    cached_file = types.File(**myfile_dict)
    try:
        # Uploaded files are only retained server-side for a limited time, so a
        # cached handle can go stale. Ask the service for the file before
        # trusting it; this also refreshes the metadata.
        myfile = client.files.get(name=cached_file.name)
        print(f"Video loaded from cache: {myfile.name}")
    except errors.ClientError as exc:
        # 4xx response: the remote file is gone or inaccessible -> upload again.
        print(f"Cached video {cached_file.name} is no longer available ({exc}); re-uploading")

if myfile is None:
    myfile = client.files.upload(file=video_source_path)
    print(f"Video uploaded: {myfile.name}")
    # Persist the file metadata so later runs can skip the upload
    with open(video_json_path, "w") as f:
        f.write(myfile.model_dump_json())

Video loaded from cache: files/wdfs4j4ufcd8


In [2]:
# STEP 1: PROCEDURE EXTRACTION (runs first)
from video_understanding.multi_prompts import create_procedure_extraction_prompt
from video_understanding.simple_models import ProcedureExtraction

procedure_json_path = "procedure_result.json"
model_name = "models/gemini-2.5-pro"

print("=" * 50)
print("STEP 1: PROCEDURE EXTRACTION")
print("=" * 50)

system_prompt, user_prompt = create_procedure_extraction_prompt()

# The uploaded video plus the user prompt form the request contents
contents = [myfile, user_prompt]
print(f"Token count: {client.models.count_tokens(model=model_name, contents=contents)}")

procedure_response = client.models.generate_content(
    model=model_name,
    contents=contents,
    config={
        "response_mime_type": "application/json",
        "response_schema": ProcedureExtraction,
        "system_instruction": system_prompt,
    },
)

# `parsed` is None when the SDK could not turn the response into the schema;
# fail with a descriptive error instead of an AttributeError further down.
procedure_result = procedure_response.parsed
if procedure_result is None:
    raise ValueError(
        "Procedure extraction returned no parseable ProcedureExtraction; "
        f"raw response text: {procedure_response.text!r}"
    )

# Cache the structured result on disk so later cells can reload it
with open(procedure_json_path, "w") as f:
    f.write(procedure_result.model_dump_json())

print("PROCEDURE EXTRACTION COMPLETE")
print(f"Goal wells: {len(procedure_result.goal_wells)}")
print(f"Reagent sources: {procedure_result.reagent_sources}")
print(f"Timestamp range: {procedure_result.timestamp_range}")

STEP 1: PROCEDURE EXTRACTION
Token count: total_tokens=19920 cached_content_token_count=None
PROCEDURE EXTRACTION COMPLETE
Goal wells: 3
Reagent sources: ['Reagent A', 'Reagent B', 'Reagent C']
Timestamp range: 0:01 - 1:12


In [2]:
import os
import json
from video_understanding.multi_prompts import format_procedure_context
from video_understanding.simple_models import ProcedureExtraction

procedure_json_path = "procedure_result.json"

# Reload the cached Step 1 result when it exists on disk
if os.path.exists(procedure_json_path):
    with open(procedure_json_path, "r") as f:
        loaded_json = f.read()
    procedure_result = ProcedureExtraction.model_validate_json(loaded_json)
    print("Loaded ProcedureExtraction from file:")
    print(f"Goal wells: {len(procedure_result.goal_wells)}")
    print(f"Reagent sources: {procedure_result.reagent_sources}")
    print(f"Timestamp range: {procedure_result.timestamp_range}")
else:
    print("ProcedureExtraction file not found.")
    # Without a cache file the only usable source is a result left in the
    # kernel by the Step 1 cell; fail clearly when that is missing too rather
    # than hitting a NameError on the next statement.
    if "procedure_result" not in globals():
        raise FileNotFoundError(
            f"{procedure_json_path} does not exist and no in-memory "
            "procedure_result is available; run the Step 1 extraction cell first."
        )

procedure_context = format_procedure_context(procedure_result)

Loaded ProcedureExtraction from file:
Goal wells: 9
Reagent sources: ['Reagent A', 'Reagent B', 'Reagent C']
Timestamp range: 0:00 - 1:12


In [3]:
# STEP 2: OBJECTIVE EVENTS EXTRACTION
from video_understanding.multi_prompts import create_objective_events_prompt
from video_understanding.simple_models import  PipetteSettingChange, AspirationEvent, DispensingEvent, TipChangeEvent
from pydantic import BaseModel, Field
from typing import List, Union


from collections import Counter

objective_events_json_path = "objective_events_result.json"
model_name = "models/gemini-2.5-pro"

print("=" * 50)
print("STEP 2: OBJECTIVE EVENTS EXTRACTION")
print("=" * 50)

system_prompt, user_prompt = create_objective_events_prompt(procedure_context)

# The uploaded video plus the user prompt form the request contents
contents = [myfile, user_prompt]
print(f"Token count: {client.models.count_tokens(model=model_name, contents=contents)}")


class ObjectiveEventsList(BaseModel):
    """Wrapper for list of objective events"""

    events: List[
        Union[PipetteSettingChange, AspirationEvent, DispensingEvent, TipChangeEvent]
    ] = Field(..., description="List of objective events extracted from video")


objective_response = client.models.generate_content(
    model=model_name,
    contents=contents,
    config={
        "response_mime_type": "application/json",
        "response_schema": ObjectiveEventsList,
        "system_instruction": system_prompt,
    },
)

# `parsed` is None when the SDK could not turn the response into the schema;
# fail with a descriptive error instead of an AttributeError further down.
objective_events_result = objective_response.parsed
if objective_events_result is None:
    raise ValueError(
        "Objective events extraction returned no parseable ObjectiveEventsList; "
        f"raw response text: {objective_response.text!r}"
    )
objective_events = objective_events_result.events

# Cache the structured result on disk so later cells can reload it
with open(objective_events_json_path, "w") as f:
    f.write(objective_events_result.model_dump_json())

print("OBJECTIVE EVENTS EXTRACTION COMPLETE")
print(f"Total events found: {len(objective_events)}")

# Count by type (Counter keeps first-seen order, like the dict it replaces)
event_counts = Counter(type(event).__name__ for event in objective_events)

for event_type, count in event_counts.items():
    print(f"- {event_type}: {count}")

STEP 2: OBJECTIVE EVENTS EXTRACTION
Token count: total_tokens=20277 cached_content_token_count=None
OBJECTIVE EVENTS EXTRACTION COMPLETE
Total events found: 19
- PipetteSettingChange: 1
- TipChangeEvent: 6
- AspirationEvent: 12


In [None]:
import os
import json
from video_understanding.multi_prompts import format_objective_events_context
from video_understanding.simple_models import ObjectiveEventsList

from collections import Counter

objective_events_json_path = "objective_events_result.json"

# Reload the cached Step 2 result when it exists on disk
if os.path.exists(objective_events_json_path):
    with open(objective_events_json_path, "r") as f:
        loaded_json = f.read()
    objective_events_result = ObjectiveEventsList.model_validate_json(loaded_json)
    print("Loaded ObjectiveEventsList from file:")
    print(f"Total events found: {len(objective_events_result.events)}")
    # Count by type (Counter keeps first-seen order)
    event_counts = Counter(type(event).__name__ for event in objective_events_result.events)
    for event_type, count in event_counts.items():
        print(f"- {event_type}: {count}")
else:
    print("ObjectiveEventsList file not found.")
    # Without a cache file the only usable source is a result left in the
    # kernel by the Step 2 cell; fail clearly when that is missing too rather
    # than hitting a NameError on the next statement.
    if "objective_events_result" not in globals():
        raise FileNotFoundError(
            f"{objective_events_json_path} does not exist and no in-memory "
            "objective_events_result is available; run the Step 2 extraction cell first."
        )

# Later cells iterate `objective_events`; keep it defined on the cache path too
objective_events = objective_events_result.events

objective_context = format_objective_events_context(objective_events)

In [None]:
# STEP 3: ANALYSIS EVENTS EXTRACTION
from typing import Dict, List

from pydantic import BaseModel, Field

# NOTE(review): these three names were used here without any visible import.
# The module locations below mirror the sibling prompt/model imports in this
# notebook - confirm they are defined there.
from video_understanding.multi_prompts import create_analysis_events_prompt
from video_understanding.simple_models import WarningEvent, WellStateEvent

model_name = "models/gemini-2.5-pro"

print("=" * 50)
print("STEP 3: ANALYSIS EVENTS EXTRACTION")
print("=" * 50)

system_prompt, user_prompt = create_analysis_events_prompt(procedure_context, objective_context)

# The uploaded video plus the user prompt form the request contents
contents = [myfile, user_prompt]
print(f"Token count: {client.models.count_tokens(model=model_name, contents=contents)}")


class AnalysisEventsResult(BaseModel):
    """Wrapper schema for the analysis stage, mirroring the Step 2 wrapper model."""

    warnings: List[WarningEvent] = Field(
        ..., description="Warning events identified in the video"
    )
    well_states: List[WellStateEvent] = Field(
        ..., description="Well state events identified in the video"
    )


analysis_response = client.models.generate_content(
    model=model_name,
    contents=contents,
    config={
        "response_mime_type": "application/json",
        "response_schema": AnalysisEventsResult,
        "system_instruction": system_prompt,
    },
)

# `parsed` is None when the SDK could not turn the response into the schema;
# fail with a descriptive error instead of an error further down.
analysis_parsed = analysis_response.parsed
if analysis_parsed is None:
    raise ValueError(
        "Analysis events extraction returned no parseable AnalysisEventsResult; "
        f"raw response text: {analysis_response.text!r}"
    )

# Downstream cells treat `analysis_events` as a dict keyed by event category,
# so expose the parsed model in that shape.
analysis_events: Dict[str, list] = {
    "warnings": list(analysis_parsed.warnings),
    "well_states": list(analysis_parsed.well_states),
}
print("ANALYSIS EVENTS EXTRACTION COMPLETE")

if 'warnings' in analysis_events:
    print(f"Warning events found: {len(analysis_events['warnings'])}")
    for warning in analysis_events['warnings']:
        print(f"  - {warning.warning_message} at {warning.timestamp_range}")

if 'well_states' in analysis_events:
    print(f"Well state events found: {len(analysis_events['well_states'])}")
    for state in analysis_events['well_states']:
        print(f"  - Well {state.well_id}: {'Complete' if state.is_complete else 'Incomplete'} at {state.timestamp_range}")

In [None]:
# SUMMARY: COMPLETE ANALYSIS RESULTS
# Section headings start with a real newline ("\n") so each section is preceded
# by a blank line; the doubled backslash previously printed a literal "\n".
print("=" * 50)
print("COMPLETE ANALYSIS SUMMARY")
print("=" * 50)

print("\n1. PROCEDURE OVERVIEW:")
# Only the first 100 characters of the model's reasoning are shown
print(f"   - Thinking: {procedure_result.thinking[:100]}...")
print(f"   - Goal wells: {len(procedure_result.goal_wells)}")
print(f"   - Reagent sources: {len(procedure_result.reagent_sources)}")

print("\n2. OBJECTIVE EVENTS:")
for event_type, count in event_counts.items():
    print(f"   - {event_type}: {count} events")

print("\n3. ANALYSIS EVENTS:")
# A missing category counts as zero events
warning_count = len(analysis_events.get('warnings', []))
well_state_count = len(analysis_events.get('well_states', []))
print(f"   - Warnings: {warning_count}")
print(f"   - Well state changes: {well_state_count}")

print("\n4. DETAILED RESULTS AVAILABLE IN:")
print("   - procedure_result: ProcedureExtraction")
print("   - objective_events: List of events")
print("   - analysis_events: Dict with warnings and well states")

print("\n✅ MULTI-STAGE ANALYSIS COMPLETE")

In [None]:
# Optional: dump the full procedure extraction as indented JSON
procedure_json_text = procedure_result.model_dump_json(indent=2)
print("DETAILED PROCEDURE RESULT:", procedure_json_text, sep="\n")

In [None]:
# Optional: View objective events
# Each event is printed as indented JSON under a numbered heading; the heading
# starts with a real newline so events are separated by a blank line.
print("DETAILED OBJECTIVE EVENTS:")
for event_number, event in enumerate(objective_events, start=1):
    print(f"\nEvent {event_number}: {type(event).__name__}")
    print(event.model_dump_json(indent=2))

In [None]:
# Optional: inspect the raw analysis-events object together with its type
analysis_report = (
    "DETAILED ANALYSIS EVENTS:",
    "Analysis events type: {}".format(type(analysis_events)),
    "Raw analysis events:",
    analysis_events,
)
print(*analysis_report, sep="\n")

In [None]:
def create_stateful_prompt(current_goal_state, current_current_state, current_protocol_log, current_warnings):
    """
    Build the lab-copilot prompt text from the current state variables.

    Each argument is interpolated into the "CURRENT STATE" section of the
    prompt; any falsy argument (None, empty string, empty container, 0) is
    replaced by a fixed placeholder sentence. The remaining text is static
    guidance telling the model to analyse 5 frames and set triggered flags.

    Returns:
        The assembled prompt as a single string.
    """
    # Substitute placeholder wording for any state that is missing/empty
    goal_text = current_goal_state or "Not yet defined"
    state_text = current_current_state or "Empty - no reagents added yet"
    log_text = current_protocol_log or "No previous events logged"
    warnings_text = current_warnings or "No current warnings"

    intro = (
        "LAB COPILOT - REAL-TIME PROCEDURE INTERPRETATION\n"
        "\n"
        "You are a lab copilot helping interpret an ongoing laboratory procedure. "
        "You're seeing a snapshot of what's happening right now through 5 consecutive frames.\n"
        "\n"
        "CURRENT STATE (what you know so far):\n"
    )

    # The "Current State" line deliberately ends with two spaces, exactly as in
    # the prompt text this function has always produced.
    state_summary = (
        f"- Goal State: {goal_text}\n"
        f"- Current State: {state_text}  \n"
        f"- Protocol Log: {log_text}\n"
        f"- Active Warnings: {warnings_text}\n"
    )

    guidance = """
YOUR ROLE:
You're in the middle of helping with a task. Analyze these 5 frames to understand what the person is doing right now and update your knowledge accordingly.

Use the triggered flags to indicate what aspects of your understanding need updating:

1. THINKING: Always interpret what you observe in these frames
   - What is the person trying to accomplish?
   - What pipetting actions are they performing?
   - How does this fit with what you've seen before?

2. GOAL STATE: Update if you learn more about what they're ultimately trying to achieve
   - Set goal_state_triggered=True if you gain new insight into the end goal
   - Leave goal_state_triggered=False if their objective is still unclear

3. CURRENT STATE: Update if their actions changed what's in the wells
   - Set current_state_triggered=True if reagents were added/moved
   - Calculate new volumes based on what you observed
   - Leave current_state_triggered=False if no transfers occurred

4. PROTOCOL LOG: Record significant events that matter for the ongoing task
   - Set protocol_log_triggered=True if something important happened
   - Add brief, useful notes (will be appended to your running memory)
   - Skip routine actions that don't change the situation
   - Leave protocol_log_triggered=False if nothing noteworthy occurred

5. WARNINGS: Alert to problems or issues you notice
   - Set warnings_triggered=True if you spot technique issues or errors
   - Note anything that might affect the procedure's success
   - Leave warnings_triggered=False if everything looks normal

FRAME INTERPRETATION:
- Look across the 5 frames to see the progression of their actions
- Focus on what changed: pipette position, liquid movement, container interactions
- Try to understand the intent behind their actions

IMPORTANT:
- You're helping with an ongoing task - provide relevant, actionable insights
- Only set triggered=True for aspects that actually need updating
- If triggered=False, leave that field as None
- Base everything on what you can directly observe

OUTPUT: Return VideoAnalysis with appropriate triggered flags set."""

    return intro + state_summary + guidance

# Announce that the prompt builder is defined, one status line per print
for banner_line in (
    "✅ LAB COPILOT PROMPT READY!",
    "🤖 Role: Real-time procedure interpretation",
    "📸 Input: 5 frames from ongoing task",
    "🔄 Output: Updated understanding of what's happening",
):
    print(banner_line)