In [1]:
import os
import sys
import numpy as np
import pandas as pd
import cv2
from PIL import Image
import matplotlib.pyplot as plt
import torch

# Add sam3 to path assuming it is in the parent directory or ../sam3
sys.path.append('..') 
sys.path.append('../sam3')

try:
    from sam3.model_builder import build_sam3_image_model
    from sam3.model.sam3_image_processor import Sam3Processor
    print("SAM3 modules imported successfully.")
except ImportError as e:
    print(f"Error importing SAM3: {e}. Make sure the sam3 folder is correctly located.")

SAM3 modules imported successfully.


In [2]:
# Load the SAM3 Model
# This matches the logic from preprocessing/colab.ipynb
print("Loading SAM3 Model...")
model = build_sam3_image_model(enable_inst_interactivity=True)
processor = Sam3Processor(model)
print("Model loaded.")

Loading SAM3 Model...
Model loaded.


In [3]:
def preprocess_scanpath(parquet_path, dist_thresh=0.05, duration_thresh_ms=100):
    """
    Preprocesses scanpaths:
    1. Combines consecutive points if they are spatially close (dist < dist_thresh).
    2. Filters out points that have too low duration (duration < duration_thresh_ms).
    
    Args:
        parquet_path: Path to parquet file.
        dist_thresh: Normalized distance threshold (0-1) to merge consecutive points.
        duration_thresh_ms: Minimum duration in ms to keep a fixation.
    
    Returns:
        DataFrame with columns ['x', 'y', 'start_time', 'end_time', 'duration']
    """
    df = pd.read_parquet(parquet_path)
    if df.empty:
        return pd.DataFrame()
    
    # Ensure sorted by time
    df = df.sort_values('timestamp_ms')
    
    # Extract arrays
    xs = df['x'].values
    ys = df['y'].values
    ts = df['timestamp_ms'].values
    
    fixations = []
    
    if len(xs) == 0:
        return pd.DataFrame()

    # Initial current group
    curr_x_sum = xs[0]
    curr_y_sum = ys[0]
    curr_count = 1
    curr_start = ts[0]
    # Estimate raw sample duration as avg diff or diff to next
    # For the last point, we'll assume same as prev interval or small default
    
    # Iterate
    for i in range(1, len(xs)):
        # Calculate duration of PREVIOUS point (timestamp difference)
        # Note: simplistic duration calc: T[i] - T[i-1] for the (i-1)th sample
        dt = ts[i] - ts[i-1]
        
        # Calculate instantaneous centroid of current group
        curr_x = curr_x_sum / curr_count
        curr_y = curr_y_sum / curr_count
        
        dist = np.sqrt((xs[i] - curr_x)**2 + (ys[i] - curr_y)**2)
        
        if dist < dist_thresh:
            # Combine
            curr_x_sum += xs[i]
            curr_y_sum += ys[i]
            curr_count += 1
        else:
            # Finish group
            # Group duration is Current Timestamp - Start Timestamp
            # (Strictly speaking, it subsumes the intervals between Start and Current)
            grp_duration = ts[i-1] - curr_start + (ts[i] - ts[i-1]) # Include last sample duration roughly
            
            centroid_x = curr_x_sum / curr_count
            centroid_y = curr_y_sum / curr_count
            
            fixations.append({
                'x': centroid_x,
                'y': centroid_y,
                'start_time': curr_start,
                'end_time': ts[i], # Approx end
                'duration': grp_duration
            })
            
            # Start new group
            curr_x_sum = xs[i]
            curr_y_sum = ys[i]
            curr_count = 1
            curr_start = ts[i]

    # Append last group
    if curr_count > 0:
        # Default duration for last point?
        grp_duration = 33 # Approx 30ms for last frame/sample
        centroid_x = curr_x_sum / curr_count
        centroid_y = curr_y_sum / curr_count
        fixations.append({
            'x': centroid_x,
            'y': centroid_y,
            'start_time': curr_start,
            'end_time': curr_start + grp_duration,
            'duration': grp_duration
        })
        
    fix_df = pd.DataFrame(fixations)
    
    # Filter by duration
    if not fix_df.empty:
        fix_df = fix_df[fix_df['duration'] >= duration_thresh_ms].reset_index(drop=True)
        
    return fix_df

print("Preprocessing function defined.")

Preprocessing function defined.


