#  Vertex Inference Unified

This notebook uses the unified `google-genai` library (imported as `from google import genai`). It supports:
- **Vertex AI Backend:** Uploads videos to GCS during the 'Prepare' step. Using your own Cloud Account
- **Gemini API Backend:** Uploads videos using the **File API** during the 'Prepare' step. Using API key

**Pipeline Steps:**
1.  **Import Libraries & Configure.**
2.  **Config** - Set up the configuration for the pipeline, including the model, model config, prompts, and prompt config.
2.  **Initialize Clients:** Set up AI client and Storage client.
3.  **(Only the first time) Fetch Dataset:** Downloads metadata from HuggingFace.
4.  **(Only the first time on each API type) Download, Extract & Prepare Videos:** Downloads, extracts, uploads (GCS/File API). Updates metadata.
5.  **Bulk Inference (Async):** Performs inference using pre-uploaded video resources using baseline models (Gemini 2.5 Pro/ Gemini 2.5 Flash)

### Notes:

1. After switching from Vertex to Gemini and vice versa, be sure to follow the steps:
    - Run all cells in order to re-upload the videos to the correct storage client, you can enable the SKIP_DOWNLOAD and SKIP_EXTRACT flags to skip the download and extraction steps. Only the upload step is needed

2. Gemini API's file client has a expiry time of 1 day or so for the uploaded files. You may need to follow the steps above to re-upload the files.

## Import Libraries

In [40]:
# Cell 1: Imports (Corrected for `google.genai`)
import os
import csv
import json
import logging
import time
import random
import requests
import datetime
import zipfile
import math
import sys
import asyncio
from typing import Dict, List, Optional, Set, Tuple, Any
from collections import defaultdict
from pathlib import Path
import shutil
import subprocess
import tempfile
import fractions

# Google Cloud & AI Libraries (Unified SDK)
try:
    import google.genai as genai
    from google.genai import types
    from google.genai import errors as genai_errors
    from google.api_core import exceptions as api_core_exceptions
    # GCS Client (Optional, for Vertex Mode)
    try:
        from google.cloud import storage
        GCS_AVAILABLE = True
    except ImportError:
        print("INFO: google-cloud-storage not found. Vertex AI GCS operations unavailable.")
        storage = None
        GCS_AVAILABLE = False
    print("`google.genai` SDK and helpers imported successfully.")
except ImportError as e:
     print(f"ERROR: Failed to import Google libraries: {e}. Install: pip install google-genai google-api-core google-cloud-storage")
     genai = None; types = None; genai_errors = None; api_core_exceptions = None
     storage = None; GCS_AVAILABLE = False
     raise ImportError("FATAL: `google.genai` or `google-api-core` SDK not found.")

# Data Handling & Progress
from datasets import load_dataset
import pandas as pd
from tqdm.notebook import tqdm

# UI Elements
import ipywidgets as widgets
from IPython.display import display, Markdown, HTML, clear_output

# Async in Notebook
import nest_asyncio
nest_asyncio.apply()

`google.genai` SDK and helpers imported successfully.


## Config Settings

In [41]:
# --- GCP Configuration ---

# PROJECT_ID = "YOUR_PROJECT_ID_HERE" # Your Google Cloud Project ID (Needed for GCS and Vertex AI mode)
# LOCATION = "us-central1"      # Your Google Cloud Region (Needed for Vertex AI mode)
# GCS_BUCKET = "YOUR_GCS_BUCKET_HERE" # Your GCS bucket name (Needed for video storage)

PROJECT_ID = "tiktokllm" # Your Google Cloud Project ID (Needed for GCS and Vertex AI mode)
LOCATION = "us-central1"      # Your Google Cloud Region (Needed for Vertex AI mode)
GCS_BUCKET = "seekdeepr4-ml-storage" # Your GCS bucket name (Needed for video storage)

# --- Choose Backend Mode ---
# Set USE_VERTEX to True to use the Vertex AI backend (requires ADC or service account auth).
# Set USE_VERTEX to False to use the Gemini API backend (requires GEMINI_API_KEY).
USE_VERTEX = False  # <-- CHANGE THIS TO True TO USE VERTEX AI

# --- Gemini API Key (Only required if USE_VERTEX is False) ---
# IMPORTANT: Replace with your actual Gemini API Key if USE_VERTEX is False.
# Consider loading from environment variables (GOOGLE_API_KEY) or a secure secrets manager.
GEMINI_API_KEY = "AIzaSyCtTXQV55XJqZRQb8hXUsFFfI3dJHK_5nE"  # Replace with your actual Gemini API Key


# --- File Paths ---
DATASET_CSV = "dataset.csv"               # Input dataset metadata from HuggingFace
METADATA_FILE = "video_metadata_vertex.csv" if USE_VERTEX else "video_metadata_non_vertex.csv"      # Stores video info: video_id, local_path, gcs_uri (if Vertex), question data
RESULTS_FILE = "results_noncot_full_inference.csv"              # Output file for inference predictions
DOWNLOADS_DIR = "downloads"               # Directory for downloaded zip file
EXTRACTED_VIDEOS_DIR = "extracted_videos" # Directory storing extracted .mp4 files locally
SPEED_VIDEOS_DIR = "speed_videos"         # Stores sped up/slowed down videos
HF_CACHE_DIR = "./hf_cache"               # Cache directory for HuggingFace datasets

# --- Step 1: Fetch Dataset Configuration ---
HF_DATASET_NAME = "lmms-lab/AISG_Challenge" # HuggingFace dataset identifier
HF_DATASET_SPLIT = "test"                 # Dataset split to use
SKIP_FETCH = False                        # Set True to skip fetching if DATASET_CSV exists

# --- Step 2: Download & Prepare Videos Configuration ---
VIDEO_ZIP_URL = "https://huggingface.co/datasets/lmms-lab/AISG_Challenge/resolve/main/Benchmark-AllVideos-HQ-Encoded-challenge.zip?download=true"
ZIP_FILE_NAME = "all_videos.zip"
SKIP_DOWNLOAD_ZIP = True                 # Set True to skip downloading if zip exists
SKIP_EXTRACT = True                      # Set True to skip extraction if videos exist locally
SKIP_PREPARE = True                     # Set True to skip video preparation (GCS upload for Vertex, metadata update)
MAX_VIDEOS_TO_PROCESS = None              # Limit videos for testing (e.g., 5), None for all
UPLOAD_BATCH_SIZE_GCS = 10                # Batch size for GCS uploads (Vertex mode only)

# --- Inference Configuration ---
# Choose a model name compatible with your selected method (Vertex AI or Gemini API)

# Examples:

# Vertex AI: gemini-2.0-flash, gemini-2.0-flash-lite, gemini-2.0-pro-exp-02-05, gemini-2.0-flash-thinking-exp-01-21
# Rate limits: https://cloud.google.com/vertex-ai/generative-ai/docs/quotas#gemini-2.0-flash
# Basically 500 requests per minute for 2.0-flash and 2.0-flash-lite (unlimited), 10 requests per minute for 2.0-pro-exp-02-05, gemini-2.5-flash-preview-04-17

# Gemini API: gemini-2.0-flash, gemini-2.0-flash-lite, gemini-2.0-flash-thinking-exp-01-21, gemini-2.5-pro-exp-03-25
# Rate limits: https://ai.google.dev/gemini-api/docs/rate-limits#tier-1
# For free tier: 30 requests per minute for 2.0-flash and 2.0-flash-lite, 10 requests per minute for 2.0-pro-exp-02-05
# For tier-1: 2000 requests per minute for 2.0-flash and 2.0-flash-lite (have to pay), 10 requests per minute for 2.0-pro-exp-02-05 and gemini-2.0-flash-thinking-exp-01-21, gemini-2.5-flash-preview-04-17

# 1.0=normal speed, 0.5=half speed, etc.
VIDEO_SPEED_FACTOR = 0.5

# --- Setup Derived Paths & Directories ---
zip_file_path = Path(DOWNLOADS_DIR) / ZIP_FILE_NAME
extracted_videos_path = Path(EXTRACTED_VIDEOS_DIR)
speed_videos_path = Path(SPEED_VIDEOS_DIR) / str(VIDEO_SPEED_FACTOR)
Path(DOWNLOADS_DIR).mkdir(parents=True, exist_ok=True)
extracted_videos_path.mkdir(parents=True, exist_ok=True)
speed_videos_path.mkdir(parents=True, exist_ok=True)
Path(HF_CACHE_DIR).mkdir(parents=True, exist_ok=True)

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__)

# --- Configuration Validation & Display --- #
warnings_found = False
if USE_VERTEX:
    if not PROJECT_ID or PROJECT_ID == "your-gcp-project-id":
        logger.error("Vertex AI mode requires PROJECT_ID to be set.")
        warnings_found = True
    if not LOCATION:
        logger.error("Vertex AI mode requires LOCATION to be set.")
        warnings_found = True
    if not GCS_BUCKET or GCS_BUCKET == "your-gcs-bucket-name":
        logger.error("Vertex AI mode requires GCS_BUCKET for video uploads.")
        warnings_found = True
    if not GCS_AVAILABLE:
        logger.error("Vertex AI mode requires 'google-cloud-storage', but it's not installed.")
        warnings_found = True
else: # Gemini API Mode
    # Check API Key (explicit or env var)
    effective_api_key = GEMINI_API_KEY if GEMINI_API_KEY != "YOUR_API_KEY_HERE" else os.environ.get("GOOGLE_API_KEY")
    if not effective_api_key:
        logger.error("Gemini API mode requires GEMINI_API_KEY or GOOGLE_API_KEY environment variable.")
        warnings_found = True
    else:
        # Don't store the key in the config display if loaded from env
        if GEMINI_API_KEY == "YOUR_API_KEY_HERE" and os.environ.get("GOOGLE_API_KEY"):
            GEMINI_API_KEY = "(Loaded from GOOGLE_API_KEY env var)"
        logger.info("Gemini API mode configured. Videos will be uploaded via File API.")

if warnings_found:
     print("\n\n************************* WARNING *************************")
     print("Configuration errors detected above. Execution might fail.")
     print("***********************************************************\n")

2025-05-09 21:28:47,460 - INFO - Gemini API mode configured. Videos will be uploaded via File API.


# Main Model Selection For Bulk Inference Generated Questions

### Generating Questions Models

In [42]:
# To See All Available Models, Go into the Generating_Questions_models.py file
# Models are used to generate the questions
from models.Generating_Questions_models import get_brainstorm_prompt
CoT_model_list = ["gemini-2.0-flash","gemini-2.5-pro-preview-05-06"]
QUESTIONS_MODEL_NAME, QUESTIONS_PROMPT, QUESTIONS_SCHEMA, QUESTIONS_CONFIG, QUESTIONS_REQUESTS_PER_MINUTE, QUESTIONS_MAX_RETRIES, QUESTIONS_MAX_ASYNC_WORKERS = get_brainstorm_prompt(CoT_model_list[0])

### COT output models for Bulk Inferences

In [43]:
# To See All Available Models, Go into the CoT_ouput_models.py file
# CoT models are used for Bulk Inference
from models.CoT_ouput_models import get_cot_model
CoT_model_list = ["gemini-2.0-flash","gemini-2.5-pro-preview-05-06", "gemini-2.5-pro-preview-03-25"]
MODEL_NAME, SYSTEM_PROMPT, PROMPT_TEMPLATES, CONFIG, REQUESTS_PER_MINUTE, MAX_RETRIES, MAX_ASYNC_WORKERS = get_cot_model(CoT_model_list[2])

# Basic Initialization

## Initialize Google Cloud Clients

In [44]:
storage_client = None
ai_client = None

# --- Initialize Generative AI Client (`google.genai`) --- #
display(Markdown("### Initializing Generative AI Client (`google.genai`)"))
try:
    if USE_VERTEX:
        display(Markdown(f"Vertex AI backend (Project: {PROJECT_ID}, Loc: {LOCATION})..."))
        if not PROJECT_ID or not LOCATION or PROJECT_ID == "your-gcp-project-id":
             raise ValueError("PROJECT_ID/LOCATION invalid for Vertex AI.")
        # Initialize Client for Vertex
        ai_client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)
        display(Markdown(f"✅ Vertex AI Client Initialized."))
    else: # Gemini API Mode
        display(Markdown("Gemini API backend (using API Key)..."))
        effective_api_key = GEMINI_API_KEY if GEMINI_API_KEY != "YOUR_API_KEY_HERE" else os.environ.get("GOOGLE_API_KEY")
        if not effective_api_key:
             if os.environ.get("GOOGLE_API_KEY"): effective_api_key = None # Client uses env var
             else: raise ValueError("Gemini API Key required but not found.")
        # Initialize Client for Gemini API
        ai_client = genai.Client(api_key=effective_api_key, vertexai=False)
        display(Markdown(f"✅ Gemini API Client Initialized."))

except ValueError as ve: display(Markdown(f"❌ **Config Error:** {ve}")); ai_client = None
except Exception as e: display(Markdown(f"❌ **AI Client Error:** {e}.")); logger.error("AI Client Init Failed", exc_info=True); ai_client = None

