# AI Video Search with Snowflake and TwelveLabs
### Multimodal Video Processing App using TwelveLabs, Whisper, and Snowflake Cortex

For prerequisites and setup instructions, check out the [README](https://github.com/Snowflake-Labs/sfguide-ai-video-search-with-snowflake-and-twelveLabs/blob/main/README.md).

In [None]:
# Install the TwelveLabs SDK, then Whisper (straight from its GitHub repository)
# together with the ffmpeg-python and moviepy packages.
!pip install twelvelabs
!pip install git+https://github.com/openai/whisper.git ffmpeg-python moviepy
# !apt install ffmpeg -y
# Install ffmpeg through apt-get; DEBIAN_FRONTEND=noninteractive plus -y keeps
# the install from stopping at a prompt.
!DEBIAN_FRONTEND=noninteractive apt-get install -y ffmpeg

In [None]:
from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask
from snowflake.snowpark.context import get_active_session
from twelvelabs.models.task import Task
import requests
import streamlit as st
import pandas as pd
import snowflake
from snowflake import cortex

# Grab the currently active Snowpark session; the later cells register the UDTF
# and run their queries through this object.
session = get_active_session()

In [None]:
# Videos to embed. Each entry becomes one row of the DataFrame that is fed to
# the create_video_embeddings table function.
video_urls = [
    "https://sfquickstarts.s3.us-west-1.amazonaws.com/misc/videos/snowflake_build2024_announcements.mp4",
]

In [None]:
from snowflake.snowpark.functions import udtf, lit, Tuple
from snowflake.snowpark.types import FloatType, StringType, StructType, StructField, Iterable, VectorType

# Reset the session's import list, then register the staged twelvelabs.zip
# archive. NOTE(review): presumably this is what makes `from twelvelabs import ...`
# resolvable inside the UDTF defined right after — confirm the archive layout.
session.clear_imports()
session.add_import('@"DASH_DB"."DASH_SCHEMA"."DASH_PKGS"/twelvelabs.zip')
# NOTE(review): with if_not_exists=True an already-registered permanent UDTF is
# presumably left untouched, so edits to this class may not take effect until the
# existing function is dropped — confirm against the Snowpark `udtf` docs.
@udtf(name="create_video_embeddings",
     packages=['httpx','pydantic'],
     external_access_integrations=['twelvelabs_access_integration'],
     secrets={'cred': 'twelve_labs_api'},
     if_not_exists=True,
     is_permanent=True,
     stage_location='@DASH_DB.DASH_SCHEMA.DASH_UDFS',
     output_schema=StructType([
        StructField("embedding", VectorType(float,1024)),
        StructField("start_offset_sec", FloatType()),
        StructField("end_offset_sec", FloatType()),
        StructField("embedding_scope", StringType())
    ])
    )
class create_video_embeddings:
    """Table function that turns one video URL into per-segment embedding rows.

    For each input URL a TwelveLabs embedding task is created, awaited, and its
    segments are emitted as ``(embedding, start_offset_sec, end_offset_sec,
    embedding_scope)`` rows matching the ``output_schema`` declared above.
    """

    def __init__(self):
        """Build the TwelveLabs client from the API key stored in the 'cred' secret."""
        # Imported inside the handler rather than at cell level: these imports
        # run when the UDTF is instantiated, not when the notebook cell is run.
        from twelvelabs import TwelveLabs
        import _snowflake

        api_key = _snowflake.get_generic_secret_string('cred')
        self.twelvelabs_client = TwelveLabs(api_key=api_key)

    def process(self, video_url: str) -> Iterable[Tuple[list, float, float, str]]:
        """Embed the video at ``video_url`` and yield one row per segment.

        Args:
            video_url: URL of the video handed to the TwelveLabs embed task.

        Yields:
            Tuples of (embedding floats, start offset in seconds, end offset in
            seconds, embedding scope), one per segment returned by the task.

        Raises:
            RuntimeError: If the embedding task finishes with status "failed".
        """
        # Create an embeddings task
        task = self.twelvelabs_client.embed.task.create(
            model_name="Marengo-retrieval-2.7",
            video_url=video_url
        )

        # Wait for the task to complete, polling every 60 seconds.
        final_status = task.wait_for_done(sleep_interval=60)

        # Surface a failed task instead of silently producing no rows, which
        # would otherwise look like a video that simply has no segments.
        if final_status == "failed":
            raise RuntimeError(
                f"TwelveLabs embedding task failed for video: {video_url}"
            )

        # Retrieve the finished task, which carries the embeddings.
        task = task.retrieve()
        video_embedding = task.video_embedding
        if video_embedding is None or video_embedding.segments is None:
            return

        for segment in video_embedding.segments:
            yield (
                segment.embeddings_float,  # Embedding (list of floats)
                segment.start_offset_sec,  # Start offset in seconds
                segment.end_offset_sec,    # End offset in seconds
                segment.embedding_scope,   # Embedding scope
            )


In [None]:
# One row per video URL.
url_df = session.create_dataframe(video_urls, schema=['url'])

# Run the embedding table function for each URL, partitioned by URL, and
# persist the joined result, replacing any previous table contents.
embedding_call = create_video_embeddings(url_df['url']).over(partition_by="url")
embeddings_df = url_df.join_table_function(embedding_call)
embeddings_df.write.mode('overwrite').save_as_table('video_embeddings')

# Read the saved table back and display it as the cell output.
df = session.table('video_embeddings')
df

In [None]:
import urllib.request
import os
from moviepy import VideoFileClip
import whisper
import warnings
import logging

# Suppress FutureWarning and UserWarning output. These filters are not scoped
# to Whisper: they hide warnings of both categories from every library.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Set the "moviepy" logger level to ERROR so its INFO logs are suppressed
logging.getLogger("moviepy").setLevel(logging.ERROR)

# Load the Whisper model once, at cell level, so transcribe_with_whisper can
# reuse the same model object on every call.
whisper_model = whisper.load_model("base")  # or any other model you want to use, e.g., 'small', 'medium', 'large'

def download_video_from_s3(video_url, output_video_path, status):
    """Fetch the file at ``video_url`` and store it at ``output_video_path``.

    Args:
        video_url: URL to retrieve.
        output_video_path: Local path the downloaded bytes are written to.
        status: Object with a ``caption(str)`` method, used to report a failure.

    Returns:
        ``output_video_path`` on success. On any exception the error is
        reported through ``status.caption`` and ``None`` is returned.
    """
    try:
        urllib.request.urlretrieve(video_url, output_video_path)
    except Exception as download_error:
        status.caption(f"An error occurred during video download: {download_error}")
        return None
    else:
        return output_video_path

def extract_audio_from_video(video_path, output_audio_path, status, start_time=None, end_time=None):
    """Write the audio track of a video (or of a sub-clip of it) to a file.

    Args:
        video_path: Path of the video file to open with MoviePy.
        output_audio_path: Path the extracted audio is written to.
        status: Object with a ``caption(str)`` method, used to report a failure.
        start_time: Optional start of the sub-clip. When only ``end_time`` is
            given, the clip starts at the beginning of the video.
        end_time: Optional end of the sub-clip; ``None`` is passed through to
            ``subclipped`` unchanged.

    Returns:
        ``output_audio_path`` on success. On any failure (including a video
        without an audio track) the problem is reported through
        ``status.caption`` and ``None`` is returned.
    """
    video_clip = None
    try:
        video_clip = VideoFileClip(video_path)
        audio_source = video_clip

        # If start and/or end times are provided, trim the video first.
        if start_time is not None or end_time is not None:
            # subclipped() needs a concrete start, so an omitted start_time
            # means "from the beginning" instead of passing None through.
            clip_start = 0 if start_time is None else start_time
            audio_source = video_clip.subclipped(clip_start, end_time)

        # A video without an audio track exposes audio=None; report it clearly
        # rather than via an opaque attribute error.
        if audio_source.audio is None:
            status.caption("An error occurred during audio extraction: the video has no audio track")
            return None

        audio_source.audio.write_audiofile(output_audio_path)
        return output_audio_path
    except Exception as e:
        status.caption(f"An error occurred during audio extraction: {e}")
        return None
    finally:
        # Always release the clip's underlying reader and file handle, on both
        # the success and the failure path. Closing the original clip is enough:
        # the sub-clip is derived from it.
        if video_clip is not None:
            video_clip.close()

def transcribe_with_whisper(audio_path, status):
    """Transcribe an audio file with the shared Whisper model.

    Args:
        audio_path: Path of the audio file handed to ``whisper_model.transcribe``.
        status: Object with a ``caption(str)`` method, used to report a failure.

    Returns:
        The ``"text"`` entry of the transcription result. On any exception the
        error is reported through ``status.caption`` and ``None`` is returned.
    """
    try:
        transcript = whisper_model.transcribe(audio_path)["text"]
    except Exception as transcription_error:
        status.caption(f"An error occurred during transcription: {transcription_error}")
        return None
    return transcript

def transcribe_video(video_url, status, temp_video_path="temp_video.mp4", temp_audio_path="temp_audio.mp3"):
    """Download a video, extract its full audio track, and transcribe it.

    Args:
        video_url: URL of the video to download.
        status: Object with a ``caption(str)`` method; passed to each step so
            failures can be reported.
        temp_video_path: Scratch path for the downloaded video.
        temp_audio_path: Scratch path for the extracted audio.

    Returns:
        The transcription returned by ``transcribe_with_whisper``, or ``None``
        if the download or the audio extraction did not produce a file path.
        Both scratch files are removed before returning, whatever the outcome.
    """
    try:
        # Each step returns a falsy value on failure, which ends the pipeline.
        local_video = download_video_from_s3(video_url, temp_video_path, status)
        if local_video:
            local_audio = extract_audio_from_video(local_video, temp_audio_path, status)
            if local_audio:
                return transcribe_with_whisper(local_audio, status)
        return None
    finally:
        # Remove the scratch files: video first, then audio.
        for leftover in (temp_video_path, temp_audio_path):
            if os.path.exists(leftover):
                os.remove(leftover)

def transcribe_video_clip(video_url, status, start_time, end_time, temp_video_path="temp_video.mp4", temp_audio_path="temp_audio_clip.mp3"):
    """Download a video and transcribe only the audio between two offsets.

    Args:
        video_url: URL of the video to download.
        status: Object with a ``caption(str)`` method; passed to each step so
            failures can be reported.
        start_time: Start of the clip, forwarded to ``extract_audio_from_video``.
        end_time: End of the clip, forwarded to ``extract_audio_from_video``.
        temp_video_path: Scratch path for the downloaded video.
        temp_audio_path: Scratch path for the extracted clip audio.

    Returns:
        The transcription returned by ``transcribe_with_whisper``, or ``None``
        if the download or the audio extraction did not produce a file path.
        Both scratch files are removed before returning, whatever the outcome.
    """
    scratch_files = (temp_video_path, temp_audio_path)
    try:
        # Download the whole video to the scratch location.
        downloaded = download_video_from_s3(video_url, temp_video_path, status)
        if not downloaded:
            return None

        # Cut the requested clip and write its audio out.
        clip_audio = extract_audio_from_video(
            downloaded, temp_audio_path, status, start_time, end_time
        )
        if not clip_audio:
            return None

        # Transcribe just that clip.
        return transcribe_with_whisper(clip_audio, status)
    finally:
        # Remove the scratch files: video first, then audio.
        for scratch in scratch_files:
            if os.path.exists(scratch):
                os.remove(scratch)


In [None]:
# TODO: Replace tlk_XXXXXXXXXXXXXXXXXX with your Twelve Labs API Key
# NOTE(review): the key lives in plain text in this cell — avoid sharing or
# committing the notebook once a real key has been filled in.
TWELVE_LABS_API_KEY ="tlk_XXXXXXXXXXXXXXXXXX"
# Initialize the Twelve Labs client; similarity_scores uses it to embed the search text
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)

