# AI Video Search with Snowflake and Twelve Labs

## Overview

This notebook demonstrates how to build an end‑to‑end AI video search workflow using Snowflake, Twelve Labs, and Streamlit.  
It covers creating video embeddings, storing and querying them in Snowflake, and surfacing results in an interactive UI.

## Architecture

- Snowflake for data storage, vector search, and orchestration.
- Twelve Labs for multimodal video and text embeddings.
- Streamlit for an interactive, browser‑based search experience.


## Environment configuration

The following cells configure the notebook environment for running the AI video search demo:

- Install Python dependencies required for Twelve Labs, video processing (`moviepy`, `ffmpeg`), and the Streamlit UI.
- Set core configuration parameters such as the Twelve Labs API key placeholder, the chosen model, and the Snowflake database and schema names.
- Define the base table names used to store video segment embeddings, text query embeddings, video source URLs, and frame‑level results.
- Configure stage names and fully qualified identifiers that determine where packages, UDFs, and extracted video frames are stored in Snowflake.

Update these configuration values to match your own Snowflake account and Twelve Labs setup before running the notebook.

In [None]:
USE ROLE ACCOUNTADMIN;

USE DATABASE MY_DB;
USE SCHEMA MY_SCHEMA;
CREATE OR REPLACE SECRET twelve_labs_api
 TYPE = GENERIC_STRING
 SECRET_STRING = 'YOUR_TWELVE_LABS_API_KEY';

In [None]:
USE ROLE VIDEO_CONTAINER_RUNTIME_ROLE;

In [None]:
!pip install twelvelabs moviepy
!DEBIAN_FRONTEND=noninteractive apt-get install -y ffmpeg

# Note: the pip install above serves the notebook kernel. The UDTF loads the
# Twelve Labs SDK from a ZIP in a Snowflake stage (see session.add_import further down).

In [None]:
# TwelveLabs API key
TWELVE_LABS_API_KEY = "YOUR_TWELVE_LABS_API_KEY"

# TwelveLabs Model
CHOSEN_MODEL = "Marengo-retrieval-2.7"

# Env
DB_NAME = "MY_DB"
SCHEMA_NAME = "MY_SCHEMA"

# Core tables
VIDEO_SEGMENT_EMBEDDINGS_TABLE   = "VIDEO_SEGMENT_EMBEDDINGS"
TEXT_QUERY_EMBEDDINGS_TABLE      = "TEXT_QUERY_EMBEDDINGS"
VIDEO_SOURCE_URLS_TABLE          = "VIDEO_SOURCE_URLS"
VIDEO_FRAME_RESULTS_TABLE        = "VIDEO_FRAME_RESULTS"

# Stage names 
PKG_STAGE_NAME  = "TRAFFIC_PACKAGES"
UDFS_STAGE_NAME = "TRAFFIC_UDFS"
VIDEO_FRAME_STAGE_NAME     = "STAGE_VIDEO_FRAMES"

# Fully qualified identifiers
PKG_STAGE_FQN  = f'"{DB_NAME}"."{SCHEMA_NAME}"."{PKG_STAGE_NAME}"'
UDFS_STAGE_FQN = f"{DB_NAME}.{SCHEMA_NAME}.{UDFS_STAGE_NAME}"
VIDEO_FRAME_STAGE_FQN     = f"{DB_NAME}.{SCHEMA_NAME}.{VIDEO_FRAME_STAGE_NAME}"

TEXT_QUERY_EMBEDDINGS_TABLE_FQN      = f"{DB_NAME}.{SCHEMA_NAME}.{TEXT_QUERY_EMBEDDINGS_TABLE}"
VIDEO_SOURCE_URLS_TABLE_FQN          = f'"{DB_NAME}"."{SCHEMA_NAME}"."{VIDEO_SOURCE_URLS_TABLE}"'
VIDEO_FRAME_RESULTS_TABLE_FQN        = f"{DB_NAME}.{SCHEMA_NAME}.{VIDEO_FRAME_RESULTS_TABLE}"
VIDEO_SEGMENT_EMBEDDINGS_TABLE_FQN = f"{DB_NAME}.{SCHEMA_NAME}.{VIDEO_SEGMENT_EMBEDDINGS_TABLE}"


# Classification categories - Update as required
ZERO_SHOT_CATEGORIES = [
    "articulated semi-trailer",
    "rigid box truck",
    "tanker truck",
    "flatbed truck",
    "tipper dump truck",
    "car carrier truck",
    "refrigerated truck",
    "concrete mixer truck",
    "sand tipper truck",
    "delivery van",
]
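
The configuration cell builds some fully qualified names with double quotes and others without. The following is a minimal sketch of one way to build them uniformly. `quote_identifier` and `build_fqn` are hypothetical helpers that are not part of the notebook, and the sketch assumes every part should be treated as a quoted, case-sensitive identifier.

```python
def quote_identifier(part: str) -> str:
    """Wrap one identifier part in double quotes, doubling embedded quotes."""
    return '"' + part.replace('"', '""') + '"'


def build_fqn(db_name: str, schema_name: str, object_name: str) -> str:
    """Join database, schema, and object name into one quoted identifier."""
    parts = (db_name, schema_name, object_name)
    return ".".join(quote_identifier(p) for p in parts)


# Example with the placeholder names used in the configuration cell
print(build_fqn("MY_DB", "MY_SCHEMA", "VIDEO_SOURCE_URLS"))
```

Quoting every part keeps the generated SQL predictable if a name ever contains lowercase letters or special characters, at the cost of making the names case-sensitive.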



## Environment and library setup

This section imports all required Python libraries and initializes the Snowflake session and Twelve Labs client.

- `twelvelabs` SDK for creating video and text embeddings.
- `snowflake.snowpark` and `snowflake.cortex` for running queries and LLM calls inside Snowflake.
- `requests`, `tempfile`, `os`, and `io` for downloading and handling video files.
- `pandas` and `streamlit` for data manipulation and building the interactive UI.
- `PIL` and `moviepy` for extracting and working with video frames.

The call to `get_active_session()` retrieves the current Snowflake Snowpark session for this notebook, and `TwelveLabs(api_key=TWELVE_LABS_API_KEY)` constructs a client to interact with the Twelve Labs API.


In [None]:
from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask
from twelvelabs.models.task import Task

from snowflake.snowpark.context import get_active_session
from snowflake import cortex
import snowflake

import requests
import json
import io
import os
import tempfile
import contextlib

import pandas as pd
import streamlit as st
from PIL import Image as PILImage
from moviepy import VideoFileClip

session = get_active_session()
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)
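
The client above is built from the key assigned in the configuration cell. As an alternative, the key could be read from the environment so that it never appears in the notebook. This is only a sketch: the helper `load_api_key` and the environment variable name `TWELVE_LABS_API_KEY` are assumptions, not something the notebook defines.

```python
import os

# Placeholder string used in the configuration cell
PLACEHOLDER = "YOUR_TWELVE_LABS_API_KEY"


def load_api_key(env_var: str = "TWELVE_LABS_API_KEY") -> str:
    """Return the key from the environment, rejecting missing or placeholder values."""
    key = os.environ.get(env_var, "").strip()
    if not key or key == PLACEHOLDER:
        raise RuntimeError(f"Set {env_var} to a real Twelve Labs API key.")
    return key
```

With such a helper, the client could be created as `TwelveLabs(api_key=load_api_key())`, which fails early if the placeholder was never replaced.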

## Core helper functions

The next cells define helper functions that encapsulate the main steps of the video search workflow:

- `create_video_embeddings` is a Snowflake UDTF that calls Twelve Labs to generate and return segment‑level video embeddings for a given video URL.
- `create_and_store_text_query_embedding(...)` creates a text embedding for a query using Twelve Labs and stores it in Snowflake.
- `zero_shot_classify_sql(...)` uses Snowflake's `AI_CLASSIFY` function to assign a query to one of the supplied categories (here, the vehicle types in `ZERO_SHOT_CATEGORIES`).
- `semantic_search(...)` runs a similarity search over precomputed video segment embeddings to find relevant segments for a query.
- `apply_llm_scoring_to_frames(...)` applies an LLM‑based scoring step to refine how well individual frames match the user’s query.
- `generate_video_embed(...)` creates and persists video segment embeddings for a given video URL if they do not already exist.
- `store_query_results_to_table(...)` writes semantic search results into a Snowflake table for later frame extraction and display.
- `extract_and_upload_frames(...)` downloads the source video, extracts representative frames for selected segments, and uploads them to a Snowflake stage.
- `orchestrate_video_search_and_frames(...)` ties together search, persistence, frame extraction, and LLM scoring into a single orchestration step.


In [None]:
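# TWELVE_LABS_API_KEY is already set in the configuration cell; the UDTF below
# reads the key from the `twelve_labs_api` secret instead.

# Iterable and Tuple are typing constructs, so import them from `typing`
from typing import Iterable, Tuple

from snowflake.snowpark.functions import udtf, lit
from snowflake.snowpark.types import FloatType, StringType, StructType, StructField, VectorType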

session.clear_imports()

# Use parameterised package stage
session.add_import(f'@{PKG_STAGE_FQN}/twelvelabs.zip')

@udtf(
    name="create_video_embeddings",
    packages=['httpx', 'pydantic'],
    external_access_integrations=['twelvelabs_access_integration'],
    secrets={'cred': 'twelve_labs_api'},
    if_not_exists=True,
    is_permanent=True,
    # Use parameterised UDFs stage
    stage_location=f'@{UDFS_STAGE_FQN}',
    output_schema=StructType([
        StructField("embedding", VectorType(float, 1024)),
        StructField("start_offset_sec", FloatType()),
        StructField("end_offset_sec", FloatType()),
        StructField("embedding_scope", StringType())
    ])
)
class create_video_embeddings:
    def __init__(self):
        from twelvelabs import TwelveLabs
        from twelvelabs.models.embed import EmbeddingsTask
        import _snowflake

        twelve_labs_api_key = _snowflake.get_generic_secret_string('cred')
        twelvelabs_client = TwelveLabs(api_key=twelve_labs_api_key)
        self.twelvelabs_client = twelvelabs_client

    def process(self, video_url: str) -> Iterable[Tuple[list, float, float, str]]:
        task = self.twelvelabs_client.embed.task.create(
            model_name=CHOSEN_MODEL,
            video_url=video_url
        )

        status = task.wait_for_done(sleep_interval=60)

        task = task.retrieve()
        if task.video_embedding is not None and task.video_embedding.segments is not None:
            for segment in task.video_embedding.segments:
                yield (
                    segment.embeddings_float,
                    segment.start_offset_sec,
                    segment.end_offset_sec,
                    segment.embedding_scope,
                )
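
The UDTF handler follows a simple contract: `process` is called per input row and yields one tuple per output row, in the order of the declared output schema. The sketch below mimics that shape in plain Python, without Snowflake or Twelve Labs. `FakeSegment` and `FakeVideoEmbeddings` are hypothetical stand-ins, and the two-element vectors are made-up values for illustration only.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass
class FakeSegment:
    """Hypothetical stand-in for one segment returned by the embedding task."""
    embeddings_float: List[float]
    start_offset_sec: float
    end_offset_sec: float
    embedding_scope: str


class FakeVideoEmbeddings:
    """Mimics the handler shape: process() yields one tuple per output row."""

    def __init__(self, segments: List[FakeSegment]):
        self.segments = segments

    def process(self, video_url: str) -> Iterable[Tuple[list, float, float, str]]:
        # Same column order as the UDTF output schema:
        # embedding, start_offset_sec, end_offset_sec, embedding_scope
        for seg in self.segments:
            yield (
                seg.embeddings_float,
                seg.start_offset_sec,
                seg.end_offset_sec,
                seg.embedding_scope,
            )


handler = FakeVideoEmbeddings([
    FakeSegment([0.1, 0.2], 0.0, 6.0, "clip"),
    FakeSegment([0.3, 0.4], 6.0, 12.0, "clip"),
])
rows = list(handler.process("https://example.com/video.mp4"))
print(len(rows), rows[0][1:])
```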


In [None]:
def create_and_store_text_query_embedding(text_query: str) -> int:
    """
    Create a text embedding for `text_query`, store it in TEXT_QUERY_EMBEDDINGS and return the generated QUERY_ID.
    """

    # 1) Call Twelve Labs Embed API in text mode
    response = twelvelabs_client.embed.create(
        model_name=CHOSEN_MODEL,
        text=text_query,
        text_truncate="start"
    )

    if (
        response.text_embedding is None
        or response.text_embedding.segments is None
        or len(response.text_embedding.segments) == 0
    ):
        raise ValueError("No text embedding returned from Twelve Labs for query: " + text_query)

    embedding_vector = response.text_embedding.segments[0].embeddings_float  # list[float]

    # Build ARRAY literal for the vector
    embedding_literal = ",".join(str(x) for x in embedding_vector)

    # Escape single quotes in the query text for SQL
    escaped_text = text_query.replace("'", "''")

    # 2) Insert into the query-embeddings table
    insert_sql = f"""
        INSERT INTO {TEXT_QUERY_EMBEDDINGS_TABLE_FQN} (QUERY_TEXT, QUERY_EMBEDDING)
        SELECT
            '{escaped_text}' AS QUERY_TEXT,
            ARRAY_CONSTRUCT({embedding_literal})::VECTOR(FLOAT, 1024) AS QUERY_EMBEDDING
    """
    session.sql(insert_sql).collect()

    # 3) Fetch the latest QUERY_ID for this text
    select_sql = f"""
        SELECT QUERY_ID
        FROM {TEXT_QUERY_EMBEDDINGS_TABLE_FQN}
        WHERE QUERY_TEXT = '{escaped_text}'
        ORDER BY CREATED_AT DESC
        LIMIT 1
    """
    result_df = session.sql(select_sql).to_pandas()

    return int(result_df["QUERY_ID"].iloc[0])
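
The function above assembles its SQL by string formatting, so the quoting rules matter. As a minimal sketch, the two literal-building steps can be isolated into small helpers that are easy to test on their own. `sql_string_literal` and `vector_literal` are hypothetical names, and the dimension check is an added assumption rather than something the notebook does.

```python
from typing import Sequence


def sql_string_literal(text: str) -> str:
    """Quote text as a SQL string literal, doubling single quotes."""
    return "'" + text.replace("'", "''") + "'"


def vector_literal(values: Sequence[float], dim: int) -> str:
    """Render values as ARRAY_CONSTRUCT(...)::VECTOR(FLOAT, dim)."""
    if len(values) != dim:
        raise ValueError(f"expected {dim} values, got {len(values)}")
    body = ",".join(repr(float(v)) for v in values)
    return f"ARRAY_CONSTRUCT({body})::VECTOR(FLOAT, {dim})"


print(sql_string_literal("driver's cab"))
print(vector_literal([0.5, -1.0, 2.0], dim=3))
```

Checking the vector length before building the statement surfaces a model or configuration mismatch in Python rather than as a cast error inside Snowflake.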


In [None]:
def zero_shot_classify_sql(query_text: str, categories: list[str]) -> str:
    """
    Uses Snowflake AISQL AI_CLASSIFY via SQL and returns the top label
    (CATEGORY_ZERO_SHOT) for the given query_text and categories.
    """
    if query_text is None or str(query_text).strip() == "":
        return None

    # Build the categories array literal: ['a','b',...]
    cats_escaped = [c.replace("'", "''") for c in categories]
    cats_sql = ", ".join(f"'{c}'" for c in cats_escaped)
    cats_array_literal = f"[{cats_sql}]"

    # Escape the input text for SQL
    q_escaped = query_text.replace("'", "''")

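    # SQL: AI_CLASSIFY(...):labels[0]::STRING as CATEGORY_ZERO_SHOT
    # The ::STRING cast returns plain text; without it the VARIANT value
    # comes back JSON-encoded, with surrounding double quotes.
    sql = f"""
        SELECT
            AI_CLASSIFY('{q_escaped}', {cats_array_literal}):labels[0]::STRING AS CATEGORY_ZERO_SHOT
    """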

    df = session.sql(sql)
    rows = df.collect()
    if not rows:
        return None

    # Row field name is CATEGORY_ZERO_SHOT
    return rows[0]["CATEGORY_ZERO_SHOT"]
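
The classification result is an object with a `labels` array, from which the function takes the first entry. If the whole result were fetched as a JSON string instead of being extracted in SQL, the top label could be picked in Python. The sketch below assumes that JSON shape; `top_label` is a hypothetical helper.

```python
import json
from typing import Optional


def top_label(raw_result: Optional[str]) -> Optional[str]:
    """Return the first label from an AI_CLASSIFY-style JSON string, or None."""
    if not raw_result:
        return None
    parsed = json.loads(raw_result)
    labels = parsed.get("labels") or []
    return labels[0] if labels else None


print(top_label('{"labels": ["flatbed truck"]}'))
```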


In [None]:
def semantic_search(questions: list[str]) -> dict:
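    """
    Create the query embedding if it doesn't exist, run a semantic search
    over video segments, and return the results.

    Note: the function returns inside the loop, so only the first entry of
    `questions` is processed; an empty list yields None.
    """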

    for q in questions:
        # Normalize and escape input query for SQL safety
        q_norm = q.lower()
        q_norm_escaped = q_norm.replace("'", "''")

        # Check if an existing embedding for the query already exists in the database
        existing_sql = f"""
        SELECT QUERY_ID, CATEGORY_ZERO_SHOT
        FROM {TEXT_QUERY_EMBEDDINGS_TABLE_FQN}
        WHERE LOWER(QUERY_TEXT) = '{q_norm_escaped}'
        ORDER BY CREATED_AT DESC
        LIMIT 1
        """
        existing_df = session.sql(existing_sql).to_pandas()

        if not existing_df.empty:
            # Use the most recent embedding and classification if found
            query_id = int(existing_df["QUERY_ID"].iloc[0])
            category_zero_shot = existing_df["CATEGORY_ZERO_SHOT"].iloc[0]
        else:
            # Create new query embedding if one doesn't exist
            query_id = create_and_store_text_query_embedding(q)

            # Perform zero-shot classification on the query text
            category_zero_shot = zero_shot_classify_sql(q, ZERO_SHOT_CATEGORIES)

            if category_zero_shot is not None:
                # Update embedding table to include the zero-shot category
                cat_escaped_for_update = category_zero_shot.replace("'", "''")
                update_sql = f"""
                UPDATE {TEXT_QUERY_EMBEDDINGS_TABLE_FQN}
                SET CATEGORY_ZERO_SHOT = '{cat_escaped_for_update}'
                WHERE QUERY_ID = {query_id}
                """
                session.sql(update_sql).collect()

        # Build semantic similarity search query between query embedding and video segments
        sql = f"""
        WITH selected_query AS (
            SELECT QUERY_EMBEDDING
            FROM {TEXT_QUERY_EMBEDDINGS_TABLE_FQN}
            WHERE QUERY_ID = {query_id}
        ),
        base_sim AS (
            SELECT
                v.URL              AS VIDEO_URL,
                v.START_OFFSET_SEC AS START_OFFSET_SEC,
                v.END_OFFSET_SEC   AS END_OFFSET_SEC,
                v.EMBEDDING        AS VIDEO_EMBEDDING,
                ROUND(
                    VECTOR_COSINE_SIMILARITY(
                        v.EMBEDDING::VECTOR(FLOAT, 1024),
                        q.QUERY_EMBEDDING::VECTOR(FLOAT, 1024)
                    ),
                    4
                ) AS SIMILARITY_SCORE
            FROM {VIDEO_SEGMENT_EMBEDDINGS_TABLE_FQN} v
            CROSS JOIN selected_query q
        )
        SELECT
            VIDEO_URL,
            START_OFFSET_SEC,
            END_OFFSET_SEC,
            SIMILARITY_SCORE,
            0.0              AS LLM_SCORE,
            SIMILARITY_SCORE AS FINAL_SCORE
        FROM base_sim
        ORDER BY FINAL_SCORE DESC
        """
        # Execute SQL search and fetch results into a dataframe
        full_df = session.sql(sql).to_pandas()

        # Sort results by final semantic similarity score in descending order
        result_df = full_df.sort_values(by="FINAL_SCORE", ascending=False).copy()

        # Return structured search results for the query
        return {
            "query_id": query_id,
            "query_text": q,
            "category_zero_shot": category_zero_shot,
            "results_df": result_df,
        }
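
The ranking in `semantic_search` rests on cosine similarity between the query vector and each segment vector. The sketch below reproduces that calculation in plain Python on tiny made-up vectors, to show what the SQL computes. `cosine_similarity` and `rank_segments` are illustrative helpers, not notebook functions.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product of a and b divided by the product of their lengths."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


def rank_segments(
    query: Sequence[float],
    segments: List[Tuple[str, Sequence[float]]],
) -> List[Tuple[str, float]]:
    """Score each (label, vector) pair against the query, best match first."""
    scored = [
        (label, round(cosine_similarity(query, vec), 4))
        for label, vec in segments
    ]
    return sorted(scored, key=lambda item: item[1], reverse=True)


ranked = rank_segments(
    [1.0, 0.0],
    [("segment_a", [0.0, 1.0]), ("segment_b", [2.0, 0.0])],
)
print(ranked)
```

Because cosine similarity ignores vector length, `segment_b` scores as a perfect match even though its vector is twice as long as the query.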


In [None]:
def apply_llm_scoring_to_frames(
    query_id: int,
    user_query: str,
    video_frame_results_fqn: str,
    frames_stage_name: str,
    db_name: str,
    schema_name: str,
    session,
) -> None:
    """
    For a given QUERY_ID, call Snowflake Cortex on each frame in VIDEO_FRAME_RESULTS
    and update LLM_SCORE and FINAL_SCORE in-place.
    """
    # Escape single quotes in user query
    user_query_escaped = user_query.replace("'", "''")
    stage_string = f"@{db_name}.{schema_name}.{frames_stage_name}"

    # 1) Compute LLM scores into a temporary table
    sql_scored = f"""
        CREATE OR REPLACE TEMP TABLE LLM_FRAME_SCORES AS
        SELECT
            v.RESULT_ID,
            v.QUERY_ID,
            v.FRAME_STAGE_PATH,
            v.SIMILARITY_SCORE,
            v.FINAL_SCORE,
            SNOWFLAKE.CORTEX.COMPLETE(
                'claude-4-sonnet',
                PROMPT(
                    'You are a traffic analysis assistant. Answer based only on the provided image {{0}}. ' ||
                    'The image provided is a screenshot of highway traffic footage. ' ||
                    'Does the image provided contain: {user_query_escaped}? ' ||
                    'Return a single decimal number between 0.0 (no confidence) and 1.0 (absolute confidence). ' ||
                    'Provide only the decimal number, no other text.',
                    TO_FILE('{stage_string}', v.FRAME_STAGE_PATH)
                )
            )::FLOAT AS LLM_RAW
        FROM {video_frame_results_fqn} v
        WHERE v.QUERY_ID = {query_id}
          AND v.FRAME_STAGE_PATH IS NOT NULL
    """
    session.sql(sql_scored).collect()

    # 2) Update the main table from the temp table
    sql_update = f"""
        UPDATE {video_frame_results_fqn} t
        SET
            LLM_SCORE   = s.LLM_RAW,
            FINAL_SCORE = t.SIMILARITY_SCORE * s.LLM_RAW
        FROM LLM_FRAME_SCORES s
        WHERE t.RESULT_ID = s.RESULT_ID
          AND t.QUERY_ID  = {query_id}
    """
    session.sql(sql_update).collect()
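
The SQL above casts the model reply directly to a float and then multiplies it with the similarity score. That works only if the model returns a bare number. As a sketch of a more defensive variant done in Python, the helpers below parse the reply, fall back to a default on non-numeric text, and clamp to the 0.0 to 1.0 range. The fallback and clamping behaviour are assumptions, and both function names are hypothetical.

```python
import math


def parse_llm_score(reply: str, default: float = 0.0) -> float:
    """Parse a model reply into a score clamped to [0.0, 1.0]."""
    try:
        value = float(reply.strip())
    except (ValueError, AttributeError):
        # Non-numeric text or a missing reply
        return default
    if math.isnan(value):
        return default
    return min(1.0, max(0.0, value))


def final_score(similarity: float, llm_score: float) -> float:
    """Combine the scores as the UPDATE statement does: similarity * LLM score."""
    return similarity * llm_score


print(parse_llm_score(" 0.8\n"), parse_llm_score("yes"))
```

Note that with a multiplicative combination, an LLM score of 0.0 removes a segment from the top results regardless of its similarity score.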


In [None]:
def generate_video_embed(
    video_url: str,
    session,
    video_segment_embeddings_fqn: str,
    video_source_urls_table_fqn: str,
) -> None:
    """
    Create embeddings for a single video URL and persist to VIDEO_SEGMENT_EMBEDDINGS.
    Also marks the video as processed in VIDEO_SOURCE_URLS.
    Skips work if embeddings already exist for this URL.
    """


    escaped_url = video_url.replace("'", "''")
    existing_sql = f"""
        SELECT COUNT(*) AS CNT
        FROM {video_segment_embeddings_fqn}
        WHERE URL = '{escaped_url}'
    """
    existing_df = session.sql(existing_sql).to_pandas()
    existing_cnt = int(existing_df["CNT"].iloc[0]) if not existing_df.empty else 0

    if existing_cnt > 0:
        print(f"Embeddings already exist for URL = {video_url}, skipping generation.")
        # ensure PROCESSED flag is true as well
        update_sql = f"""
            UPDATE {video_source_urls_table_fqn}
            SET PROCESSED = TRUE,
                PROCESSED_AT = COALESCE(PROCESSED_AT, CURRENT_TIMESTAMP())
            WHERE URL = '{escaped_url}'
        """
        session.sql(update_sql).collect()
        return


    # Build single-row dataframe for this URL
    df = session.create_dataframe([[video_url]], schema=["url"])

    # Generate embeddings via UDTF
    df_with_embeddings = df.join_table_function(
        create_video_embeddings(df["url"]).over(partition_by="url")
    )

    # Persist into the fully-qualified VIDEO_SEGMENT_EMBEDDINGS table (append mode)
    df_with_embeddings.write.mode("append").save_as_table(
        video_segment_embeddings_fqn
    )

    # Mark as processed in VIDEO_SOURCE_URLS
    update_sql = f"""
        UPDATE {video_source_urls_table_fqn}
        SET PROCESSED = TRUE,
            PROCESSED_AT = CURRENT_TIMESTAMP()
        WHERE URL = '{escaped_url}'
    """
    session.sql(update_sql).collect()


In [None]:
def store_query_results_to_table(
    query_result: dict,
    video_url: str,
    query_result_frames_fqn: str,
    session,
) -> None:
    """
    Persist segments from a semantic search result into VIDEO_FRAME_RESULTS.
    """

    query_id = int(query_result["query_id"])
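    results_df = query_result["results_df"]
    # semantic_search ranks segments from every video; keep only the selected
    # one so that rows are not stored under the wrong VIDEO_URL.
    results_df = results_df[results_df["VIDEO_URL"] == video_url]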

    if results_df.empty:
        print(f"No results for query ID {query_id}")
        return

    existing_sql = f"""
    SELECT COUNT(*) AS CNT
    FROM {query_result_frames_fqn}
    WHERE QUERY_ID = {query_id}
    """
    existing_df = session.sql(existing_sql).to_pandas()
    existing_cnt = int(existing_df["CNT"].iloc[0]) if not existing_df.empty else 0
    
    if existing_cnt > 0:
        print(
            f"Rows already exist in VIDEO_FRAME_RESULTS for QUERY_ID {query_id}, "
            "skipping insert."
        )
        return

    values_clauses = []
    for _, row in results_df.iterrows():
        qid = query_id
        vurl = video_url.replace("'", "''")
        start_sec = float(row["START_OFFSET_SEC"])
        end_sec = float(row["END_OFFSET_SEC"])
        sim = float(row["SIMILARITY_SCORE"])
        llm_score = 0.0  # placeholder, will be updated by Cortex later
        final_score = float(row["FINAL_SCORE"])
        frame_path = "placeholder"

        values_clauses.append(
            f"({qid}, '{vurl}', {start_sec}, {end_sec}, {sim}, "
            f"{llm_score}, '{frame_path}', {final_score})"
        )

    if not values_clauses:
        print(f"No rows to insert for query ID {query_id}")
        return

    values_sql = ",".join(values_clauses)
    insert_sql = f"""
    INSERT INTO {query_result_frames_fqn}
        (QUERY_ID, VIDEO_URL, START_OFFSET_SEC, END_OFFSET_SEC,
         SIMILARITY_SCORE, LLM_SCORE, FRAME_STAGE_PATH, FINAL_SCORE)
    VALUES {values_sql}
    """
    session.sql(insert_sql).collect()
    print(
        f"{len(values_clauses)} results stored in VIDEO_FRAME_RESULTS "
        f"for Query ID {query_id}"
    )


In [None]:
def extract_and_upload_frames(
    video_url: str,
    segments_df,
    stage_name: str,
    db_name: str,
    schema_name: str,
    query_result_frames_fqn: str,
    query_id: int,
    session,
) -> dict:
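    """
    Download the video once, grab the midpoint frame of each segment, upload it
    to the frames stage, and record the staged file name in FRAME_STAGE_PATH.
    Returns a map of segment start offset to staged file name; the map is empty
    if frames already exist for this QUERY_ID.
    """
    frame_map = {}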

    # Guard: skip if frames already exist
    existing_sql = f"""
        SELECT COUNT(*) AS CNT
        FROM {query_result_frames_fqn}
        WHERE QUERY_ID = {query_id}
          AND FRAME_STAGE_PATH IS NOT NULL
          AND FRAME_STAGE_PATH <> ''
          AND FRAME_STAGE_PATH <> 'placeholder'
    """
    existing_df = session.sql(existing_sql).to_pandas()
    existing_cnt = int(existing_df["CNT"].iloc[0]) if not existing_df.empty else 0
    if existing_cnt > 0:
        print(f"Frames already exist for QUERY_ID {query_id}, skipping extraction.")
        return frame_map

    # Download the full video once to a temp file
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
        response = requests.get(video_url, stream=True, timeout=60)
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                tmp.write(chunk)
        tmp_path = tmp.name

    try:
        clip = VideoFileClip(tmp_path)

        for _, row in segments_df.iterrows():
            start_offset = float(row["START_OFFSET_SEC"])
            end_offset = float(row["END_OFFSET_SEC"])
            result_id = int(row["RESULT_ID"])
            frame_timestamp = start_offset + (end_offset - start_offset) / 2.0

            try:
                frame_array = clip.get_frame(frame_timestamp)
                img = PILImage.fromarray(frame_array.astype("uint8"))

                with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as img_tmp:
                    img.save(img_tmp, format="JPEG", quality=90)
                    img_tmp_path = img_tmp.name

                filename = f"frame_{int(start_offset)}s.jpg"

                put_result = session.file.put(
                    img_tmp_path,
                    f"@{db_name}.{schema_name}.{stage_name}",
                    overwrite=True,
                    auto_compress=False
                )
                os.remove(img_tmp_path)

                relative_path = put_result[0].target

                update_sql = f"""
                    UPDATE {query_result_frames_fqn}
                    SET FRAME_STAGE_PATH = '{relative_path}'
                    WHERE RESULT_ID = {result_id}
                """
                session.sql(update_sql).collect()

                frame_map[start_offset] = relative_path
                print(f"Uploaded frame at {start_offset}s to stage path {relative_path}")

            except Exception as e:
                print(f"Error processing frame at {start_offset}s: {e}")

    finally:
        # `clip` is unbound if VideoFileClip() itself raised
        if "clip" in locals():
            clip.close()
        os.remove(tmp_path)

    return frame_map
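
Each segment is represented by the frame at its midpoint. The sketch below isolates that calculation and adds an optional clamp to the clip duration, since a timestamp past the end of the video cannot be read. The clamp is an assumption that the notebook does not implement, and `midpoint_timestamp` is a hypothetical helper.

```python
from typing import Optional


def midpoint_timestamp(
    start_sec: float,
    end_sec: float,
    clip_duration: Optional[float] = None,
) -> float:
    """Return the midpoint of a segment, optionally capped at the clip duration."""
    if end_sec < start_sec:
        raise ValueError("end_sec must not be earlier than start_sec")
    midpoint = start_sec + (end_sec - start_sec) / 2.0
    if clip_duration is not None:
        midpoint = min(midpoint, clip_duration)
    return midpoint


print(midpoint_timestamp(10.0, 16.0))
```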


In [None]:
def orchestrate_video_search_and_frames(
    question_text: str,
    video_url: str,
    query_result_frames_fqn: str,
    db_name: str,
    schema_name: str,
    frames_stage_name: str,
    session,
) -> dict:
    """
    Orchestrate semantic search, result persistence, frame extraction, and LLM scoring
    for a single question.
    """

    # 1. Run semantic query over video segments
    result = semantic_search(
        questions=[question_text],
    )
    if result is None or result["results_df"].empty:
        print("No segments returned from semantic_search.")
        return {"query_result": result, "frame_map": {}, "segments_df": None}

    query_id = int(result["query_id"])

    # 2. Persist segments into VIDEO_FRAME_RESULTS if not already there
    store_query_results_to_table(
        query_result=result,
        video_url=video_url,
        query_result_frames_fqn=query_result_frames_fqn,
        session=session,
    )

    # 3. Reload VIDEO_FRAME_RESULTS rows for this QUERY_ID to get RESULT_IDs
    segments_df = (
        session.table(query_result_frames_fqn)
        .filter(f"QUERY_ID = {query_id}")
        .to_pandas()
    )

    # 4. Extract and upload frames only if frames for this QUERY_ID do not already exist
    frame_map = extract_and_upload_frames(
        video_url=video_url,
        segments_df=segments_df,
        stage_name=frames_stage_name,
        db_name=db_name,
        schema_name=schema_name,
        query_result_frames_fqn=query_result_frames_fqn,
        query_id=query_id,
        session=session,
    )

    # 5. Apply LLM scoring to frames for this query
    apply_llm_scoring_to_frames(
        query_id=query_id,
        user_query=question_text,
        video_frame_results_fqn=query_result_frames_fqn,
        frames_stage_name=frames_stage_name,
        db_name=db_name,
        schema_name=schema_name,
        session=session,
    )

    return {
        "query_result": result,
        "frame_map": frame_map,
        "segments_df": segments_df,
    }



## Streamlit application

The following cell defines a simple Streamlit app that lets you interactively search across prepared highway videos.  
It reads available video URLs from Snowflake, allows you to prepare a selected video by generating embeddings, and then runs semantic search to display the top‑matching frames for a natural‑language query.


In [None]:
# Read all videos, their processed status, and titles
video_urls_df = session.sql(
    f"""
    SELECT
        URL,
        TITLE,
        PROCESSED
    FROM {VIDEO_SOURCE_URLS_TABLE_FQN}
    """
).to_pandas()

# Handle case where there are no videos configured
if video_urls_df.empty:
    st.warning("No video URLs found in VIDEO_SOURCE_URLS.")
    selected_url = None

else:
    # Build labels with ✅ / ❌ and human-readable title
    def make_label(row):
        status_icon = "✅" if bool(row.get("PROCESSED", False)) else "❌"
        title = row.get("TITLE") or row["URL"]
        return f"{status_icon} {title}"

    # Build dropdown options
    options = video_urls_df.apply(make_label, axis=1).tolist()

    # Map label back to URL
    label_to_url = {
        options[i]: video_urls_df["URL"].iloc[i]
        for i in range(len(options))
    }

    selected_label = st.selectbox(
        "Choose a video to search in:",
        options=options,
    )
    selected_url = label_to_url[selected_label]

# App title and query input
st.title("Highway video search")

user_query = st.text_input(
    "Describe the vehicle or scene you are looking for:",
    value="Medium rigid flatbed tipper truck carrying sand",
)

# Guard when no videos are available
if selected_url is None:
    st.info("Add at least one video URL to VIDEO_SOURCE_URLS to begin.")

else:
    # Look up processed flag
    selected_row = video_urls_df[
        video_urls_df["URL"] == selected_url
    ].iloc[0]

    is_processed = bool(selected_row.get("PROCESSED", False))

    if not is_processed:
        st.warning("This video has not been prepared yet.")
    else:
        st.success("This video is prepared and ready for search.")

    # Prepare video button
    if not is_processed and st.button("Prepare selected video"):
        st.write("Preparing video (creating embeddings)…")

        generate_video_embed(
            video_url=selected_url,
            session=session,
            video_segment_embeddings_fqn=VIDEO_SEGMENT_EMBEDDINGS_TABLE_FQN,
            video_source_urls_table_fqn=VIDEO_SOURCE_URLS_TABLE_FQN,
        )

        st.success("Video preparation completed. Refresh the page to update status.")

    # Search logic
    if is_processed and st.button("Search video segments") and user_query.strip():
        st.write("Running semantic search…")

        orchestration = orchestrate_video_search_and_frames(
            question_text=user_query,
            video_url=selected_url,
            query_result_frames_fqn=VIDEO_FRAME_RESULTS_TABLE_FQN,
            db_name=DB_NAME,
            schema_name=SCHEMA_NAME,
            frames_stage_name=VIDEO_FRAME_STAGE_NAME,
            session=session,
        )

        result = orchestration.get("query_result")

        if result is None or result["results_df"].empty:
            st.info("No segments found for this query.")

        else:
            query_id = int(result["query_id"])

            frames_df = (
                session.table(VIDEO_FRAME_RESULTS_TABLE_FQN)
                .filter(f"QUERY_ID = {query_id}")
                .to_pandas()
            )

            if frames_df.empty:
                st.info("No frame results found in VIDEO_FRAME_RESULTS for this query.")

            else:
                frames_df = (
                    frames_df
                    .sort_values(by="FINAL_SCORE", ascending=False)
                    .head(6)
                )

                st.subheader("Top matching frames")

                num_cols = 2
                num_rows = 3

                for row_idx in range(num_rows):
                    cols = st.columns(num_cols)

                    for col_idx in range(num_cols):
                        idx = row_idx * num_cols + col_idx
                        if idx >= len(frames_df):
                            break

                        row = frames_df.iloc[idx]

                        frame_path = row["FRAME_STAGE_PATH"]
                        start_offset = row["START_OFFSET_SEC"]
                        end_offset = row["END_OFFSET_SEC"]

                        full_stage_path = f"@{DB_NAME}.{SCHEMA_NAME}.{VIDEO_FRAME_STAGE_NAME}/{frame_path}"
                        
                        get_res = session.file.get(
                            full_stage_path,    
                            "/tmp"             # local directory in the container
                        )
                        
                        local_filename = get_res[0].file
                        local_filepath = f"/tmp/{local_filename}"
                        
                        with cols[col_idx]:
                            st.image(
                                local_filepath,
                                caption=f"Segment {start_offset:.1f}s–{end_offset:.1f}s",
                                use_column_width=True,
                            )
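
The app fills a two-column grid row by row, converting a flat result index into a row and column. A minimal sketch of that index arithmetic, independent of Streamlit, is shown below; `grid_positions` is a hypothetical helper.

```python
from typing import List, Tuple


def grid_positions(item_count: int, num_cols: int) -> List[Tuple[int, int]]:
    """Return (row, col) for each item when filling a grid row by row."""
    if num_cols <= 0:
        raise ValueError("num_cols must be positive")
    # divmod(i, num_cols) gives (i // num_cols, i % num_cols)
    return [divmod(i, num_cols) for i in range(item_count)]


print(grid_positions(5, 2))
```

With five results and two columns, the last row holds a single image, which is the case the `idx >= len(frames_df)` check in the app guards against.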



## Testing and diagnostics

The next cells provide a one‑off batch job that generates segment embeddings, followed by two high‑level tests to validate the pipeline.  
The first test re‑scores existing video segments for a fixed query using only stored embeddings. The second test inspects existing frame‑level scores for the same query, to help you verify that frame extraction and LLM scoring behaved as expected.

In [None]:
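# One-off batch job to generate video segment embeddings.
# Note: the write below uses mode('overwrite'), which replaces every row
# already stored in the embeddings table.

# Build a dataframe of video URLs to process.
# `video_urls` is not defined earlier in the notebook; as an assumption, it is
# populated here from the URL column of VIDEO_SOURCE_URLS.
video_urls = [
    [row["URL"]]
    for row in session.table(VIDEO_SOURCE_URLS_TABLE_FQN).select("URL").collect()
]
df = session.create_dataframe(video_urls, schema=['url'])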

# Invoke the UDTF to create embeddings for each URL (one row per segment)
df = df.join_table_function(
    create_video_embeddings(df['url']).over(partition_by="url")
)

# Persist all generated segment embeddings into the target table
df.write.mode('overwrite').save_as_table(
    VIDEO_SEGMENT_EMBEDDINGS_TABLE_FQN
)

# Read back the persisted embeddings for inspection
df = session.table(VIDEO_SEGMENT_EMBEDDINGS_TABLE_FQN)

df

In [None]:
# ---- TEST 1: Re-score existing video segments for a fixed query (no new embeddings) ----
# This cell:
#   * Uses the existing hardcoded question text
#   * Reuses an existing QUERY_ID + embedding from TEXT_QUERY_EMBEDDINGS_TABLE_FQN
#   * Runs cosine similarity against VIDEO_SEGMENT_EMBEDDINGS_TABLE_FQN
#   * DOES NOT call Twelve Labs or create any new embeddings

test_query = "Medium rigid flatbed tipper truck carrying sand"

# Normalise and escape for lookup
q_norm = test_query.lower()
q_norm_escaped = q_norm.replace("'", "''")

# 1. Look up an existing QUERY_ID ONLY – do not generate a new one
existing_sql = f"""
SELECT QUERY_ID
FROM {TEXT_QUERY_EMBEDDINGS_TABLE_FQN}
WHERE LOWER(QUERY_TEXT) = '{q_norm_escaped}'
ORDER BY CREATED_AT DESC
LIMIT 1
"""
existing_df = session.sql(existing_sql).to_pandas()

if existing_df.empty:
    raise RuntimeError(
        "No existing embedding found for the test query. "
        "Run the regular flow once to create the text embedding, "
        "then re-run this cell."
    )

query_id = int(existing_df["QUERY_ID"].iloc[0])
print(f"Using existing QUERY_ID = {query_id} for test query.")

# 2. Pure SQL semantic similarity against preexisting video segment embeddings
sql_test_segments = f"""
WITH selected_query AS (
    SELECT QUERY_EMBEDDING
    FROM {TEXT_QUERY_EMBEDDINGS_TABLE_FQN}
    WHERE QUERY_ID = {query_id}
),
base_sim AS (
    SELECT
        v.URL                 AS VIDEO_URL,
        v.START_OFFSET_SEC    AS START_OFFSET_SEC,
        v.END_OFFSET_SEC      AS END_OFFSET_SEC,
        v.EMBEDDING           AS VIDEO_EMBEDDING,
        ROUND(
            VECTOR_COSINE_SIMILARITY(
                v.EMBEDDING::VECTOR(FLOAT, 1024),
                q.QUERY_EMBEDDING::VECTOR(FLOAT, 1024)
            ),
            4
        ) AS SIMILARITY_SCORE,
        q.QUERY_EMBEDDING     AS QUERY_EMBEDDING
    FROM {VIDEO_SEGMENT_EMBEDDINGS_TABLE_FQN} v
    CROSS JOIN selected_query q
)
SELECT
    VIDEO_URL,
    START_OFFSET_SEC,
    END_OFFSET_SEC,
    SIMILARITY_SCORE,
    QUERY_EMBEDDING
FROM base_sim
ORDER BY SIMILARITY_SCORE DESC
"""


test_segments_df = session.sql(sql_test_segments).to_pandas()

# Optionally restrict to top N for inspection
top10_segments_df = test_segments_df.head(10).copy()

print("Top 10 segment matches for the fixed test query:")
top10_segments_df


In [None]:
# ---- TEST 2: Inspect existing frame-level scores for the same fixed query ----
# This cell:
#   * Uses the same hardcoded test_query
#   * Finds its existing QUERY_ID
#   * Reads all rows from VIDEO_FRAME_RESULTS_TABLE_FQN for that QUERY_ID
#   * DOES NOT:
#       - extract new frames
#       - call Snowflake Cortex
#       - modify any tables

test_query = "Medium rigid flatbed tipper truck carrying sand"

# Normalise and escape for lookup
q_norm = test_query.lower()
q_norm_escaped = q_norm.replace("'", "''")

# 1. Look up existing QUERY_ID only
existing_sql = f"""
SELECT QUERY_ID
FROM {TEXT_QUERY_EMBEDDINGS_TABLE_FQN}
WHERE LOWER(QUERY_TEXT) = '{q_norm_escaped}'
ORDER BY CREATED_AT DESC
LIMIT 1
"""
existing_df = session.sql(existing_sql).to_pandas()

if existing_df.empty:
    raise RuntimeError(
        "No existing embedding / QUERY_ID found for the test query. "
        "Run the main orchestration once to seed QUERY_ID + frames, "
        "then re-run this cell."
    )

query_id = int(existing_df["QUERY_ID"].iloc[0])
print(f"Using existing QUERY_ID = {query_id} for frame-level test.")

# 2. Read existing frame results for this QUERY_ID
frames_sql = f"""
SELECT
    RESULT_ID,
    VIDEO_URL,
    START_OFFSET_SEC,
    END_OFFSET_SEC,
    SIMILARITY_SCORE,
    LLM_SCORE,
    FINAL_SCORE
FROM {VIDEO_FRAME_RESULTS_TABLE_FQN}
WHERE QUERY_ID = {query_id}
ORDER BY FINAL_SCORE DESC
"""
frames_df = session.sql(frames_sql).to_pandas()

if frames_df.empty:
    raise RuntimeError(
        "No rows found in VIDEO_FRAME_RESULTS for this QUERY_ID. "
        "Run the orchestration once to persist frames + scores, "
        "then re-run this cell."
    )

# Optional: restrict to top N for quick inspection
top10_frames_df = frames_df.head(10).copy()

print("Top 10 frame-level results for the fixed test query:")
top10_frames_df
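
Both test cells repeat the same QUERY_ID lookup: lower‑case the query text, escape single quotes, and select the most recent matching row. As a rough sketch, that step could be factored into one helper. The function name `build_query_id_lookup_sql` and the example query text are hypothetical; the SQL shape is copied from the cells above.

```python
def build_query_id_lookup_sql(table_fqn, query_text):
    """Return SQL that finds the most recent QUERY_ID for `query_text`.

    The text is lower-cased to match LOWER(QUERY_TEXT), and single quotes
    are doubled so the value can sit inside a SQL string literal.
    """
    normalised = query_text.lower()
    escaped = normalised.replace("'", "''")
    return (
        "SELECT QUERY_ID\n"
        f"FROM {table_fqn}\n"
        f"WHERE LOWER(QUERY_TEXT) = '{escaped}'\n"
        "ORDER BY CREATED_AT DESC\n"
        "LIMIT 1"
    )


# Example with a query that contains an apostrophe (hypothetical text).
example_sql = build_query_id_lookup_sql(
    "MY_DB.MY_SCHEMA.TEXT_QUERY_EMBEDDINGS",
    "Driver's cab of a Tipper Truck",
)
print(example_sql)
```

In the notebook, the returned string could be passed to `session.sql(...)` with `TEXT_QUERY_EMBEDDINGS_TABLE_FQN` as the table name, so both tests share one lookup.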


In [None]:
# --- Reset video embeddings, frame images, frame results, and PROCESSED flags (does NOT touch text query embeddings) ---

# 1) Delete all files from the video frame stage
#    This removes the JPEG files previously extracted for frames.
session.sql(
    f"REMOVE @{DB_NAME}.{SCHEMA_NAME}.{VIDEO_FRAME_STAGE_NAME}"
).collect()

# 2) Truncate the VIDEO_FRAME_RESULTS table
#    This removes frame-level results, including FRAME_STAGE_PATH and scores.
session.sql(f"TRUNCATE TABLE {VIDEO_FRAME_RESULTS_TABLE_FQN}").collect()

# 3) Truncate the VIDEO_SEGMENT_EMBEDDINGS table
#    Build the name from DB_NAME, SCHEMA_NAME, and the base table name
#    rather than relying on the VIDEO_SEGMENT_EMBEDDINGS_TABLE_FQN variable.
session.sql(
    f"TRUNCATE TABLE {DB_NAME}.{SCHEMA_NAME}.{VIDEO_SEGMENT_EMBEDDINGS_TABLE}"
).collect()

# 4) Reset the PROCESSED flag on all videos
#    This ensures the app knows they need to be prepared again.
session.sql(
    f"UPDATE {VIDEO_SOURCE_URLS_TABLE_FQN} "
    f"SET PROCESSED = FALSE, PROCESSED_AT = NULL"
).collect()