# --- Initialize Storage Client (ONLY for Vertex AI mode) --- #
if USE_VERTEX:
    display(Markdown("### Initializing GCS Client (Vertex Mode Only)"))
    if not GCS_AVAILABLE: display(Markdown("❌ GCS lib missing.")); raise RuntimeError("Missing GCS lib.")
    if not GCS_BUCKET or GCS_BUCKET == "your-gcs-bucket-name": display(Markdown("❌ GCS_BUCKET needed.")); raise ValueError("GCS_BUCKET required.")
    try:
        storage_client = storage.Client(project=PROJECT_ID)
        if not storage_client.bucket(GCS_BUCKET).exists(): display(Markdown(f"⚠️ GCS Bucket `{GCS_BUCKET}` inaccessible."))
        else: display(Markdown(f"✅ GCS Client Initialized (Bucket: '{GCS_BUCKET}')."))
    except Exception as e:
        display(Markdown(f"❌ **GCS Client Error:** {e}.")); logger.error("GCS Client Init Failed", exc_info=True)
        if not SKIP_PREPARE: raise RuntimeError("GCS client failed.")
        else: display(Markdown("⚠️ GCS client failed, but skipping prep."))
else:
    display(Markdown("### Initializing Gemini API Client (File API)"))
    try:
        storage_client = ai_client.files
    except Exception as e:
        display(Markdown(f"❌ **Gemini File API Client Error:** {e}.")); logger.error("Gemini API Client Init Failed", exc_info=True)
    display(Markdown(f"✅ Gemini File API Client Initialized."))

# --- Final Checks --- #
if ai_client is None: raise RuntimeError("AI client failed.")

if USE_VERTEX and storage_client is None and not SKIP_PREPARE: raise RuntimeError("GCS client failed for Vertex prep.")
display(Markdown("✅ Client initialization complete."))


### Initializing Generative AI Client (`google.genai`)

Gemini API backend (using API Key)...

✅ Gemini API Client Initialized.

### Initializing Gemini API Client (File API)

✅ Gemini File API Client Initialized.

✅ Client initialization complete.

## Utility Functions

In [45]:
# --- File/Data Handling ---
def load_processed_video_ids(filename: str) -> Set[str]:
    processed_video_ids = set()
    if Path(filename).is_file():
        try:
            df = pd.read_csv(filename, usecols=['video_id'], dtype={'video_id': str}, on_bad_lines='warn')
            processed_video_ids = set(df['video_id'].dropna().unique())
            logger.info(f"Loaded {len(processed_video_ids)} processed video IDs from {filename}")
        except Exception as e:
            logger.warning(f"Could not read video IDs from {filename}: {e}. Assuming zero processed.")
    return processed_video_ids


def download_file_with_progress(url: str, destination: Path):
    logger.info(f"Downloading {url} to {destination}...")
    try:
        response = requests.get(url, stream=True, timeout=600)
        response.raise_for_status()
        total_size = int(response.headers.get('content-length', 0))
        block_size = 1024 * 1024
        with open(destination, 'wb') as f, tqdm(
            desc=f"Downloading {destination.name}", total=total_size, unit='iB', unit_scale=True, unit_divisor=1024
        ) as bar:
            for data in response.iter_content(block_size):
                size = f.write(data)
                bar.update(size)
        if total_size != 0 and bar.n != total_size:
            destination.unlink(missing_ok=True)
            raise RuntimeError(f"Download size mismatch for {destination.name}.")
        logger.info(f"Successfully downloaded {destination}")
    except Exception as e:
        destination.unlink(missing_ok=True)
        logger.error(f"Download failed for {url}: {e}")
        raise

def extract_zip(zip_path: Path, extract_to: Path):
    logger.info(f"Extracting {zip_path.name} to {extract_to}...")
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            members = [m for m in zip_ref.namelist() if not m.startswith('__MACOSX/') and not m.endswith('.DS_Store')]
            with tqdm(total=len(members), desc=f"Extracting {zip_path.name}") as pbar:
                for member in members:
                    zip_ref.extract(member=member, path=extract_to)
                    pbar.update(1)
        logger.info(f"Successfully extracted {zip_path} to {extract_to}")
    except Exception as e:
        logger.error(f"Extraction error: {e}")
        raise
    
def move_videos_to_main_directory(base_path):
    """Find all MP4 files in subdirectories and move them to the main directory."""
    logger.info(f"Moving all videos to main directory: {base_path}")
    moved_count = 0
    failed_count = 0
    
    # Find all MP4 files in subdirectories (but not in the main directory)
    for file_path in list(base_path.glob('**/*.mp4')):
        # Skip files already in the main directory or hidden Mac files
        if file_path.parent == base_path or file_path.name.startswith('._'):
            continue
            
        # Destination in the main directory
        dest_path = base_path / file_path.name
        
        try:
            # Move the file
            shutil.move(str(file_path), str(dest_path))
            moved_count += 1
            if moved_count % 50 == 0:
                logger.info(f"Moved {moved_count} videos so far...")
        except Exception as e:
            logger.error(f"Error moving {file_path}: {e}")
            failed_count += 1
    
    logger.info(f"Moved {moved_count} videos to main directory. Failed: {failed_count}")
    

def create_or_update_metadata(metadata_path: str, dataset_df: pd.DataFrame, video_updates: Dict[str, Dict]):
    try:
        required_cols = ['video_id', 'qid']
        update_cols = ['local_path', 'gcs_uri', 'file_api_name', 'status']
        dtype_map = {'video_id': str, 'qid': str} # Ensure IDs are strings

        if not Path(metadata_path).is_file():
            logger.info(f"Creating metadata file: {metadata_path}")
            meta_df = dataset_df.copy()
            for col in update_cols: meta_df[col] = pd.NA
            meta_df['status'] = 'pending'
        else:
            logger.debug(f"Loading existing metadata: {metadata_path}")
            meta_df = pd.read_csv(metadata_path, dtype=dtype_map)
            for col in update_cols: # Add missing update columns if needed
                 if col not in meta_df.columns: meta_df[col] = pd.NA

        if not all(col in meta_df.columns for col in required_cols):
            raise ValueError(f"Metadata missing required columns ({required_cols}).")

        updates_df = pd.DataFrame.from_dict(video_updates, orient='index')
        updates_df.index.name = 'video_id'
        updates_df.reset_index(inplace=True)
        updates_df['video_id'] = updates_df['video_id'].astype(str)

        # Use merge for robust updating across potentially multiple rows per video_id
        # First, prepare updates DF with only the necessary columns (video_id + update_cols)
        merge_cols = ['video_id'] + [col for col in update_cols if col in updates_df.columns]
        updates_to_merge = updates_df[merge_cols].drop_duplicates(subset=['video_id'], keep='last')

        # Merge, prioritizing updates
        # Suffixes help identify original vs update cols if needed, but update will overwrite
        merged_df = pd.merge(meta_df, updates_to_merge, on='video_id', how='left', suffixes=('', '_update'))

        # Apply the updates
        for col in update_cols:
            update_col_name = col + '_update'
            if update_col_name in merged_df.columns:
                # Fill NAs in original col with update col, then drop update col
                meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])
                # Alternative: Directly update where update is not NA
                # meta_df[col] = np.where(merged_df[update_col_name].notna(), merged_df[update_col_name], merged_df[col])

        meta_df.to_csv(metadata_path, index=False, encoding='utf-8')
        logger.info(f"Metadata file '{metadata_path}' updated with {len(video_updates)} video records.")

    except Exception as e:
        logger.error(f"Error updating metadata {metadata_path}: {e}", exc_info=True)
        raise

def load_metadata_questions_generation(metadata_file: str = METADATA_FILE) -> Dict[str, Dict]:
    """Loads video metadata where each video_id is unique. Returns Dict[video_id, metadata_dict]."""
    if not Path(metadata_file).is_file():
        return {}

    required_col = 'gcs_uri' if USE_VERTEX else 'file_api_name'
    try:
        df = pd.read_csv(metadata_file, dtype=str).fillna('')
        if 'video_id' not in df.columns or required_col not in df.columns:
            logger.error(f"Metadata missing 'video_id' or '{required_col}'.")
            return {}
        # Keep only valid rows
        valid_df = df[df['video_id'].astype(bool) & df[required_col].astype(bool)]
        if len(valid_df) == 0:
            logger.warning(f"No videos found with '{required_col}' in {metadata_file}. Check Step 4.")
            return {}
        # Drop duplicates to ensure uniqueness
        unique_df = valid_df.drop_duplicates(subset='video_id', keep='first')
        # Create the final dictionary
        video_metadata = {
            row['video_id']: row.to_dict()
            for _, row in unique_df.iterrows()
        }
        logger.info(f"Loaded {len(video_metadata)} unique videos for inference.")
        return video_metadata
    except Exception as e:
        logger.error(f"Error loading metadata for inference: {e}", exc_info=True)
        return {}


# --- Upload/Verification Helpers ---
def upload_to_gcs(storage_client, bucket_name: str, source_file_path: Path, destination_blob_name: str) -> Optional[str]:
    if not GCS_AVAILABLE or storage_client is None or not source_file_path.is_file(): return None
    try:
        blob = storage_client.bucket(bucket_name).blob(destination_blob_name)
        blob.upload_from_filename(str(source_file_path))
        gcs_uri = f"gs://{bucket_name}/{destination_blob_name}"
        logger.debug(f"GCS OK: {source_file_path} -> {gcs_uri}")
        return gcs_uri
    except Exception as e:
        logger.error(f"GCS Fail: {source_file_path}. Error: {e}")
        return None

def upload_via_file_api(storage_client, local_path: Path, display_name: str) -> Optional[str]:
    if storage_client is None or not local_path.is_file(): return None
    try:
        logger.debug(f"Uploading {local_path} via File API...")
        uploaded_file = storage_client.upload(file=local_path)
        logger.info(f"File API OK: {local_path} -> {uploaded_file.name}")
        return uploaded_file.name
    except Exception as e:
        logger.error(f"File API Fail: {local_path}. Error: {e}", exc_info=True)
        return None

def verify_gcs_file_exists(storage_client, gcs_uri: str) -> bool:
    if not GCS_AVAILABLE or storage_client is None or not gcs_uri: return False
    try:
        exists = storage.Blob.from_string(gcs_uri, client=storage_client).exists()
        if not exists: logger.warning(f"GCS verify failed: {gcs_uri}")
        return exists
    except Exception as e:
        logger.error(f"Error verifying GCS {gcs_uri}: {e}")
        return False

def verify_file_api_resource_exists(storage_client, file_api_name: str) -> bool:
    if not storage_client or not file_api_name: return False
    try:
        _ = storage_client.get(name=file_api_name) # Sync get for verification
        return True
    except Exception as e:
        logger.error(f"Error verifying File API {file_api_name}: {e}")
        return False

def verify_local_file_exists(local_path: str) -> bool:
    exists = Path(local_path).is_file() if local_path else False
    if not exists: logger.warning(f"Local verify failed: {local_path}")
    return exists

# --- Prompt Building ---
def build_prompt(question_info: dict) -> str:
    question = question_info.get("question", "")
    q_type = question_info.get("question_type", "default")
    template = PROMPT_TEMPLATES.get(q_type, PROMPT_TEMPLATES["default"])
    # if q_type is MCQ
    if q_type == "Multiple-choice Question with a Single Correct Answer":
        return template.format(question=question).strip() + "\n" + "E. None of the above"
    return template.format(question=question).strip() + "\n" + question_info.get("question_prompt").strip()

# --- Rate Limiter ---
class AsyncRateLimiter:
    """
    An asyncio-compatible token bucket rate limiter.

    Args:
        rate (int): The maximum number of requests allowed per period.
        period (float): The time period in seconds (default: 60 for RPM).
        capacity (int, optional): The maximum burst capacity. Defaults to `rate`.
    """
    def __init__(self, rate: int, period: float = 60.0, capacity: Optional[int] = None):
        if rate <= 0:
            raise ValueError("Rate must be positive")
        if period <= 0:
            raise ValueError("Period must be positive")

        self.rate = rate
        self.period = float(period)
        self.capacity = float(capacity if capacity is not None else rate)
        self._tokens = self.capacity # Start full
        self._last_refill_time = time.monotonic()
        self._lock = asyncio.Lock()

    def _get_tokens_per_second(self) -> float:
        return self.rate / self.period

    async def _refill(self):
        """Replenishes tokens based on elapsed time. Must be called under lock."""
        now = time.monotonic()
        elapsed = now - self._last_refill_time
        if elapsed > 0:
            tokens_to_add = elapsed * self._get_tokens_per_second()
            self._tokens = min(self.capacity, self._tokens + tokens_to_add)
            self._last_refill_time = now

    async def acquire(self):
        """
        Acquires a token, waiting if necessary.
        """
        async with self._lock:
            await self._refill() # Refill based on time since last acquire/refill

            while self._tokens < 1:
                # Calculate how long to wait for 1 token
                tokens_needed = 1.0 - self._tokens
                wait_time = tokens_needed / self._get_tokens_per_second()

                # Release the lock before sleeping
                lock_released = True
                try:
                    self._lock.release()
                    logger.debug(f"Rate limit hit. Waiting for {wait_time:.3f}s for next token.")
                    await asyncio.sleep(wait_time)
                finally:
                    # Re-acquire the lock if it was released
                    if lock_released:
                        await self._lock.acquire()

                # Refill again after waiting, as more time has passed
                await self._refill()

            # Consume a token
            self._tokens -= 1.0