def truncate_text(text, max_tokens=77):
    """Return the leading part of ``text`` that fits a rough token budget.

    No tokenizer is involved: the budget is converted to characters using a
    fixed estimate of 6 characters per token, so the result is at most
    ``max_tokens * 6`` characters long.

    Args:
        text: String to shorten.
        max_tokens: Approximate token budget (default 77).

    Returns:
        ``text`` cut to the character budget; unchanged if already shorter.
    """
    chars_per_token = 6  # coarse average; adjust based on actual tokenization behavior
    char_budget = max_tokens * chars_per_token
    return text[:char_budget]

def similarity_scores(search_text,results_limit=5):
    """Rank stored video segments by cosine similarity to a text query.

    The query text is shortened with ``truncate_text``, embedded through the
    Twelve Labs Embed API, and compared against the ``video_embeddings`` table.

    Args:
        search_text: Free-text search query.
        results_limit: Maximum number of rows to return; must be convertible
            to ``int``.

    Returns:
        A Snowpark DataFrame with VIDEO_URL, START_OFFSET_SEC, END_OFFSET_SEC and
        SIMILARITY_SCORE (rounded to 2 decimals), best match first. If the API
        response carries no usable text embedding, the raw Twelve Labs response
        is returned instead so the caller can inspect it.

    Raises:
        TypeError, ValueError: If ``results_limit`` cannot be converted to ``int``.
    """
    # The limit is spliced into the SQL text below, so force it to an integer
    # first. This also fails fast, before the embedding API is called.
    row_limit = int(results_limit)

    # Twelve Labs Embed API supports text-to-embedding
    truncated_text = truncate_text(search_text, max_tokens=77)

    twelvelabs_response = twelvelabs_client.embed.create(
      model_name="Marengo-retrieval-2.7",
      text=truncated_text,
      text_truncate='start'
    )

    text_embedding = twelvelabs_response.text_embedding
    # A falsy `segments` covers both None and an empty list; the latter would
    # otherwise raise IndexError on segments[0].
    if text_embedding is None or not text_embedding.segments:
        return twelvelabs_response

    text_query_embeddings = text_embedding.segments[0].embeddings_float
    # Without float values there is nothing to compare against; hand back the
    # response rather than sending invalid SQL.
    if not text_query_embeddings:
        return twelvelabs_response

    # The Python list repr of the floats doubles as the SQL array literal.
    return session.sql(f"""
            SELECT URL as VIDEO_URL,START_OFFSET_SEC,END_OFFSET_SEC,
            round(VECTOR_COSINE_SIMILARITY(embedding::VECTOR(FLOAT, 1024),{text_query_embeddings}::VECTOR(FLOAT, 1024)),2) as SIMILARITY_SCORE 
            from video_embeddings order by similarity_score desc limit {row_limit}""")