In [4]:
def get_masks_for_fixations(image_path, fixations, processor):
    """
    Generates a mask for each fixation point using SAM3.
    """
    image_pil = Image.open(image_path).convert("RGB")
    width, height = image_pil.size
    
    # Process image once? SAM3 processor might handle caching, or we call it per prompt.
    # Looking at sam3 examples, usually we can pass points to the prompt.
    
    # Prepare inputs
    # processor expects image. 
    # For simplicity, we process row by row to allow "per point" segmentation selection
    
    all_masks = []
    
    print(f"Generating masks for {len(fixations)} fixations...")

    inference_state = processor.set_image(image_pil)

    for idx, row in fixations.iterrows():
        x_px = int(row['x'] * width)
        y_px = int(row['y'] * height)
        
        # Clip to image bounds
        x_px = max(0, min(width-1, x_px))
        y_px = max(0, min(height-1, y_px))
        
        # Construct prompt
        # SAM3 API: expects points like [[x, y]] and labels like [1]
        input_points = [[[x_px, y_px]]]
        input_labels = [[1]]

        print('Pre-Fit')
                
        masks, scores, logits = model.predict_inst(
            inference_state,
            point_coords=input_points,
            point_labels=input_labels,
            multimask_output=False,
        )
        sorted_ind = np.argsort(scores)[::-1]
        masks = masks[sorted_ind]
        scores = scores[sorted_ind]
        logits = logits[sorted_ind]

        print('Post-Fit')
        
        ## 1. Standardize mask shape (handling 4D, 3D, or 2D inputs)
        if len(masks.shape) == 4:
            masks = masks.squeeze(0)
        if len(masks.shape) == 3 and masks.shape[0] == 1:
            masks = masks.squeeze(0)
        
        # 2. Combine all masks into one single master mask
        # We use np.max to ensure if masks overlap, they stay 'visible'
        if len(masks.shape) == 3:
            combined_mask = np.max(masks, axis=0)
        else:
            combined_mask = masks
        
        # Convert to numpy uint8 0 or 255
        # Masks are typically logits, so > 0 check converts to boolean
        mask_np = (combined_mask > 0).astype(np.uint8) * 255
        
        all_masks.append(mask_np)
        
        if idx % 5 == 0:
            print(f"Processed {idx}/{len(fixations)}")
            
    return all_masks

In [5]:
from PIL import Image, ImageDraw
import cv2
import numpy as np