# Download, Extract & Prepare Dataset

## Fetch Dataset from HuggingFace

In [10]:
dataset_path = Path(DATASET_CSV)

if dataset_path.is_file() and SKIP_FETCH:
    logger.info(f"Dataset file '{DATASET_CSV}' exists and SKIP_FETCH is True. Skipping.")
    display(Markdown(f"✅ Skipping fetch: Found existing `{DATASET_CSV}`."))
    # Load the existing dataframe for use in Step 2
    try:
        dataset_df = pd.read_csv(dataset_path, dtype=str) # Load all as string initially
        logger.info(f"Loaded existing dataset from {DATASET_CSV} ({len(dataset_df)} rows).")
    except Exception as e:
        logger.error(f"Failed to load existing dataset file {DATASET_CSV}: {e}")
        display(Markdown(f"❌ Error loading existing `{DATASET_CSV}`: {e}. Please delete the file or set SKIP_FETCH=False."))
        raise
else:
    logger.info(f"Fetching dataset '{HF_DATASET_NAME}' (split: '{HF_DATASET_SPLIT}') from HuggingFace...")
    try:
        dataset = load_dataset(HF_DATASET_NAME, split=HF_DATASET_SPLIT, cache_dir=HF_CACHE_DIR)
        dataset_df = dataset.to_pandas()
        # Ensure key columns are strings
        for col in ['qid', 'video_id', 'question', 'question_type']:
             if col in dataset_df.columns:
                 dataset_df[col] = dataset_df[col].astype(str)
        dataset_df.to_csv(dataset_path, index=False, encoding='utf-8')
        logger.info(f"Successfully fetched dataset and saved to {DATASET_CSV} ({len(dataset_df)} rows).")
        display(Markdown(f"✅ Dataset fetched and saved to `{DATASET_CSV}` ({len(dataset_df)} rows)."))
        display(dataset_df.head())
    except Exception as e:
        logger.error(f"Failed to fetch or save dataset: {e}", exc_info=True)
        display(Markdown(f"❌ **Error fetching dataset:** {e}. Check connection, dataset name/split, cache dir permissions."))
        raise RuntimeError("Dataset fetching failed. Cannot continue.")

# Ensure dataset_df is loaded if skipping fetch didn't load it (e.g., first run with skip=True and no file)
if 'dataset_df' not in locals():
    if dataset_path.is_file():
        try:
            dataset_df = pd.read_csv(dataset_path, dtype=str)
        except Exception as e:
            logger.error(f"Critical error: Could not load dataset from {DATASET_CSV} after attempting fetch/skip: {e}")
            raise
    else:
        raise RuntimeError(f"Critical error: Dataset DataFrame not loaded and file {DATASET_CSV} not found.")

2025-05-09 21:18:13,098 - INFO - Fetching dataset 'lmms-lab/AISG_Challenge' (split: 'test') from HuggingFace...
2025-05-09 21:18:17,802 - INFO - Successfully fetched dataset and saved to dataset.csv (1500 rows).


✅ Dataset fetched and saved to `dataset.csv` (1500 rows).

Unnamed: 0,qid,video_id,question_type,capability,question,duration,question_prompt,answer,youtube_url
0,0008-0,sj81PWrerDk,Primary Open-ended Question,Plot Attribute (Montage),What is the difference between the action of t...,8.85,Please state your answer with a brief explanat...,,https://www.youtube.com/shorts/sj81PWrerDk
1,0008-1,sj81PWrerDk,Paraphrased Open-ended Question,Plot Attribute (Montage),Can you describe how the actions of the last p...,8.85,Please state your answer with a brief explanat...,,https://www.youtube.com/shorts/sj81PWrerDk
2,0008-2,sj81PWrerDk,Correctly-led Open-ended Question,Plot Attribute (Montage),Did the last person open the bottle without us...,8.85,Please state your answer with a brief explanat...,,https://www.youtube.com/shorts/sj81PWrerDk
3,0008-3,sj81PWrerDk,Wrongly-led Open-ended Question,Plot Attribute (Montage),Did the last person in the video open the bott...,8.85,Please state your answer with a brief explanat...,,https://www.youtube.com/shorts/sj81PWrerDk
4,0008-7,sj81PWrerDk,Multiple-choice Question with a Single Correct...,Plot Attribute (Montage),How does the last person in the video open the...,8.85,E. None of the above\nSelect one best answer t...,,https://www.youtube.com/shorts/sj81PWrerDk


## Download, Extract, and Prepare Videos

Downloads, extracts, and uploads videos (to GCS or File API). Updates `video_metadata.csv`.

In [None]:
dataset_path = Path(DATASET_CSV)
dataset_df = pd.read_csv(dataset_path, dtype=str)

### Download Video Archive

In [None]:
display(Markdown("### Downloading Archive"))
# Remember to set SKIP_DOWNLOAD_ZIP to True if you want to skip the download and you already have the zip file
if zip_file_path.is_file() and SKIP_DOWNLOAD_ZIP:
    display(Markdown(f"✅ Skipping download: Found `{zip_file_path}`."))
else:
    try: download_file_with_progress(VIDEO_ZIP_URL, zip_file_path); display(Markdown(f"✅ Downloaded: `{zip_file_path}`."))
    except Exception as e:
        display(Markdown(f"❌ **Download Error:** {e}."))
        if not SKIP_EXTRACT or not SKIP_PREPARE: raise RuntimeError(f"Download failed.")
        else: display(Markdown("⚠️ Download failed, skipping steps."))

### Downloading Archive

✅ Skipping download: Found `downloads/all_videos.zip`.

### Extract Video Archive

In [None]:
display(Markdown("### Extracting Archive"))
# Check if the zip file exists and if we should skip extraction
if any(extracted_videos_path.glob('*.mp4')) and SKIP_EXTRACT:
    display(Markdown(f"✅ Skipping extraction: Files in `{extracted_videos_path}`."))
elif not zip_file_path.is_file():
    display(Markdown(f"❌ Cannot extract: `{zip_file_path}` missing."))
    if not SKIP_PREPARE: raise RuntimeError(f"Zip missing.")
    else: display(Markdown("⚠️ Extraction skipped (no zip)."))
else:
    try: 
        extract_zip(zip_file_path, extracted_videos_path)
        # Move all videos to main directory
        move_videos_to_main_directory(extracted_videos_path)
        display(Markdown(f"✅ Extracted to `{extracted_videos_path}` and moved all videos to main directory."))
    except Exception as e:
        display(Markdown(f"❌ **Extraction Error:** {e}."))
        if not SKIP_PREPARE: raise RuntimeError("Extraction failed.")
        else: display(Markdown("⚠️ Extraction failed, skipping prep."))

### Extracting Archive

✅ Skipping extraction: Files in `extracted_videos`.

### Slow/Speed Up Videos
Losslessly change video speed while also re-encoding audio to maintain pitch. As
a result, is super fast. Could be made faster if using asyncio to concurrently run
ffmpeg. The video results are saved into **speed_videos/0.5**

In [None]:
async def run_subprocess(cmd, check=True, capture_output=False):
    """Helper function to run subprocess asynchronously."""
    stdout_pipe = asyncio.subprocess.PIPE if capture_output else asyncio.subprocess.DEVNULL
    # Capture stderr only if check is True or capture_output is True, otherwise DEVNULL
    stderr_pipe = asyncio.subprocess.PIPE if check or capture_output else asyncio.subprocess.DEVNULL

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=stdout_pipe,
        stderr=stderr_pipe
    )
    stdout, stderr = await process.communicate()

    if check and process.returncode != 0:
        error_msg = f"Command '{' '.join(cmd)}' failed with return code {process.returncode}"
        stderr_decoded = stderr.decode(errors='ignore') if stderr else ""
        if stderr_decoded:
            error_msg += f"\nStderr: {stderr_decoded}"
        # Raise specific exception to potentially capture stderr later
        raise subprocess.CalledProcessError(process.returncode, cmd, output=stdout, stderr=stderr)

    return stdout, stderr, process.returncode

async def process_single_video(vid_path, speed_videos_path, VIDEO_SPEED_FACTOR, semaphore):
    """Asynchronously processes a single video. Returns status string."""
    vid_path_str = str(vid_path.resolve())
    out_path = speed_videos_path / vid_path.name
    out_path_str = str(out_path.resolve())

    async with semaphore: # Limit concurrency
        if out_path.is_file():
            return 'skipped'

        if VIDEO_SPEED_FACTOR == 1.0:
            try:
                # Use asyncio.to_thread for potentially blocking I/O
                await asyncio.to_thread(shutil.copy, vid_path_str, out_path_str)
                return 'processed'
            except Exception as e:
                try:
                    logger.error(f"Error copying {vid_path.name}: {e}")
                except NameError:
                    print(f"Error copying {vid_path.name}: {e}")
                return 'error'

        # --- Process video with speed change ---
        tf_bitstream_path = None
        tf_audio_path = None
        tf_final_path = None
        try:
            # Create temporary files (synchronous part is okay here)
            # Context manager ensures files are closed before ffmpeg uses them
            with tempfile.NamedTemporaryFile(delete=False, suffix=".h264") as tf_b, \
                 tempfile.NamedTemporaryFile(delete=False, suffix=".aac") as tf_a, \
                 tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tf_f:
                tf_bitstream_name = tf_b.name
                tf_audio_name = tf_a.name
                tf_final_name = tf_f.name
            # Store paths for cleanup
            tf_bitstream_path = Path(tf_bitstream_name)
            tf_audio_path = Path(tf_audio_name)
            tf_final_path = Path(tf_final_name)


            # Get original FPS
            ffprobe_cmd = [
                "ffprobe", "-v", "error", "-select_streams", "v", "-of", "default=noprint_wrappers=1:nokey=1",
                "-show_entries", "stream=r_frame_rate", vid_path_str
            ]
            stdout, _, _ = await run_subprocess(ffprobe_cmd, check=True, capture_output=True)
            fps = float(fractions.Fraction(stdout.decode().strip()))
            new_fps = fps * VIDEO_SPEED_FACTOR

            # Extract and speed up audio
            factor = VIDEO_SPEED_FACTOR
            filter_parts = []
            while factor > 2.0:
                filter_parts.append("atempo=2.0")
                factor /= 2.0
            while factor < 0.5:
                filter_parts.append("atempo=0.5")
                factor /= 0.5
            if abs(factor - 1.0) > 1e-6:
                 filter_parts.append(f"atempo={factor:.6f}")

            if not filter_parts:
                 audio_cmd = ["ffmpeg", "-y", "-i", vid_path_str, "-vn", "-c:a", "copy", tf_audio_name]
            else:
                audio_filter = ",".join(filter_parts)
                audio_cmd = ["ffmpeg", "-y", "-i", vid_path_str, "-vn", "-filter:a", audio_filter, "-c:a", "aac", "-b:a", "128k", tf_audio_name]
            await run_subprocess(audio_cmd, check=True)


            # Extract h264 bitstream
            extract_cmd = ["ffmpeg", "-y", "-i", vid_path_str, "-map", "0:v", "-c:v", "copy", "-bsf:v", "h264_mp4toannexb", tf_bitstream_name]
            await run_subprocess(extract_cmd, check=True)

            # Remux bitstream with new audio and FPS
            remux_cmd = ["ffmpeg", "-y", "-fflags", "+genpts", "-r", f"{new_fps:.6f}", "-i", tf_bitstream_name, "-i", tf_audio_name, "-map", "0:v", "-map", "1:a", "-c:v", "copy", "-c:a", "copy", tf_final_name]
            await run_subprocess(remux_cmd, check=True)

            # Move final file (use asyncio.to_thread)
            await asyncio.to_thread(shutil.move, tf_final_name, out_path_str)
            return 'processed'

        except Exception as e:
            err_msg = f"Error processing {vid_path.name}: {e}"
            # Include ffmpeg stderr if available
            if isinstance(e, subprocess.CalledProcessError) and e.stderr:
                 err_msg += f"\nFFmpeg/FFprobe Stderr:\n{e.stderr.decode(errors='ignore')}"
            try:
                logger.error(err_msg)
            except NameError:
                print(err_msg)
            return 'error'
        finally:
            # Clean up temporary files asynchronously using to_thread
            async def _cleanup():
                if tf_bitstream_path and tf_bitstream_path.exists():
                    tf_bitstream_path.unlink(missing_ok=True)
                if tf_audio_path and tf_audio_path.exists():
                    tf_audio_path.unlink(missing_ok=True)
                # tf_final is moved, only delete if error occurred before move
                if tf_final_path and tf_final_path.exists():
                    tf_final_path.unlink(missing_ok=True)
            # Run sync cleanup in thread only if paths were assigned
            if tf_bitstream_path or tf_audio_path or tf_final_path:
                 await asyncio.to_thread(_cleanup)


# --- Main Cell Logic ---

