### Set up dependencies, environment variables, and the Spark context

In [51]:
import os
from dotenv import load_dotenv
from os import listdir
from os.path import join
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask

# Load variables from a local .env file (if one exists) into os.environ.
# load_dotenv is imported above but was previously never called, so a key
# kept only in .env was never seen by os.getenv. Existing environment
# variables are not overridden (python-dotenv's default).
load_dotenv()

# API key for the TwelveLabs client; None if it is set neither in the
# environment nor in .env.
TWELVE_LABS_API_KEY = os.getenv('TWELVE_LABS_API_KEY')

# Input video directory and parquet output root. Each can be overridden with an
# environment variable of the same name; the defaults are the original paths.
VIDEO_DIR = os.getenv('VIDEO_DIR', '/home/mikechu/code/videos')
PARQUET_DIR = os.getenv('PARQUET_DIR', '/home/mikechu/code/parquet')

spark = (
    SparkSession.builder
    .appName('semantic-video-search')
    .getOrCreate()
)

### Spark UDF that converts videos (URLs or file paths) to embeddings

In [52]:
def generate_embedding(path, file_or_url='file'):
    """Create TwelveLabs embeddings for a single video and return them as plain dicts.

    A new TwelveLabs client is constructed on every call.

    Args:
        path: The video to embed. It is sent as ``video_file`` when
            ``file_or_url == 'file'``, otherwise as ``video_url``.
        file_or_url: ``'file'`` (default) selects ``video_file``; any other value
            selects ``video_url``.

    Returns:
        A list with one dict per entry of the retrieved task's ``video_embeddings``,
        each with keys ``'embedding'``, ``'start_offset_sec'``, ``'end_offset_sec'``
        and ``'embedding_scope'``.
    """
    client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)
    source_key = 'video_file' if file_or_url == 'file' else 'video_url'

    # NOTE(review): video_clip_length is presumably in seconds -- confirm against the API docs.
    task = client.embed.task.create(
        engine_name="Marengo-retrieval-2.6",
        video_clip_length=10,
        **{source_key: path},
    )
    task.wait_for_done()
    finished = client.embed.task.retrieve(task.id)

    records = []
    for item in finished.video_embeddings:
        records.append({
            'embedding': item.values,
            'start_offset_sec': item.start_offset_sec,
            'end_offset_sec': item.end_offset_sec,
            'embedding_scope': item.embedding_scope,
        })
    return records
    
# Spark type of one element of the list returned by generate_embedding; the field
# names match that function's dict keys. FloatType is Spark's 32-bit float.
_embedding_segment_type = StructType([
    StructField("embedding", ArrayType(FloatType(), True)),
    StructField("start_offset_sec", FloatType(), True),
    StructField("end_offset_sec", FloatType(), True),
    StructField("embedding_scope", StringType(), True),
])

# Column-level wrapper: applied to a column of paths, it calls generate_embedding
# (with the default file_or_url='file') and yields an array of the structs above.
generate_embedding_udf = udf(generate_embedding, ArrayType(_embedding_segment_type))

### Update embeddings
1. read the already-processed embeddings from Parquet
2. list the video files in the video directory
3. check whether any new (unprocessed) videos are present
4. process the new videos by creating embeddings for each one
5. append the new embeddings to the Parquet repository

In [53]:
EMBEDDING_PATH = f"{PARQUET_DIR}/embedding"

# 1. Embeddings already processed.
# NOTE(review): assumes the parquet repo already exists; spark.read.parquet fails on a first run.
df1 = spark.read.parquet(EMBEDDING_PATH)

# 2. Regular files in VIDEO_DIR; subdirectories and other non-files are skipped so they
#    are never sent to the embedding API. The explicit DDL schema keeps createDataFrame
#    working when the directory is empty (schema inference fails on an empty list).
video_files = [
    join(VIDEO_DIR, name)
    for name in sorted(listdir(VIDEO_DIR))
    if os.path.isfile(join(VIDEO_DIR, name))
]

# 3. Videos that have no row in the repo yet.
df2 = (
    spark.createDataFrame([(p,) for p in video_files], 'video_path string')
    .join(df1, on='video_path', how='left_anti')
)
new_video_count = df2.count()
print(f"new video count: {new_video_count}")

# 4-5. Embed the new videos and append them to the repo. The UDF calls the TwelveLabs
#      API and df2 is not cached, so the write is the only action run on that lineage.
#      The full table is then re-read from parquet, so the show/count below neither
#      re-invoke the API nor report only the newly added rows.
if new_video_count > 0:
    df2 = df2.withColumn("embedding", generate_embedding_udf(df2.video_path))
    df2.write.mode('append').parquet(EMBEDDING_PATH)
    df1 = spark.read.parquet(EMBEDDING_PATH)

df1.printSchema()
df1.show(10, truncate=False)
print(f"total row count = {df1.count()}")

new video count: 0
root
 |-- video_path: string (nullable = true)
 |-- embedding: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- embedding: array (nullable = true)
 |    |    |    |-- element: float (containsNull = true)
 |    |    |-- start_offset_sec: float (nullable = true)
 |    |    |-- end_offset_sec: float (nullable = true)
 |    |    |-- embedding_scope: string (nullable = true)

+-----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------