In [None]:
# Streamlit UI: take a search text, find the best-matching stored video
# segments, transcribe each matching clip, and summarize it with a Cortex LLM.
st.subheader("Search Clips Application")

# Input row: search text, number of results, and the LLM used for the summary.
with st.container():
    with st.expander("Enter search text and select max results", expanded=True):
        left_col,mid_col,right_col = st.columns(3)
        with left_col:
            entered_text = st.text_input('Search Text')
        with mid_col:
            max_results = st.selectbox('Max Results',(1,2,3,4,5))
        with right_col:
            selected_llm = st.selectbox('Select Summary LLM',('llama3.2-3b','llama3.1-405b','mistral-large2', 'snowflake-arctic',))
        
# Action row: the button sits in the middle of three columns; the outer two
# columns are unused spacers.
with st.container():
    _,mid_col1,_ = st.columns([.3,.4,.2])
    with mid_col1:
        similarity_scores_btn = st.button('Search and Summarize Matching Video Clips',type="primary")

# Results: run only after a button click, and only when search text was entered.
with st.container():
    if similarity_scores_btn:
        if entered_text:
            with st.status("In progress...") as status:
                # NOTE(review): similarity_scores can return the raw Twelve Labs
                # response instead of a DataFrame; `.to_pandas()` would then fail.
                df = similarity_scores(entered_text,max_results).to_pandas()
                status.subheader(f"Top {max_results} clip(s) for search query '{entered_text}'")
                for row in df.itertuples():
                    # Each call downloads the full video again before cutting the
                    # clip. The result is None when any step fails, in which case
                    # the text "None" ends up in the prompt and caption below.
                    transcribed_clip = transcribe_video_clip(row.VIDEO_URL, status, row.START_OFFSET_SEC, row.END_OFFSET_SEC)
                    # Prompt asking the LLM for a natural-language summary of the
                    # clip metadata and transcript.
                    prompt = f"""
                    [INST] Summarize the following and include name of the video as well as start and end clip times in seconds with everything in natural language as opposed to attributes: 
                    ###
                    Video URL: {row.VIDEO_URL},
                    Clip Start: {row.START_OFFSET_SEC} | Clip End: {row.END_OFFSET_SEC} | Similarity Score: {row.SIMILARITY_SCORE}
                    Clip Transcript: {transcribed_clip}
                    ###
                    [/INST]
                    """
                    # Show the raw match details, then the LLM-generated summary.
                    status.write(f"Video URL: {row.VIDEO_URL}")
                    status.caption(f"-- Clip Start: {row.START_OFFSET_SEC} | Clip End: {row.END_OFFSET_SEC} | Similarity Score: {row.SIMILARITY_SCORE}")
                    status.caption(f"-- Clip Transcript: {transcribed_clip}")
                    status.write(f"Summary: {cortex.Complete(selected_llm,prompt)}")
                    status.divider()
                # Mark the status box as finished and keep the results expanded.
                status.update(label="Done!", state="complete", expanded=True)
        else:
            st.caption("User ERROR: Please enter search text!")