async def run_processing(): # Wrap in an async function to use await
    display(Markdown("### Preparing Videos"))
    if dataset_df is None:
        raise RuntimeError("Dataset DF unavailable.")

    # Ensure output directory exists
    speed_videos_path.mkdir(parents=True, exist_ok=True)

    all_video_ids = sorted(list(dataset_df['video_id'].dropna().unique()))
    # Use logging if available, otherwise print
    try:
        logger.info(f"Processing {len(all_video_ids)} unique video IDs.")
    except NameError:
        print(f"Processing {len(all_video_ids)} unique video IDs.")

    vid_paths = list(extracted_videos_path.glob("*.mp4"))

    # Limit concurrency
    concurrency_limit = MAX_ASYNC_WORKERS
    try:
        logger.info(f"Using concurrency limit: {concurrency_limit}")
    except NameError:
        print(f"Using concurrency limit: {concurrency_limit}")
    semaphore = asyncio.Semaphore(concurrency_limit)

    tasks = []
    # Keep the familiar loop structure for creating tasks
    print(f"Preparing tasks for {len(vid_paths)} videos...")
    for vid_path in vid_paths:
         # Create a task for each video processing job
         # Pass necessary arguments to the task creator
         task = asyncio.create_task(process_single_video(vid_path, speed_videos_path, VIDEO_SPEED_FACTOR, semaphore))
         tasks.append(task)

    # Now, run all the created tasks concurrently and display progress
    # Use asyncio.as_completed with a standard tqdm progress bar
    print(f"Transforming {len(tasks)} Videos...")
    results = []
    # Use the imported tqdm (now tqdm.auto) to create a standard progress bar instance
    with tqdm(total=len(tasks), desc="Transforming Videos", unit="video") as pbar:
        for future in asyncio.as_completed(tasks):
            try:
                result = await future # Get result from completed task
                results.append(result)
            except Exception as exc:
                # Log errors from tasks that failed internally if not caught by process_single_video
                # (process_single_video should ideally return 'error' status instead of raising)
                try:
                    logger.error(f"Task for a video failed: {exc}")
                except NameError:
                    print(f"Task for a video failed: {exc}")
                results.append('error') # Count as error if task itself fails unexpectedly
            finally:
                 pbar.update(1) # Increment progress bar regardless of outcome


    # Count results
    processed = results.count('processed')
    skipped = results.count('skipped')
    errors = results.count('error')

    print(f"\n\n{skipped} videos skipped, {processed} videos processed, {errors} errors, {len(vid_paths)} total.")

# --- Execute the async processing ---
# In a Jupyter Notebook, you usually need to await the top-level async function.
# If top-level await isn't enabled, you might need nest_asyncio or run manually.
# Using await directly is the most common way in modern notebooks.

await run_processing()


### Preparing Videos

2025-05-09 16:57:49,041 - INFO - Processing 289 unique video IDs.
2025-05-09 16:57:49,042 - INFO - Using concurrency limit: 25
Preparing tasks for 289 videos...
Transforming 289 Videos...


Transforming Videos:   0%|          | 0/289 [00:00<?, ?video/s]



289 videos skipped, 0 videos processed, 0 errors, 289 total.


### Preparing and Upload Videos to GCS or File API

Upload all videos into File API, and record the link to that video inside video_meta_data

In [None]:
# --- Prepare Videos (Upload GCS/File API) & Update Metadata --- #
display(Markdown("### Preparing Videos & Updating Metadata"))
if SKIP_PREPARE:
    display(Markdown("✅ Skipping video preparation."))
elif storage_client is None:
     display(Markdown("❌ Cannot prepare: Client not ready.")); raise RuntimeError("Client missing.")