def generate_scanpath_video(image_path, fixations, masks, output_video_path, fps=30):
    # Use PIL for image manipulation as requested
    base_pil = Image.open(image_path).convert("RGBA")
    width, height = base_pil.size
    
    # Create the background with the requested dark overlay (0,0,0,200)
    overlay = Image.new("RGBA", base_pil.size, (0, 0, 0, 200))
    dimmed_bg_pil = Image.alpha_composite(base_pil, overlay)
    
    if fixations.empty:
        print("No fixations to animate.")
        return

    min_time = fixations['start_time'].min()
    max_time = fixations['end_time'].max()
    duration_ms = max_time - min_time
    total_frames = int((duration_ms / 1000.0) * fps)
    
    # VideoWriter expects BGR, create it
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    
    print(f"Generating video: {output_video_path}")
    print(f"Duration: {duration_ms}ms, Frames: {total_frames}")
    
    # Dot radius (dynamic based on size, ~1.5% of max dimension)
    radius = int(max(width, height) * 0.01)
    
    for f in range(total_frames):
        current_time_ms = min_time + (f / fps) * 1000.0
        
        # Find active fixation
        active_fix = fixations[
            (fixations['start_time'] <= current_time_ms) & 
            (fixations['end_time'] >= current_time_ms)
        ]
        
        # Start matching active mask on dimmed
        frame_pil = dimmed_bg_pil.copy()
        
        if not active_fix.empty:
            idx = active_fix.index[0]
            if idx < len(masks):
                mask_np = masks[idx]
                
                # Resize if needed
                if mask_np.shape[:2] != (height, width):
                    mask_np = cv2.resize(mask_np, (width, height), interpolation=cv2.INTER_NEAREST)
                
                # Convert mask to PIL (L mode) for compositing
                mask_pil = Image.fromarray(mask_np).convert("L")
                
                # Composite: Show base_pil (bright) where mask is white, dimmed_bg elsewhere
                frame_pil = Image.composite(base_pil, frame_pil, mask_pil)
        
        # Overlay drawing layer (for dots and lines with proper alpha blending)
        shapes_layer = Image.new("RGBA", (width, height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(shapes_layer)

        # Collect points for the path (all fixations started up to now)
        # Includes finished grey ones and potentially the active red one.
        # "Connect all grey and the red dot"
        
        # 1. Previous fixations (Grey)
        prev_fixs = fixations[fixations['end_time'] < current_time_ms]
        for _, row in prev_fixs.iterrows():
            px, py = row['x'] * width, row['y'] * height
            draw.ellipse((px - radius, py - radius, px + radius, py + radius), fill=(128, 128, 128, 100))
        
        # 2. Current fixation (Red)
        if not active_fix.empty:
            row = active_fix.iloc[0]
            px, py = row['x'] * width, row['y'] * height
            draw.ellipse((px - radius, py - radius, px + radius, py + radius), fill=(255, 0, 0, 200))
            
        # Composite shapes onto frame
        frame_pil = Image.alpha_composite(frame_pil, shapes_layer)
        
        # Convert PIL RGBA -> RGB -> BGR for OpenCV
        frame_rgb = frame_pil.convert("RGB")
        frame_bgr = np.array(frame_rgb)[:, :, ::-1]
        
        out.write(frame_bgr)
        
        if f % 30 == 0:
            print(f"Frame {f}/{total_frames}", end='\r')

    out.release()
    print("\nVideo saved.")

In [6]:
# --- CONFIGURATION ---
# Example selection. Change these to select different user/image.
PARQUET_FOLDER = r'../parquet'
IMAGE_FOLDER = r'../img_bin'
OUTPUT_FOLDER = r'./output_videos'

if not os.path.exists(OUTPUT_FOLDER):
    os.makedirs(OUTPUT_FOLDER)

# ID Helper
# Parquet format: P22_IMG036_00100.parquet
# Image format: IMG036_00100.jpg
# Let's pick P22 and IMG036 for demo
TARGET_FILE_NAME = "P01_IMG004_10100.parquet"
TARGET_IMG_NAME = "IMG004_10100.jpg"

parquet_path = os.path.join(PARQUET_FOLDER, TARGET_FILE_NAME)
image_path = os.path.join(IMAGE_FOLDER, TARGET_IMG_NAME)

print(f"Processing: {parquet_path}")

# 1. Preprocess
# dist_thresh=0.05 (5% of screen diagonal approx?), duration_thresh=100ms
fixations_df = preprocess_scanpath(parquet_path, dist_thresh=0.05, duration_thresh_ms=80) 



Processing: ../parquet\P01_IMG004_10100.parquet


In [7]:
print(f"Found {len(fixations_df)} fixations after preprocessing.")
# print(fixations_df.head())

if not fixations_df.empty:
    # 2. Extract Masks
    masks = get_masks_for_fixations(image_path, fixations_df, processor)
    
    # 3. Generate Video
    video_name = TARGET_FILE_NAME.replace('.parquet', '.mp4')
    output_path = os.path.join(OUTPUT_FOLDER, video_name)
    
    generate_scanpath_video(image_path, fixations_df, masks, output_path, fps=30)
else:
    print("Skipping video generation due to lack of valid fixations.")

Found 15 fixations after preprocessing.
Generating masks for 15 fixations...
Pre-Fit
Post-Fit
Processed 0/15
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Processed 5/15
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Processed 10/15
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Pre-Fit
Post-Fit
Generating video: ./output_videos\P01_IMG004_10100.mp4
Duration: 5500.0ms, Frames: 165
Frame 150/165
Video saved.


In [None]:
from google import genai
import time

# Initialize the client
# (It automatically looks for the GEMINI_API_KEY environment variable)
client = genai.Client(api_key="Your_Google_API_Key_Here")

# 1. Upload the video
# This is required for videos larger than 20MB or longer than 1 minute
print("Uploading video...")
video_file = client.files.upload(file=r"C:\Users\domin\OneDrive\Seminar\code\video_generation\output_videos\P01_IMG002_10100.mp4")

# 2. Wait for processing
# Video files must be in 'ACTIVE' state before they can be used in a prompt
while video_file.state.name == "PROCESSING":
    print("Processing...", end="\r")
    time.sleep(2)
    video_file = client.files.get(name=video_file.name)

print("\nVideo is ready!")

# 3. Prompt with Text + Video
response = client.models.generate_content(
    model="gemini-2.5-pro", 
    contents=[
        video_file,
        """You are tasked with generating a short inner monologue (approx 15 seconds when read) based on the following structured inputs:
-Video description: A social media image, humorous or satirical in tone, displayed with an animated visual path based on eye-tracking data.
-Eye-tracking sequence: We have a series of visual items (extracted via Facebook’s SAM segmentation), each corresponding to where the viewer looked, in chronological order.

Instructions for generating the inner monologue:
-Imagine you’re the viewer whose gaze follows that exact sequence.
-The monologue should reflect a quick, almost instantaneous train of thought.
-Connect the items naturally, as if noticing one thing after another, with reactions.
-Be concise — the whole monologue should last about 15 seconds if spoken aloud.
-No need to describe the eye movement itself; just the inner thoughts triggered by what’s seen.

Example input format you will receive:
-Items in gaze order: [cat with sunglasses, text: “I hate Mondays”, empty coffee mug, explosion in background]
-Tone: humor
-Example output monologue:
“A cat with shades, cool. ‘I hate Mondays’ — yeah, me too. Wait, why’s the mug empty? Oh. Explains the explosion.”

Now generate for the video. Only respond with the monologue text, no extra commentary."""
    ]
)

print("-" * 30)
print(response.text)

# 4. Cleanup (Optional)
# Files are auto-deleted after 48 hours, but you can delete manually
resp = client.files.delete(name=video_file.name)

Uploading video...
Processing...
Video is ready!
------------------------------
Is that Trump's face? And there's Melania... Wait, back to him. Why is his head a building? Oh my god. That's one of the Twin Towers, and he's holding airplanes. That is... rough. And who's this other guy?