else:
    if dataset_df is None: raise RuntimeError("Dataset DF unavailable.")
    all_video_ids = sorted(list(dataset_df['video_id'].dropna().unique()))
    logger.info(f"Processing {len(all_video_ids)} unique video IDs.")

    videos_to_process_ids = all_video_ids
    if MAX_VIDEOS_TO_PROCESS is not None:
        videos_to_process_ids = all_video_ids[:MAX_VIDEOS_TO_PROCESS]
        logger.info(f"Limiting to {len(videos_to_process_ids)} videos.")

    # Load existing metadata to check status
    existing_statuses = {}
    resource_ids = {}
    required_id_col = 'gcs_uri' if USE_VERTEX else 'file_api_name'
    if Path(METADATA_FILE).is_file():
        try:
            existing_df = pd.read_csv(METADATA_FILE, dtype=str)
            if 'video_id' in existing_df.columns and 'status' in existing_df.columns:
                existing_statuses = pd.Series(existing_df.status.values, index=existing_df.video_id).to_dict()
            if 'video_id' in existing_df.columns and required_id_col in existing_df.columns:
                 resource_ids = pd.Series(existing_df[required_id_col].values, index=existing_df.video_id).dropna().to_dict()
            logger.info("Checked existing metadata statuses/IDs.")
        except Exception as e: logger.warning(f"Could not load existing metadata: {e}")

    video_metadata_updates = {}
    processed_count, upload_failures, missing_local, skipped_count = 0, 0, 0, 0
    num_batches = math.ceil(len(videos_to_process_ids) / UPLOAD_BATCH_SIZE_GCS)
    prep_mode = "GCS Upload" if USE_VERTEX else "File API Upload"

    with tqdm(total=len(videos_to_process_ids), desc=f"Preparing ({prep_mode})") as pbar:
        for i in range(0, len(videos_to_process_ids), UPLOAD_BATCH_SIZE_GCS):
            batch_ids = videos_to_process_ids[i : i + UPLOAD_BATCH_SIZE_GCS]
            batch_num = (i // UPLOAD_BATCH_SIZE_GCS) + 1
            logger.info(f"Prep Batch {batch_num}/{num_batches}...")
            current_batch_updates = {}

            for video_id in batch_ids:
                pbar.set_postfix_str(f"ID: {video_id}")
                update_data = {"local_path": None, "gcs_uri": None, "file_api_name": None, "status": "error_unknown"}
                local_video_path = speed_videos_path / f"{video_id}.mp4"
                current_status = existing_statuses.get(video_id, 'pending')
                existing_resource_id = resource_ids.get(video_id)
                is_already_processed = False

                # Check if already uploaded and verified
                if current_status in ['uploaded_gcs', 'uploaded_file_api'] and existing_resource_id:
                     verified = False
                     if USE_VERTEX: verified = verify_gcs_file_exists(storage_client, existing_resource_id)
                     else: verified = verify_file_api_resource_exists(storage_client, existing_resource_id)
                     if verified:
                         logger.debug(f"Skipping verified video {video_id} ('{current_status}').")
                         is_already_processed = True
                         skipped_count += 1
                         update_data.update({ # Ensure metadata is consistent
                             'local_path': str(local_video_path) if local_video_path.is_file() else None,
                             'status': current_status,
                             required_id_col: existing_resource_id
                         })
                     else:
                         logger.warning(f"Video {video_id} ({current_status}) needs re-processing (verification failed).")
                elif current_status != 'pending':
                     logger.debug(f"Video {video_id} has non-pending status '{current_status}' but no verified resource ID. Re-processing.")

                if is_already_processed:
                    processed_count += 1
                    current_batch_updates[video_id] = update_data
                    pbar.update(1)
                    continue

                # Process if needed
                if local_video_path.is_file():
                    update_data["local_path"] = str(local_video_path)
                    resource_id_result = None
                    if USE_VERTEX:
                        blob_name = f"videos/{video_id}.mp4"
                        resource_id_result = upload_to_gcs(storage_client, GCS_BUCKET, local_video_path, blob_name)
                        if resource_id_result: update_data.update({"gcs_uri": resource_id_result, "status": "uploaded_gcs"})
                        else: update_data["status"] = "gcs_upload_failed"; upload_failures += 1
                    else: # Gemini API
                        resource_id_result = upload_via_file_api(storage_client, local_video_path, f"vid_{video_id}")
                        if resource_id_result: update_data.update({"file_api_name": resource_id_result, "status": "uploaded_file_api"})
                        else: update_data["status"] = "file_api_upload_failed"; upload_failures += 1
                else:
                    logger.warning(f"Local file missing: {local_video_path}")
                    missing_local += 1
                    update_data["status"] = "local_missing"

                current_batch_updates[video_id] = update_data
                processed_count += 1
                pbar.update(1)

            # Update metadata after batch
            if current_batch_updates:
                 try: create_or_update_metadata(METADATA_FILE, dataset_df, current_batch_updates)
                 except Exception as e: logger.error(f"Metadata update failed batch {batch_num}: {e}")
                 video_metadata_updates.update(current_batch_updates)

    logger.info(f"Prep finished. Checked: {processed_count}, Skipped(verified): {skipped_count}, Missing Local: {missing_local}, Upload Failures: {upload_failures}")
    display(Markdown(f"✅ Video preparation complete. See logs. Metadata: `{METADATA_FILE}`."))


### Preparing Videos & Updating Metadata

2025-05-09 16:56:30,359 - INFO - Processing 289 unique video IDs.
2025-05-09 16:56:30,368 - INFO - Checked existing metadata statuses/IDs.


Preparing (File API Upload):   0%|          | 0/289 [00:00<?, ?it/s]

2025-05-09 16:56:30,373 - INFO - Prep Batch 1/29...
2025-05-09 16:56:31,635 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/w39suyzut56n "HTTP/1.1 200 OK"
2025-05-09 16:56:31,897 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/41kf9ogzg8j0 "HTTP/1.1 200 OK"
2025-05-09 16:56:32,947 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ve0u1v50aefp "HTTP/1.1 200 OK"
2025-05-09 16:56:33,200 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/0l5gb60u48l2 "HTTP/1.1 200 OK"
2025-05-09 16:56:34,238 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/miph0amkhkls "HTTP/1.1 200 OK"
2025-05-09 16:56:34,502 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/w3ip4tz6eciq "HTTP/1.1 200 OK"
2025-05-09 16:56:35,511 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/tpvog0tailnp "HTTP/1

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:38,022 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/o331ocqdkfkp "HTTP/1.1 200 OK"
2025-05-09 16:56:38,274 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/l4ya82g93xzp "HTTP/1.1 200 OK"
2025-05-09 16:56:38,532 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/95rsvlque3n6 "HTTP/1.1 200 OK"
2025-05-09 16:56:38,788 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/eez9invg1g17 "HTTP/1.1 200 OK"
2025-05-09 16:56:39,045 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/vctx6ncsmds3 "HTTP/1.1 200 OK"
2025-05-09 16:56:39,299 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/uhuu1lext2td "HTTP/1.1 200 OK"
2025-05-09 16:56:39,555 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/tkwluqfg0wku "HTTP/1.1 200 OK"
2025-05-09 16:56:39,807 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:40,630 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/850utvj99uhm "HTTP/1.1 200 OK"
2025-05-09 16:56:40,878 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/0zp1rnmesj76 "HTTP/1.1 200 OK"
2025-05-09 16:56:41,133 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/f7qb6ggy1adz "HTTP/1.1 200 OK"
2025-05-09 16:56:41,388 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/f7b0q7ybt777 "HTTP/1.1 200 OK"
2025-05-09 16:56:41,636 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/h3tp5l56v26c "HTTP/1.1 200 OK"
2025-05-09 16:56:41,884 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ra6pu7q12dgu "HTTP/1.1 200 OK"
2025-05-09 16:56:42,135 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/yxpcpnqtme35 "HTTP/1.1 200 OK"
2025-05-09 16:56:42,387 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:43,187 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/r5zikoquxv18 "HTTP/1.1 200 OK"
2025-05-09 16:56:43,479 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/6z3528vj1395 "HTTP/1.1 200 OK"
2025-05-09 16:56:43,715 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/wcbt54nw1f9k "HTTP/1.1 200 OK"
2025-05-09 16:56:43,958 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/7nkufe223z8c "HTTP/1.1 200 OK"
2025-05-09 16:56:44,213 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/xa20qmgsshox "HTTP/1.1 200 OK"
2025-05-09 16:56:44,454 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/en9ic7loesdj "HTTP/1.1 200 OK"
2025-05-09 16:56:44,695 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/yqveoimn6onv "HTTP/1.1 200 OK"
2025-05-09 16:56:44,939 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:45,700 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/k3yvdgy1no7g "HTTP/1.1 200 OK"
2025-05-09 16:56:45,958 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/xvsy1an2lhlv "HTTP/1.1 200 OK"
2025-05-09 16:56:46,259 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/zctd8wq4eju6 "HTTP/1.1 200 OK"
2025-05-09 16:56:46,573 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/wbznq1bzp6na "HTTP/1.1 200 OK"
2025-05-09 16:56:46,819 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/dep2ay5syolh "HTTP/1.1 200 OK"
2025-05-09 16:56:47,105 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/4cgaw9bvnqoh "HTTP/1.1 200 OK"
2025-05-09 16:56:47,358 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/v97uxv5emiqi "HTTP/1.1 200 OK"
2025-05-09 16:56:47,605 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:48,384 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/w8dn1phzsxjc "HTTP/1.1 200 OK"
2025-05-09 16:56:48,622 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/imma063kra4k "HTTP/1.1 200 OK"
2025-05-09 16:56:48,878 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/egc3cmvttnjc "HTTP/1.1 200 OK"
2025-05-09 16:56:49,125 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/grzg97gufm0j "HTTP/1.1 200 OK"
2025-05-09 16:56:49,390 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/1a2zpnt932tr "HTTP/1.1 200 OK"
2025-05-09 16:56:49,657 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/avqnwigwnbmx "HTTP/1.1 200 OK"
2025-05-09 16:56:49,900 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/rttv291eulcp "HTTP/1.1 200 OK"
2025-05-09 16:56:50,146 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:50,917 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/hz5da1rc8wq9 "HTTP/1.1 200 OK"
2025-05-09 16:56:51,168 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/i4u9iuibcuja "HTTP/1.1 200 OK"
2025-05-09 16:56:51,417 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ezep5lpjlhhr "HTTP/1.1 200 OK"
2025-05-09 16:56:51,657 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/piij58us0sjv "HTTP/1.1 200 OK"
2025-05-09 16:56:51,908 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/qjyl5vv8hq7r "HTTP/1.1 200 OK"
2025-05-09 16:56:52,157 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/386r61cvh8ny "HTTP/1.1 200 OK"
2025-05-09 16:56:52,397 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/gn8t0qz2u8n7 "HTTP/1.1 200 OK"
2025-05-09 16:56:52,648 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:53,402 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/9yor6cwv4ezw "HTTP/1.1 200 OK"
2025-05-09 16:56:53,645 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/b6g8xcvjdb27 "HTTP/1.1 200 OK"
2025-05-09 16:56:53,894 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ww1qpxrzy8ql "HTTP/1.1 200 OK"
2025-05-09 16:56:54,138 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/57l2qmgo1h74 "HTTP/1.1 200 OK"
2025-05-09 16:56:54,385 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ys8kx3o8opbu "HTTP/1.1 200 OK"
2025-05-09 16:56:54,631 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/9ao2iwy3us1f "HTTP/1.1 200 OK"
2025-05-09 16:56:54,874 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/3h6s5wxn5f6k "HTTP/1.1 200 OK"
2025-05-09 16:56:55,118 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:55,951 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/qk9xg2155t6z "HTTP/1.1 200 OK"
2025-05-09 16:56:56,198 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/2xy44ytetltq "HTTP/1.1 200 OK"
2025-05-09 16:56:56,439 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/nqgqryebwlcr "HTTP/1.1 200 OK"
2025-05-09 16:56:56,680 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/l64ocrrte2wx "HTTP/1.1 200 OK"
2025-05-09 16:56:56,935 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/zf3wogc0vuqk "HTTP/1.1 200 OK"
2025-05-09 16:56:57,188 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/megi1xkk7ywg "HTTP/1.1 200 OK"
2025-05-09 16:56:57,445 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/1wb6dcakr1xz "HTTP/1.1 200 OK"
2025-05-09 16:56:57,696 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:56:58,470 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/8544ulcewne3 "HTTP/1.1 200 OK"
2025-05-09 16:56:58,767 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/m3jf7mzrcctj "HTTP/1.1 200 OK"
2025-05-09 16:56:59,012 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/7p7jia4uic8i "HTTP/1.1 200 OK"
2025-05-09 16:56:59,255 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/kynzzmyjlb7y "HTTP/1.1 200 OK"
2025-05-09 16:56:59,508 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/fqchbh2e671j "HTTP/1.1 200 OK"
2025-05-09 16:56:59,754 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/zk2v39e967vq "HTTP/1.1 200 OK"
2025-05-09 16:57:00,000 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/g9cboa9af30f "HTTP/1.1 200 OK"
2025-05-09 16:57:00,247 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:01,039 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/v4s94ljlxdk3 "HTTP/1.1 200 OK"
2025-05-09 16:57:01,299 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/gj7uemw49sa7 "HTTP/1.1 200 OK"
2025-05-09 16:57:01,549 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/vfw9z5w5c93y "HTTP/1.1 200 OK"
2025-05-09 16:57:01,829 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/kr3x7ic2fev4 "HTTP/1.1 200 OK"
2025-05-09 16:57:02,072 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/euh0i9eu2hco "HTTP/1.1 200 OK"
2025-05-09 16:57:02,333 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/q1612r9x0obr "HTTP/1.1 200 OK"
2025-05-09 16:57:02,588 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/c40r3ucyxmdr "HTTP/1.1 200 OK"
2025-05-09 16:57:02,826 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:03,602 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/7cgupmr0qckp "HTTP/1.1 200 OK"
2025-05-09 16:57:03,836 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/7a4rmks2sn07 "HTTP/1.1 200 OK"
2025-05-09 16:57:04,089 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/5teru7glwp15 "HTTP/1.1 200 OK"
2025-05-09 16:57:04,335 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/17aol3hrpsp0 "HTTP/1.1 200 OK"
2025-05-09 16:57:04,591 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/bo4yhl3y6ikb "HTTP/1.1 200 OK"
2025-05-09 16:57:04,880 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/uh012ar70nx0 "HTTP/1.1 200 OK"
2025-05-09 16:57:05,133 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/j3i58jftpzge "HTTP/1.1 200 OK"
2025-05-09 16:57:05,381 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:06,175 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ysegdo3gejz9 "HTTP/1.1 200 OK"
2025-05-09 16:57:06,423 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/4ausvv5th7pz "HTTP/1.1 200 OK"
2025-05-09 16:57:06,677 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/5jty59n0m2rq "HTTP/1.1 200 OK"
2025-05-09 16:57:06,926 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/h0rbedwgiyf4 "HTTP/1.1 200 OK"
2025-05-09 16:57:07,186 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/oywph0404air "HTTP/1.1 200 OK"
2025-05-09 16:57:07,446 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/2bpybh3ldtg3 "HTTP/1.1 200 OK"
2025-05-09 16:57:07,692 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ilo72cp5qg05 "HTTP/1.1 200 OK"
2025-05-09 16:57:07,972 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:08,727 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/bwz2bf0h6f8d "HTTP/1.1 200 OK"
2025-05-09 16:57:08,977 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/cu82zg5agwwx "HTTP/1.1 200 OK"
2025-05-09 16:57:09,227 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/g5w50kv57m10 "HTTP/1.1 200 OK"
2025-05-09 16:57:09,466 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/vquplbx2vkr6 "HTTP/1.1 200 OK"
2025-05-09 16:57:09,718 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/cgqwnzgjxie5 "HTTP/1.1 200 OK"
2025-05-09 16:57:09,959 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/wlon0ix4nh74 "HTTP/1.1 200 OK"
2025-05-09 16:57:10,216 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/3icelho8pss0 "HTTP/1.1 200 OK"
2025-05-09 16:57:10,467 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:11,269 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/lq01sle000zk "HTTP/1.1 200 OK"
2025-05-09 16:57:11,520 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/1lojcwe0fc9x "HTTP/1.1 200 OK"
2025-05-09 16:57:11,775 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/s8io01dkfs9a "HTTP/1.1 200 OK"
2025-05-09 16:57:12,020 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/6qhargnlkbsc "HTTP/1.1 200 OK"
2025-05-09 16:57:12,269 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/i0f596v6i26q "HTTP/1.1 200 OK"
2025-05-09 16:57:12,505 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/2hi8wri3w5x1 "HTTP/1.1 200 OK"
2025-05-09 16:57:12,760 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/a8c3pe0jcr9t "HTTP/1.1 200 OK"
2025-05-09 16:57:13,001 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:13,784 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/15b911pr2jqi "HTTP/1.1 200 OK"
2025-05-09 16:57:14,075 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/g0ck73kjuagx "HTTP/1.1 200 OK"
2025-05-09 16:57:14,330 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/8rq9humq5yu7 "HTTP/1.1 200 OK"
2025-05-09 16:57:14,577 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/g1nsxqxxuogw "HTTP/1.1 200 OK"
2025-05-09 16:57:14,834 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/2a793o0hi94y "HTTP/1.1 200 OK"
2025-05-09 16:57:15,081 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/85e9rogej6kp "HTTP/1.1 200 OK"
2025-05-09 16:57:15,323 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/d23uc9dsybox "HTTP/1.1 200 OK"
2025-05-09 16:57:15,557 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:16,323 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/82lk06s1tgar "HTTP/1.1 200 OK"
2025-05-09 16:57:16,571 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/buix7z0gz6wt "HTTP/1.1 200 OK"
2025-05-09 16:57:16,883 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/o6ind10d5u84 "HTTP/1.1 200 OK"
2025-05-09 16:57:17,174 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/zdevhd2kdobm "HTTP/1.1 200 OK"
2025-05-09 16:57:17,403 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/egtu044i2pgq "HTTP/1.1 200 OK"
2025-05-09 16:57:17,642 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/x90zg16j04uk "HTTP/1.1 200 OK"
2025-05-09 16:57:17,907 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/hyme0p59iq82 "HTTP/1.1 200 OK"
2025-05-09 16:57:18,157 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:18,939 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/xpznelj2xm4q "HTTP/1.1 200 OK"
2025-05-09 16:57:19,183 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/s3spo0my9b00 "HTTP/1.1 200 OK"
2025-05-09 16:57:19,448 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ftfoiwkd27yp "HTTP/1.1 200 OK"
2025-05-09 16:57:19,695 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/5y21918ikeqt "HTTP/1.1 200 OK"
2025-05-09 16:57:19,944 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ducboy43ruad "HTTP/1.1 200 OK"
2025-05-09 16:57:20,222 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/7zof77ujsuy3 "HTTP/1.1 200 OK"
2025-05-09 16:57:20,465 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/aj479161t11i "HTTP/1.1 200 OK"
2025-05-09 16:57:20,706 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:21,449 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/4yih81l9tvhv "HTTP/1.1 200 OK"
2025-05-09 16:57:21,695 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/edkb2geri5bi "HTTP/1.1 200 OK"
2025-05-09 16:57:21,961 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/dmd3mnaj7e5b "HTTP/1.1 200 OK"
2025-05-09 16:57:22,201 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/flh0sis7ucr1 "HTTP/1.1 200 OK"
2025-05-09 16:57:22,445 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/6sxh49xz4ttm "HTTP/1.1 200 OK"
2025-05-09 16:57:22,686 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/bp9y9tst4074 "HTTP/1.1 200 OK"
2025-05-09 16:57:22,982 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/e5tz6c1ittpq "HTTP/1.1 200 OK"
2025-05-09 16:57:23,234 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:24,003 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/0dfnxj2nvims "HTTP/1.1 200 OK"
2025-05-09 16:57:24,269 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/0vkijfkw03sz "HTTP/1.1 200 OK"
2025-05-09 16:57:24,509 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/nuihn5uprctp "HTTP/1.1 200 OK"
2025-05-09 16:57:24,764 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ivp540zfo2fp "HTTP/1.1 200 OK"
2025-05-09 16:57:25,018 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/8kxm3b5fti2d "HTTP/1.1 200 OK"
2025-05-09 16:57:25,260 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/n8mvz6g3h7y0 "HTTP/1.1 200 OK"
2025-05-09 16:57:25,512 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/xigtoa2hpgv4 "HTTP/1.1 200 OK"
2025-05-09 16:57:25,759 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:26,640 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/s62z792fjsdx "HTTP/1.1 200 OK"
2025-05-09 16:57:26,891 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/399a5m74lnoy "HTTP/1.1 200 OK"
2025-05-09 16:57:27,149 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/wi2g45y0d40k "HTTP/1.1 200 OK"
2025-05-09 16:57:27,402 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/rpegl8ercnba "HTTP/1.1 200 OK"
2025-05-09 16:57:27,667 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/304m7x4dlr8d "HTTP/1.1 200 OK"
2025-05-09 16:57:27,917 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/wxddeojrd6h3 "HTTP/1.1 200 OK"
2025-05-09 16:57:28,171 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/5ennj6om85m8 "HTTP/1.1 200 OK"
2025-05-09 16:57:28,411 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:29,199 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/8wmdemzu3l26 "HTTP/1.1 200 OK"
2025-05-09 16:57:29,485 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/6qqfmnmzm09n "HTTP/1.1 200 OK"
2025-05-09 16:57:29,722 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/fhp8zxr6pf4y "HTTP/1.1 200 OK"
2025-05-09 16:57:29,966 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ajy8v7h34cu4 "HTTP/1.1 200 OK"
2025-05-09 16:57:30,223 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/4w201q6u4gma "HTTP/1.1 200 OK"
2025-05-09 16:57:30,469 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/g610d1rerqd3 "HTTP/1.1 200 OK"
2025-05-09 16:57:30,714 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/cxuh321hxjz6 "HTTP/1.1 200 OK"
2025-05-09 16:57:30,961 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:31,738 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/nbc9aiksod2i "HTTP/1.1 200 OK"
2025-05-09 16:57:31,981 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/x2zy599obeea "HTTP/1.1 200 OK"
2025-05-09 16:57:32,265 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/427h676cwvcd "HTTP/1.1 200 OK"
2025-05-09 16:57:32,510 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/cphcqjexflqu "HTTP/1.1 200 OK"
2025-05-09 16:57:32,754 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/91w9cb8z2qde "HTTP/1.1 200 OK"
2025-05-09 16:57:32,996 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/8p7j77bnkdv3 "HTTP/1.1 200 OK"
2025-05-09 16:57:33,241 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/i623nuayrg3y "HTTP/1.1 200 OK"
2025-05-09 16:57:33,487 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:34,257 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/fu2xlhq5tpqa "HTTP/1.1 200 OK"
2025-05-09 16:57:34,501 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/sj7ykzpstrrq "HTTP/1.1 200 OK"
2025-05-09 16:57:34,759 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/b9dkoyljk04n "HTTP/1.1 200 OK"
2025-05-09 16:57:34,998 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/6uqcu4m8y536 "HTTP/1.1 200 OK"
2025-05-09 16:57:35,240 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ff1p4g96dnvp "HTTP/1.1 200 OK"
2025-05-09 16:57:35,509 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/gt9uuplm440n "HTTP/1.1 200 OK"
2025-05-09 16:57:35,753 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/x7kdiwu2lysn "HTTP/1.1 200 OK"
2025-05-09 16:57:35,998 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:36,782 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/gyyzvwexuawu "HTTP/1.1 200 OK"
2025-05-09 16:57:37,017 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/2dqryfjvcdcn "HTTP/1.1 200 OK"
2025-05-09 16:57:37,258 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/q2rlo21hk51u "HTTP/1.1 200 OK"
2025-05-09 16:57:37,488 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/r0jxwzxoju9i "HTTP/1.1 200 OK"
2025-05-09 16:57:37,747 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/9q75b11nkumv "HTTP/1.1 200 OK"
2025-05-09 16:57:38,025 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/z8wka6izb87o "HTTP/1.1 200 OK"
2025-05-09 16:57:38,269 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/kylpi0w0eohc "HTTP/1.1 200 OK"
2025-05-09 16:57:38,516 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:39,284 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/mivgufg9sf2v "HTTP/1.1 200 OK"
2025-05-09 16:57:39,536 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/qdo9uz9hh5fm "HTTP/1.1 200 OK"
2025-05-09 16:57:39,780 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/4h3uut56oxrn "HTTP/1.1 200 OK"
2025-05-09 16:57:40,013 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/4megq9lqu3zt "HTTP/1.1 200 OK"
2025-05-09 16:57:40,264 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/hdolj8budn5y "HTTP/1.1 200 OK"
2025-05-09 16:57:40,507 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/gffl9rgiudup "HTTP/1.1 200 OK"
2025-05-09 16:57:40,755 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/yhvmvysyd4on "HTTP/1.1 200 OK"
2025-05-09 16:57:41,008 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:41,773 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/6ukp3ka0v2i7 "HTTP/1.1 200 OK"
2025-05-09 16:57:42,027 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/krcuwkl1r7d0 "HTTP/1.1 200 OK"
2025-05-09 16:57:42,269 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/t1qxyu7n4v4t "HTTP/1.1 200 OK"
2025-05-09 16:57:42,518 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/x4b8ni01b7hk "HTTP/1.1 200 OK"
2025-05-09 16:57:42,767 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/1g4yv3b0fzpr "HTTP/1.1 200 OK"
2025-05-09 16:57:43,012 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/dbgduyhs8xes "HTTP/1.1 200 OK"
2025-05-09 16:57:43,262 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/qtwtb5f4a865 "HTTP/1.1 200 OK"
2025-05-09 16:57:43,505 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:44,318 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/wzhqg1npvno9 "HTTP/1.1 200 OK"
2025-05-09 16:57:44,556 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/1ms27o0mgnd8 "HTTP/1.1 200 OK"
2025-05-09 16:57:44,788 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/3wg34rb3bmeu "HTTP/1.1 200 OK"
2025-05-09 16:57:45,023 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/sn9cog94x05k "HTTP/1.1 200 OK"
2025-05-09 16:57:45,268 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/memmvh0po3rn "HTTP/1.1 200 OK"
2025-05-09 16:57:45,512 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/5v2pfqljdrbk "HTTP/1.1 200 OK"
2025-05-09 16:57:45,764 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/2v21gokzi5xs "HTTP/1.1 200 OK"
2025-05-09 16:57:46,027 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


2025-05-09 16:57:46,860 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/pv2tcwf0urv7 "HTTP/1.1 200 OK"
2025-05-09 16:57:47,101 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/stzfw4bzq0o6 "HTTP/1.1 200 OK"
2025-05-09 16:57:47,361 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/174bq6d7nkja "HTTP/1.1 200 OK"
2025-05-09 16:57:47,637 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ai7s0wpxv68v "HTTP/1.1 200 OK"
2025-05-09 16:57:47,874 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/6l62emtc0x0a "HTTP/1.1 200 OK"
2025-05-09 16:57:48,123 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/hdoaritov1wq "HTTP/1.1 200 OK"
2025-05-09 16:57:48,371 - INFO - HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/files/ibpib2zm6b3e "HTTP/1.1 200 OK"
2025-05-09 16:57:48,609 - INFO - HTTP Req

  meta_df[col] = merged_df[update_col_name].fillna(merged_df[col])


✅ Video preparation complete. See logs. Metadata: `video_metadata_non_vertex.csv`.

# Perform Bulk Inference To Generate Questions

In [46]:
QUESTIONS_DIR = os.path.join(f"generated_questions/{QUESTIONS_MODEL_NAME}", "questions.csv")
os.makedirs(os.path.dirname(QUESTIONS_DIR), exist_ok=True)


In [47]:
# --- Inference Functions ---

async def perform_inference_single_async(
    video_info: Dict,
    client: Any,
    semaphore: asyncio.Semaphore,
    rate_limiter: Optional[AsyncRateLimiter],
    results_queue: asyncio.Queue
) -> None: # Return None as result is put in queue
    """
    Async inference for one question, putting the result into a queue.
    """
    video_id = video_info.get("video_id", "?")
    # prompt_text = build_prompt(question_info)
    prompt_text = QUESTIONS_PROMPT
    gcs_uri = video_info.get("gcs_uri")
    file_api_name = video_info.get("file_api_name")
    start_time = time.time()
    result = None # Default result in case of early exit

    # --- Prepare Inputs (Same as before) ---
    try:
        video_part = None
        if USE_VERTEX:
            # ... (GCS URI logic) ...
            if not gcs_uri: raise ValueError("Missing GCS URI.")
            video_part = types.Part.from_uri(mime_type='video/mp4', file_uri=gcs_uri)
        else: # Gemini API Mode
            # ... (File API get logic) ...
             if not file_api_name: raise ValueError("Missing File API name.")
             try:
                 file_object = await client.aio.files.get(name=file_api_name)
                 video_part = file_object
             except genai_errors.NotFoundError: raise FileNotFoundError(f"File API '{file_api_name}' not found.")
             except Exception as e: raise RuntimeError(f"Failed get File API obj: {e}")

        video_content = types.Content(role="user", parts=[types.Part.from_text(text=prompt_text)])
        
        if video_part is None: raise RuntimeError("Video part preparation failed.")
        contents = [video_content, video_part]

    except (ValueError, FileNotFoundError, RuntimeError) as e:
        logger.error(f"QID {video_id} (Async): Input Error - {e}")
        # Put error result in queue
        result = {"video_id": video_id, "questions": f"ERROR: Input Fail - {e}", "duration": 0, "finish_reason": "N/A", "status": "Failed (Input)"}
        await results_queue.put(result)
        return # Exit the function
    except Exception as e:
         logger.error(f"Video_ID {video_id} (Async): Unexpected Input Prep Error: {e}", exc_info=True)
         result = {"video_id": video_id, "questions": f"ERROR: Input Prep Failed Unexpectedly - {e}", "duration": 0, "finish_reason": "N/A", "status": "Failed (Input Prep)"}
         await results_queue.put(result)
         return # Exit the function

    # --- Perform Inference with Retries, Semaphore, and Rate Limiting ---
    async with semaphore:
        for attempt in range(QUESTIONS_MAX_RETRIES + 1):
            try:
                if rate_limiter: await rate_limiter.acquire()
                api_start = time.time()
                logger.debug(f"QID {video_id} (Async): Attempt {attempt + 1} sending request...")
                response = await client.aio.models.generate_content(
                    model=QUESTIONS_MODEL_NAME,
                    contents=contents,
                    config=QUESTIONS_CONFIG
                )
                print(f"video_id: {video_id}, response: {response}")

                # Process Response
                answer, reason, status, err_detail = "ERROR", "UNKNOWN", "Success", ""
                try:
                    answer = response.parsed.questions
                    if response.candidates and hasattr(response.candidates[0], 'finish_reason') and response.candidates[0].finish_reason is not None:
                        reason = response.candidates[0].finish_reason.name
                except ValueError as ve:
                    status, err_detail = "Blocked/Empty", f"ValueError: {ve}. "
                    # ... (block/safety reason extraction) ...
                    answer = f"ERROR: {status}. {err_detail}"

                result = {"video_id": video_id, "questions": answer, "duration": time.time()-start_time, "finish_reason": reason, "status": status}
                await results_queue.put(result)
                logger.debug(f"QID {video_id} (Async): Attempt {attempt + 1} {status} ({time.time()-api_start:.2f}s API / {time.time()-start_time:.2f}s Total). Result queued.")
                return # Exit function after success


            except Exception as e:
                 result = {"video_id": video_id, "questions": f"ERROR: - {e}", "duration": time.time()-start_time, "finish_reason": reason, "status": "Failed (Unexpected Error)"}
                 await results_queue.put(result)
                 return

        # Fallback (Should not be reached if logic above is correct)
        logger.error(f"VideoID {video_id} (Async): Exited retry loop unexpectedly.")
        result = {"video_id": video_id, "questions": "ERROR: Unknown after retries", "duration": time.time()-start_time, "finish_reason": "UNKNOWN", "status": "Failed (Unknown)"}
        await results_queue.put(result)

async def results_writer_task(
    queue: asyncio.Queue,
    filename: str,
    write_batch_size: int = 20, # How many results to buffer before writing
    write_interval_sec: float = 10.0 # Max time between writes
):
    """Gets results from queue and writes them to CSV in batches."""
    results_buffer = []
    last_write_time = time.monotonic()
    # Define expected header based on the dict keys put in the queue
    fieldnames = ["video_id", "questions", "status", "duration_sec", "finish_reason"]
    file_exists = Path(filename).is_file()

    logger.info(f"Writer task started. Writing results to {filename}")

    while True:
        try:
            # Wait for an item with a timeout
            result = await asyncio.wait_for(queue.get(), timeout=write_interval_sec)

            if result is None: # Signal to terminate
                logger.info("Writer task received termination signal.")
                break

            if isinstance(result, dict):
                 # Ensure duration is rounded here before adding to buffer
                 result["duration_sec"] = round(result.get("duration", -1), 2)
                 # Remove the original 'duration' key if desired
                 result.pop('duration', None)
                 results_buffer.append(result)
            else:
                logger.warning(f"Writer task received non-dict item: {result}")

            queue.task_done() # Signal that the item was processed

        except asyncio.TimeoutError:
            # Timeout occurred, write buffer if not empty, even if batch size not reached
            logger.debug("Writer task timeout reached.")
            pass # Continue to buffer check below

        except Exception as e:
            logger.error(f"Writer task encountered error getting from queue: {e}", exc_info=True)
            # Decide if this is fatal or if we should try to continue
            await asyncio.sleep(1) # Prevent fast spinning on error
            continue # Try to continue processing

        # Check if we should write the buffer
        buffer_size = len(results_buffer)
        time_since_last_write = time.monotonic() - last_write_time
        should_write = (
             buffer_size > 0 and
             (buffer_size >= write_batch_size or time_since_last_write >= write_interval_sec)
        )

        if should_write:
            logger.info(f"Writing batch of {buffer_size} results to {filename}...")
            try:
                # Use 'with open' for proper handling
                with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    # Write header only if file didn't exist at the start
                    if not file_exists:
                        writer.writeheader()
                        file_exists = True # Prevent writing header again
                    writer.writerows(results_buffer)

                results_buffer = [] # Clear buffer after successful write
                last_write_time = time.monotonic()
                logger.info(f"Batch written successfully.")
            except IOError as e:
                logger.error(f"IOError writing results batch to {filename}: {e}. Results may be lost.")
                # Optional: Add retry logic here, or store failed batch elsewhere
            except Exception as e:
                logger.error(f"Unexpected error writing results batch: {e}", exc_info=True)


    # --- Cleanup: Write any remaining items after receiving None signal ---
    if results_buffer:
        logger.info(f"Writing final remaining {len(results_buffer)} results...")
        try:
            with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                if not file_exists: writer.writeheader() # Check again in case file was deleted mid-run
                writer.writerows(results_buffer)
            logger.info("Final results written.")
        except Exception as e:
            logger.error(f"Error writing final results batch: {e}")

    logger.info("Writer task finished.")

# --- Modified Main Execution Function with UI ---
async def run_bulk_inference_async():
    """Runs the bulk inference process asynchronously with queue writer, rate limiting, and UI."""

    # --- Standard Setup ---
    if ai_client is None: logger.error("AI Client not initialized."); display(Markdown("❌ AI Client not initialized.")); return

    # --- Load Data and Prepare Tasks ---
    logger.info("Loading metadata for inference...")
    
    # Load video questions for inference as a dictionaies of dicts
    # Each dict contains video_id, gcs_uri, file_api_name, and other metadata
    video_questions_for_inference = load_metadata_questions_generation(METADATA_FILE)
    print(f"Loaded {len(video_questions_for_inference)} video questions for inference.")
    
    if not video_questions_for_inference: logger.warning("No video data ready."); return
    processed_qids = load_processed_video_ids(QUESTIONS_DIR)

    inference_tasks_input = []
    tasks_skipped = 0
    required_col = 'gcs_uri' if USE_VERTEX else 'file_api_name'
    for video_id, video_info in video_questions_for_inference.items():
        # Check if video_id is correct:
        if video_id == video_info.get('video_id'):
            logger.debug(f"Video ID {video_id} matches.")
        else:
            logger.warning(f"Video ID mismatch: {video_id} != {video_info.get('video_id')}.")
            continue
        if not video_id or video_id in processed_qids:
            tasks_skipped += 1; continue
        inference_tasks_input.append(video_info)

    total_tasks = len(inference_tasks_input)
    logger.info(f"Prepared {total_tasks} new inference tasks. Skipped {tasks_skipped}." )
    if total_tasks == 0: logger.info("No new questions to process."); return

    # --- Setup Rate Limiter ---
    rate_limiter = None
    if QUESTIONS_REQUESTS_PER_MINUTE is not None and QUESTIONS_REQUESTS_PER_MINUTE > 0:
        capacity = 10 # Example: Keep burst capacity moderate
        rate_limiter = AsyncRateLimiter(rate=QUESTIONS_REQUESTS_PER_MINUTE, period=60.0, capacity=capacity)
        logger.info(f"Rate limiting enabled: {QUESTIONS_REQUESTS_PER_MINUTE} RPM, Capacity: {capacity}")
    else:
        logger.info("Rate limiting disabled.")

    # --- Create Queue and Start Writer Task ---
    results_queue = asyncio.Queue()
    writer_handle = asyncio.create_task(
        results_writer_task(results_queue, QUESTIONS_DIR)
    )

    # --- Schedule and Run Inference Tasks with Progress Bar ---
    logger.info(f"Starting async inference for {total_tasks} questions (Concurrency: {QUESTIONS_MAX_ASYNC_WORKERS})...")
    semaphore = asyncio.Semaphore(QUESTIONS_MAX_ASYNC_WORKERS)
    start_bulk_time = time.time()

    # Create coroutines
    inference_coroutines = [
        perform_inference_single_async(q_info, ai_client, semaphore, rate_limiter, results_queue)
        for q_info in inference_tasks_input
    ]

    # --- Use asyncio.as_completed with tqdm_notebook for live progress ---
    completed_count = 0
    gather_exception = None
    try:
        # Create a future for each coroutine to track completion
        tasks = [asyncio.ensure_future(coro) for coro in inference_coroutines]
        # Use tqdm with as_completed
        for future in tqdm(asyncio.as_completed(tasks), total=total_tasks, desc="Async Inference"):
            try:
                await future # Wait for the next task to complete, raise exception if task failed
                completed_count += 1
            except Exception as task_exc:
                 # Log error from individual task if it wasn't caught inside
                 logger.error(f"Error surfaced from an inference task: {task_exc}", exc_info=False)
                 # Optionally decide if you want to stop processing other tasks
                 # gather_exception = task_exc # Store first exception
                 # break # Or continue processing others
    except Exception as outer_exc:
        # Catch errors during the setup/iteration of as_completed itself
        logger.error(f"Error during as_completed processing: {outer_exc}", exc_info=True)
        gather_exception = outer_exc

    # Ensure all tasks are awaited even if we broke early due to an error in one task
    # (This might be redundant if as_completed handles cancellation correctly, but safer)
    await asyncio.gather(*tasks, return_exceptions=True)

    bulk_duration = time.time() - start_bulk_time
    logger.info(f"Async inference task processing finished in {bulk_duration:.2f} seconds. Completed: {completed_count}/{total_tasks}.")
    if gather_exception:
         logger.error(f"Bulk inference encountered errors: {gather_exception}")

    # --- Signal Writer Task to Finish (ALWAYS DO THIS) ---
    logger.info("Signaling writer task to complete...")
    await results_queue.put(None)

    # --- Wait for Writer Task to Finish (ALWAYS DO THIS) ---
    logger.info("Waiting for writer task to finish writing remaining results...")
    try:
        await writer_handle # Wait until the writer processes the None signal and exits
        logger.info("Writer task has finished.")
    except Exception as writer_exc:
        logger.error(f"Error waiting for writer task: {writer_exc}", exc_info=True)

    # --- Final Summary ---
    logger.info(f"Bulk inference process complete. See logs and {QUESTIONS_DIR}.")
    # Remove handler to prevent duplicate logs on re-run

# --- Run the async function ---
asyncio.run(run_bulk_inference_async())

2025-05-09 21:29:11,214 - INFO - Loading metadata for inference...
2025-05-09 21:29:11,238 - INFO - Loaded 289 unique videos for inference.
Loaded 289 video questions for inference.
2025-05-09 21:29:11,241 - INFO - Loaded 289 processed video IDs from generated_questions/gemini-2.0-flash/questions.csv
2025-05-09 21:29:11,241 - INFO - Prepared 0 new inference tasks. Skipped 289.
2025-05-09 21:29:11,242 - INFO - No new questions to process.


### Interim Clean Up for generated questions

In [48]:
def interim_cleanup_and_report(FILE_DIR):
    """Final cleanup and report of results."""
    if not Path(FILE_DIR).is_file(): display(Markdown(f"❌ Results file `{FILE_DIR}` not found.")); return

    try:
        results_df = pd.read_csv(FILE_DIR, dtype=str)
        logger.info(f"Loaded results from {FILE_DIR} ({len(results_df)} rows).")
        
        # Correctly identify failed entries
        failed = results_df[results_df['questions'].isna() | (results_df['status'] != 'Success')]
        
        # Success is everything not in failed
        success = results_df[~results_df.index.isin(failed.index)]

        display(Markdown(f"### Results Summary: {len(success)} Success, {len(failed)} Failed."))

        # Sort by 'video_id'
        results_df.sort_values(by=['video_id'], inplace=True)

        # remove duplicates based on 'video_id' and keep the first occurrence
        results_df.drop_duplicates(subset=['video_id'], keep='first', inplace=True)
        logger.info(f"Removed duplicates, remaining {len(results_df)} unique QIDs.")

        # Save the cleaned-up results to a new file with datetime suffix
        original_filename = FILE_DIR.replace('.csv', "")
        success_filename = f"{original_filename}.csv"
        failed_filename = f"{original_filename}_failed.csv"
        failed.to_csv(failed_filename, index=False, encoding='utf-8')
        success.drop_duplicates(subset=['video_id'], keep='first', inplace=True)
        success.to_csv(success_filename, index=False, encoding='utf-8')
     
    except Exception as e:
        logger.error(f"Error processing results file: {e}", exc_info=True)
        display(Markdown(f"❌ Error processing results file: {e}."))
        return
  
interim_cleanup_and_report(QUESTIONS_DIR)

2025-05-09 21:29:16,688 - INFO - Loaded results from generated_questions/gemini-2.0-flash/questions.csv (289 rows).


### Results Summary: 289 Success, 0 Failed.

2025-05-09 21:29:16,690 - INFO - Removed duplicates, remaining 289 unique QIDs.


# Perfrom chat conversation history on all Videos

In [49]:
ANSWERS_DIR = f"generated_questions/{QUESTIONS_MODEL_NAME}/chat_history"
os.makedirs(os.path.dirname(ANSWERS_DIR), exist_ok=True)

In [50]:
ANSWERS_CHECK_DIR = os.path.join(f"generated_questions/{QUESTIONS_MODEL_NAME}", "answers.csv")

In [51]:
# ---Fetch Generated Questions for each videos ---
import ast

def get_questions_for_video(questions_file: str = QUESTIONS_DIR) -> Dict[str, List[str]]:
    if not Path(questions_file).is_file(): return {}
    video_questions = defaultdict(list)
    try:
        df = pd.read_csv(questions_file, dtype=str).fillna('')
        if 'video_id' not in df.columns or 'questions' not in df.columns:
            logger.error(f"Questions file missing 'video_id' or 'question'.")
            return {}
        valid_df = df[df['video_id'].astype(bool) & df['questions'].astype(bool)]
        if len(valid_df) == 0:
             logger.warning(f"No videos found with questions in {questions_file}. Check Step 4.")
             return {}
        for video_id in valid_df['video_id'].unique():
            question_str = valid_df[valid_df['video_id'] == video_id]['questions'].iloc[0]
            questions_list = ast.literal_eval(question_str)
            video_questions[video_id] = questions_list
        logger.info(f"Loaded {len(video_questions)} videos ({len(valid_df)} questions) with valid IDs for inference.")
        return dict(video_questions)
    except Exception as e:
        logger.error(f"Error loading questions: {e}", exc_info=True)
        return {}

generated_questions_dict = get_questions_for_video(QUESTIONS_DIR)

2025-05-09 21:29:21,729 - INFO - Loaded 289 videos (289 questions) with valid IDs for inference.


In [52]:
def serialize_chat(chat):
    return [
        {
            "role": msg.role,
            "parts": [part.text for part in msg.parts]
        } for msg in chat
    ]

In [None]:
# --- Inference Functions ---

async def perform_inference_single_async(
    video_info: Dict,
    client: Any,
    semaphore: asyncio.Semaphore,
    rate_limiter: Optional[AsyncRateLimiter],
    results_queue: asyncio.Queue
) -> None: # Return None as result is put in queue
    """
    Async inference for one question, putting the result into a queue.
    """
    video_id = video_info.get("video_id", "?")
    # prompt_text = build_prompt(question_info)
    gcs_uri = video_info.get("gcs_uri")
    file_api_name = video_info.get("file_api_name")
    start_time = time.time()
    result = None # Default result in case of early exit

    
    questions_list = generated_questions_dict.get(video_id, [])[:3]
    
    if f"{video_id}.json" in os.listdir(ANSWERS_DIR):
        logger.info(f"QID {video_id} (Async): Already processed. Skipping.")
        result = {"video_id": video_id, "questions": questions_list, "duration": 0, "finish_reason": "N/A", "status": "Already Processed"}
        await results_queue.put(result)
        return
    # --- Prepare Inputs (Same as before) ---
    try:
        video_part = None
        if USE_VERTEX:
            # ... (GCS URI logic) ...
            if not gcs_uri: raise ValueError("Missing GCS URI.")
            video_part = types.Part.from_uri(mime_type='video/mp4', file_uri=gcs_uri)
        else: # Gemini API Mode
            # ... (File API get logic) ...
             if not file_api_name: raise ValueError("Missing File API name.")
             try:
                 file_object = await client.aio.files.get(name=file_api_name)
                 video_part = file_object
             except genai_errors.NotFoundError: raise FileNotFoundError(f"File API '{file_api_name}' not found.")
             except Exception as e: raise RuntimeError(f"Failed get File API obj: {e}")
        
        if video_part is None: raise RuntimeError("Video part preparation failed.")
        
    except (ValueError, FileNotFoundError, RuntimeError) as e:
        logger.error(f"QID {video_id} (Async): Input Error - {e}")
        # Put error result in queue
        result = {"video_id": video_id, "questions": questions_list, "duration": 0, "finish_reason": "N/A", "status": "Failed (Input)"}
        await results_queue.put(result)
        return # Exit the function
    except Exception as e:
         logger.error(f"Video_ID {video_id} (Async): Unexpected Input Prep Error: {e}", exc_info=True)
         result = {"video_id": video_id, "questions": questions_list, "duration": 0, "finish_reason": "N/A", "status": "Failed (Input Prep)"}
         await results_queue.put(result)
         return # Exit the function

    # --- Perform Inference with Retries, Semaphore, and Rate Limiting ---
    async with semaphore:
        for attempt in range(QUESTIONS_MAX_RETRIES + 1):
            try:
                if rate_limiter: await rate_limiter.acquire()
                api_start = time.time()
                logger.debug(f"QID {video_id} (Async): Attempt {attempt + 1} sending request...")
            
                try:
                    chat: list = []
                    
                    for idx, q in enumerate(questions_list, 1):
                        user_msg = types.Content(role="user",
                                                parts=[types.Part.from_text(text=q)])
                        # always include video_part
                        contents = [video_part] + chat + [user_msg]

                        try:
                            rsp = client.models.generate_content(model=MODEL_NAME,
                                                                contents=contents,
                                                                config=CONFIG)
                            answer = rsp.text.strip()
                            finish_reason = (rsp.candidates[0].finish_reason.name
                                if rsp.candidates and rsp.candidates[0].finish_reason else "UNKNOWN")
                        except Exception as e:
                            answer, finish_reason = f"ERROR: {e}", "Failed (API)"
                            
                        chat.extend([
                            user_msg,
                            types.Content(role="model",
                                        parts=[types.Part.from_text(text=answer or "…")])
                        ])
                    
                    # Save chat history
                    
                    saved_path = os.path.join(ANSWERS_DIR, f"{video_id}.json")
                    os.makedirs(os.path.dirname(saved_path), exist_ok=True)
                    with open(saved_path, "w") as f:
                        json.dump(serialize_chat(chat), f, indent=2)
                    status = "Success"
                except Exception as e:
                    finish_reason = "Failed (API)"
                    print(f"Error in response: {e}")
                    status = "Blocked/Empty"
                    
                result = {"video_id": video_id, "questions": questions_list, "duration": time.time()-start_time, "finish_reason": finish_reason, "status": status}
                await results_queue.put(result)
                logger.debug(f"QID {video_id} (Async): Attempt {attempt + 1} {status} ({time.time()-api_start:.2f}s API / {time.time()-start_time:.2f}s Total). Result queued.")
                return # Exit function after success


            except Exception as e:
                 result = {"video_id": video_id, "questions": questions_list, "duration": time.time()-start_time, "finish_reason": finish_reason, "status": "Failed (Unexpected Error)"}
                 await results_queue.put(result)
                 return

        # Fallback (Should not be reached if logic above is correct)
        logger.error(f"VideoID {video_id} (Async): Exited retry loop unexpectedly.")
        result = {"video_id": video_id, "questions": questions_list, "duration": time.time()-start_time, "finish_reason": "UNKNOWN", "status": "Failed (Unknown)"}
        await results_queue.put(result)

async def results_writer_task(
    queue: asyncio.Queue,
    filename: str,
    write_batch_size: int = 20, # How many results to buffer before writing
    write_interval_sec: float = 10.0 # Max time between writes
):
    """Gets results from queue and writes them to CSV in batches."""
    results_buffer = []
    last_write_time = time.monotonic()
    # Define expected header based on the dict keys put in the queue
    fieldnames = ["video_id", "questions", "status", "duration_sec", "finish_reason"]
    file_exists = Path(filename).is_file()

    logger.info(f"Writer task started. Writing results to {filename}")

    while True:
        try:
            # Wait for an item with a timeout
            result = await asyncio.wait_for(queue.get(), timeout=write_interval_sec)

            if result is None: # Signal to terminate
                logger.info("Writer task received termination signal.")
                break

            if isinstance(result, dict):
                 # Ensure duration is rounded here before adding to buffer
                 result["duration_sec"] = round(result.get("duration", -1), 2)
                 # Remove the original 'duration' key if desired
                 result.pop('duration', None)
                 results_buffer.append(result)
            else:
                logger.warning(f"Writer task received non-dict item: {result}")

            queue.task_done() # Signal that the item was processed

        except asyncio.TimeoutError:
            # Timeout occurred, write buffer if not empty, even if batch size not reached
            logger.debug("Writer task timeout reached.")
            pass # Continue to buffer check below

        except Exception as e:
            logger.error(f"Writer task encountered error getting from queue: {e}", exc_info=True)
            # Decide if this is fatal or if we should try to continue
            await asyncio.sleep(1) # Prevent fast spinning on error
            continue # Try to continue processing

        # Check if we should write the buffer
        buffer_size = len(results_buffer)
        time_since_last_write = time.monotonic() - last_write_time
        should_write = (
             buffer_size > 0 and
             (buffer_size >= write_batch_size or time_since_last_write >= write_interval_sec)
        )

        if should_write:
            logger.info(f"Writing batch of {buffer_size} results to {filename}...")
            try:
                # Use 'with open' for proper handling
                with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    # Write header only if file didn't exist at the start
                    if not file_exists:
                        writer.writeheader()
                        file_exists = True # Prevent writing header again
                    writer.writerows(results_buffer)

                results_buffer = [] # Clear buffer after successful write
                last_write_time = time.monotonic()
                logger.info(f"Batch written successfully.")
            except IOError as e:
                logger.error(f"IOError writing results batch to {filename}: {e}. Results may be lost.")
                # Optional: Add retry logic here, or store failed batch elsewhere
            except Exception as e:
                logger.error(f"Unexpected error writing results batch: {e}", exc_info=True)


    # --- Cleanup: Write any remaining items after receiving None signal ---
    if results_buffer:
        logger.info(f"Writing final remaining {len(results_buffer)} results...")
        try:
            with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                if not file_exists: writer.writeheader() # Check again in case file was deleted mid-run
                writer.writerows(results_buffer)
            logger.info("Final results written.")
        except Exception as e:
            logger.error(f"Error writing final results batch: {e}")

    logger.info("Writer task finished.")

# --- Modified Main Execution Function with UI ---
async def run_bulk_inference_async():
    """Runs the bulk inference process asynchronously with queue writer, rate limiting, and UI."""

    # --- Standard Setup ---
    if ai_client is None: logger.error("AI Client not initialized."); display(Markdown("❌ AI Client not initialized.")); return

    # --- Load Data and Prepare Tasks ---
    logger.info("Loading metadata for inference...")
    
    # Load video questions for inference as a dictionaies of dicts
    # Each dict contains video_id, gcs_uri, file_api_name, and other metadata
    video_questions_for_inference = load_metadata_questions_generation(METADATA_FILE)
    print(f"Loaded {len(video_questions_for_inference)} video questions for inference.")
    
    if not video_questions_for_inference: logger.warning("No video data ready."); return
    processed_qids = load_processed_video_ids(ANSWERS_CHECK_DIR)

    inference_tasks_input = []
    tasks_skipped = 0
    required_col = 'gcs_uri' if USE_VERTEX else 'file_api_name'
    for video_id, video_info in video_questions_for_inference.items():
        # Check if video_id is correct:
        if video_id == video_info.get('video_id'):
            logger.debug(f"Video ID {video_id} matches.")
        else:
            logger.warning(f"Video ID mismatch: {video_id} != {video_info.get('video_id')}.")
            continue
        
        if not video_id or video_id in processed_qids:
            tasks_skipped += 1; continue
        inference_tasks_input.append(video_info)

    total_tasks = len(inference_tasks_input)
    logger.info(f"Prepared {total_tasks} new inference tasks. Skipped {tasks_skipped}." )
    if total_tasks == 0: logger.info("No new questions to process."); return

    # --- Setup Rate Limiter ---
    rate_limiter = None
    if REQUESTS_PER_MINUTE is not None and REQUESTS_PER_MINUTE > 0:
        capacity = 10 # Example: Keep burst capacity moderate
        rate_limiter = AsyncRateLimiter(rate=REQUESTS_PER_MINUTE, period=60.0, capacity=capacity)
        logger.info(f"Rate limiting enabled: {REQUESTS_PER_MINUTE} RPM, Capacity: {capacity}")
    else:
        logger.info("Rate limiting disabled.")

    # --- Create Queue and Start Writer Task ---
    results_queue = asyncio.Queue()
    writer_handle = asyncio.create_task(
        results_writer_task(results_queue, ANSWERS_CHECK_DIR)
    )

    # --- Schedule and Run Inference Tasks with Progress Bar ---
    logger.info(f"Starting async inference for {total_tasks} questions (Concurrency: {MAX_ASYNC_WORKERS})...")
    semaphore = asyncio.Semaphore(MAX_ASYNC_WORKERS)
    start_bulk_time = time.time()

    # Create coroutines
    inference_coroutines = [
        perform_inference_single_async(q_info, ai_client, semaphore, rate_limiter, results_queue)
        for q_info in inference_tasks_input
    ]

    # --- Use asyncio.as_completed with tqdm_notebook for live progress ---
    completed_count = 0
    gather_exception = None
    try:
        # Create a future for each coroutine to track completion
        tasks = [asyncio.ensure_future(coro) for coro in inference_coroutines]
        # Use tqdm with as_completed
        for future in tqdm(asyncio.as_completed(tasks), total=total_tasks, desc="Async Inference"):
            try:
                await future # Wait for the next task to complete, raise exception if task failed
                completed_count += 1
            except Exception as task_exc:
                 # Log error from individual task if it wasn't caught inside
                 logger.error(f"Error surfaced from an inference task: {task_exc}", exc_info=False)
                 # Optionally decide if you want to stop processing other tasks
                 # gather_exception = task_exc # Store first exception
                 # break # Or continue processing others
    except Exception as outer_exc:
        # Catch errors during the setup/iteration of as_completed itself
        logger.error(f"Error during as_completed processing: {outer_exc}", exc_info=True)
        gather_exception = outer_exc

    # Ensure all tasks are awaited even if we broke early due to an error in one task
    # (This might be redundant if as_completed handles cancellation correctly, but safer)
    await asyncio.gather(*tasks, return_exceptions=True)

    bulk_duration = time.time() - start_bulk_time
    logger.info(f"Async inference task processing finished in {bulk_duration:.2f} seconds. Completed: {completed_count}/{total_tasks}.")
    if gather_exception:
         logger.error(f"Bulk inference encountered errors: {gather_exception}")

    # --- Signal Writer Task to Finish (ALWAYS DO THIS) ---
    logger.info("Signaling writer task to complete...")
    await results_queue.put(None)

    # --- Wait for Writer Task to Finish (ALWAYS DO THIS) ---
    logger.info("Waiting for writer task to finish writing remaining results...")
    try:
        await writer_handle # Wait until the writer processes the None signal and exits
        logger.info("Writer task has finished.")
    except Exception as writer_exc:
        logger.error(f"Error waiting for writer task: {writer_exc}", exc_info=True)

    # --- Final Summary ---
    logger.info(f"Bulk inference process complete. See logs and {ANSWERS_CHECK_DIR}.")
    # Remove handler to prevent duplicate logs on re-run

# --- Run the async function ---
asyncio.run(run_bulk_inference_async())

2025-05-09 21:29:26,702 - INFO - Loading metadata for inference...
2025-05-09 21:29:26,724 - INFO - Loaded 289 unique videos for inference.
Loaded 289 video questions for inference.
2025-05-09 21:29:26,726 - INFO - Loaded 61 processed video IDs from generated_questions/gemini-2.0-flash/answers.csv
2025-05-09 21:29:26,726 - INFO - Prepared 228 new inference tasks. Skipped 61.
2025-05-09 21:29:26,727 - INFO - Rate limiting enabled: 7 RPM, Capacity: 10
2025-05-09 21:29:26,727 - INFO - Starting async inference for 228 questions (Concurrency: 4)...


Async Inference:   0%|          | 0/228 [00:00<?, ?it/s]

2025-05-09 21:29:26,735 - INFO - Writer task started. Writing results to generated_questions/gemini-2.0-flash/answers.csv


2025-05-09 21:29:26,739 - INFO - QID Se8JkJUt7YE (Async): Already processed. Skipping.
2025-05-09 21:29:26,739 - INFO - QID RBz8pTO0ySw (Async): Already processed. Skipping.
2025-05-09 21:29:26,740 - INFO - QID fyq0pd_pFvE (Async): Already processed. Skipping.
2025-05-09 21:29:26,741 - INFO - QID FkNdEfxYYc4 (Async): Already processed. Skipping.
2025-05-09 21:29:26,741 - INFO - QID SdvMPe6XF7w (Async): Already processed. Skipping.
2025-05-09 21:29:26,742 - INFO - QID rujdEa3MVYo (Async): Already processed. Skipping.
2025-05-09 21:29:26,742 - INFO - QID Ai4fCLMGL9Y (Async): Already processed. Skipping.
2025-05-09 21:29:26,742 - INFO - QID Z5jKO_Qql30 (Async): Already processed. Skipping.
2025-05-09 21:29:26,743 - INFO - QID yzlFm0LpRpM (Async): Already processed. Skipping.
2025-05-09 21:29:26,744 - INFO - QID m8tfdmm3g2A (Async): Already processed. Skipping.
2025-05-09 21:29:26,744 - INFO - QID HsXS1Qt11cU (Async): Already processed. Skipping.
2025-05-09 21:29:26,745 - INFO - QID lAmfy5

In [16]:
# Calucalte the numbers of file inside ANSWERS_DIR
import os
num_files = len([f for f in os.listdir(ANSWERS_DIR) if os.path.isfile(os.path.join(ANSWERS_DIR, f))])

print(f"Number of files in {ANSWERS_DIR}: {num_files}")

Number of files in generated_questions/gemini-2.0-flash/chat_history: 79


In [18]:
len(os.listdir(ANSWERS_DIR))

79

In [None]:
if f"{video_id}.json" in :
    logger.info(f"QID {video_id} (Async): Already processed. Skipping.")
    result = {"video_id": video_id, "questions": questions_list, "duration": 0, "finish_reason": "N/A", "status": "Already Processed"}
    await results_queue.put(result)
    return

In [None]:
interim_cleanup_and_report(ANSWERS_CHECK_DIR)

2025-05-09 21:06:41,182 - INFO - Loaded results from generated_questions/gemini-2.0-flash/answers.csv (1 rows).


### Results Summary: 1 Success, 0 Failed.

2025-05-09 21:06:41,187 - INFO - Removed duplicates, remaining 1 unique QIDs.
