# Movie Script Processing

In [0]:
%pip install --quiet mlflow[databricks] langchain-text-splitters lxml transformers langchain databricks-vectorsearch bs4 markdownify torch

# Restart the Python process so the packages installed above are importable in later cells.
# NOTE(review): package versions are unpinned; pin them for reproducible runs.
dbutils.library.restartPython()

In [0]:
CATALOG = "media_advertising" 
SCHEMA = "contextual_advertising" 
VOLUME_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/scripts"

In [0]:
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA {SCHEMA}")

In [0]:
%run ./resources/00-init

# Exploration

In [0]:
scripts_df = spark.read.format("delta").load(f"{VOLUME_PATH}")

In [0]:
# Preview the first 10 rows of the raw scripts table.
display(scripts_df.limit(10))

## Scripts Analysis

In [0]:
# Total number of raw script rows; the bare name on the last line renders it as the cell output.
row_count = scripts_df.count()
row_count

In [0]:
import random
from pprint import pprint
# Pick a movie id that is known to exist instead of assuming ids form a dense
# 0..1222 range: a missing id left the filter empty and `collect()[0]` raised IndexError.
movie_ids = [row.unique_movie_id for row in scripts_df.select("unique_movie_id").collect()]
num = random.choice(movie_ids)

# Print the full raw script for that movie to eyeball its formatting.
script = scripts_df.filter(scripts_df.unique_movie_id == num).select("script").first()[0]
print(script)

# Preprocessing Pipeline
This pipeline includes custom logic to parse the movie scripts, apply basic data cleansing, and chunk the text. The goal is to prepare the raw movie scripts in a format on which we can build a Vector Search index backed by a Delta table. For more best practices on parsing, enrichment, and chunking strategies, see [Build an unstructured data pipeline](https://docs.databricks.com/aws/en/generative-ai/tutorials/ai-cookbook/quality-data-pipeline-rag).

## Parsing & Splitting

In [0]:
import re
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, mean, stddev, min, max, median

scene_header_pattern = r"\b(INT|EXT|INTERIOR|EXTERIOR)(\.|\s*\/\s*\.?\s*(EXT|INT|INTERIOR|EXTERIOR)\.?|\.)?(\s|$)"
scene_header_regex = re.compile(scene_header_pattern, re.IGNORECASE | re.MULTILINE)

@udf("struct<scene_count: int, scenes: array<struct<header: string, scene_number: int, text: string>>>")
def extract_scenes(script_text):
    scenes = []
    matches = list(scene_header_regex.finditer(script_text))
    
    # Extract text between headers
    for i, match in enumerate(matches):
        print(match)
        header = match.group().strip()
        start = match.end()  # End of the header line
        end = matches[i+1].start() if i < len(matches)-1 else len(script_text)
        scene_text = script_text[start:end].strip() #.replace("\r\n", "").replace("\t", "")
        scene_text_cleaned = re.sub(r"\s+", " ", scene_text).strip()
        # scene_text = script_text[start:end].replace("\r\n", "").replace("\t", "").strip()
        scenes.append((header, scene_text_cleaned))
    
    return {"scene_count": len(scenes), "scenes": [{"header": header, "scene_number": i+1, "text": scene_text} for i, (header, scene_text) in enumerate(scenes)]}


In [0]:
# Keep only scripts with more than this many detected scenes.
# NOTE(review): the rationale for 35 is undocumented — TODO confirm the intended threshold.
MIN_SCENE_COUNT = 35

# Parse each script once into a struct column and read both fields from it,
# instead of invoking the Python UDF separately for each field.
scripts_with_scenes_df = (
    scripts_df
    .withColumn("parsed_scenes", extract_scenes(col("script")))
    .withColumn("scene_count", col("parsed_scenes.scene_count"))
    .withColumn("scenes", col("parsed_scenes.scenes"))
    .drop("parsed_scenes")
)

print(scripts_with_scenes_df.count())
scripts_filtered = scripts_with_scenes_df.filter(col("scene_count") > MIN_SCENE_COUNT)
print(scripts_filtered.count())

In [0]:
# Inspect the scripts that passed the scene-count filter, including their parsed scenes.
display(scripts_filtered)

In [0]:
# Import every function this cell uses; `concat` and `lit` are not imported by any
# other cell shown here, so the cell must not depend on names leaked from elsewhere.
from pyspark.sql.functions import col, concat, explode, lit

# One row per scene, keyed by "<unique_movie_id>_<scene_number>".
exploded_scenes_df = (
    scripts_filtered
    .withColumn("scene", explode(col("scenes")))
    .select(
        col("unique_movie_id"),
        col("scene.scene_number").alias("scene_number"),
        col("scene.text").alias("scene_text"),
    )
    .withColumn("unique_movie_scene_id", concat(col("unique_movie_id"), lit("_"), col("scene_number")))
)

display(exploded_scenes_df)

## Chunking

In [0]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer, OpenAIGPTTokenizer

In [0]:
# Chunk-size limits, measured in tokens of the GPT tokenizer below.
min_chunk_size = 50
max_chunk_size = 500
chunk_overlap_tokens = 50

tokenizer = OpenAIGPTTokenizer.from_pretrained("openai-gpt")
# Recursive splitter whose length function is this tokenizer, so chunk_size and
# chunk_overlap are token counts rather than character counts.
text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
    tokenizer,
    chunk_size=max_chunk_size,
    chunk_overlap=chunk_overlap_tokens,
)

@udf("array<string>")
def chunk_scenes(scene, min_chunk_size=50, max_chunk_size=500):
    scene_chunks = text_splitter.split_text(scene)
    chunks = []
    previous_chunk = ""
    # filter out very short scenes that would not provide much value if retrieved
    for c in scene_chunks:
      if len(tokenizer.encode(previous_chunk + c)) <= max_chunk_size/2:
          previous_chunk += c + "\n"
      else:
          chunks.extend(text_splitter.split_text(previous_chunk.strip()))
          previous_chunk = c + "\n"
    if previous_chunk:
        chunks.extend(text_splitter.split_text(previous_chunk.strip()))
    return [c for c in chunks if len(tokenizer.encode(c)) > min_chunk_size]

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

# Uses the module-level `text_splitter`, `tokenizer`, `min_chunk_size` and
# `max_chunk_size` defined earlier; PySpark serializes referenced globals with the UDF.


def _chunk_scene(scene, min_chunk_size, max_chunk_size):
    """Split one scene's text into token-bounded chunks; non-string input yields ``[]``.

    Stripped splitter pieces are merged (newline-joined) while the merged text stays
    within ``max_chunk_size / 2`` tokens. When the next piece would exceed that, the
    merged group is flushed through ``text_splitter`` and a new group starts with the
    piece. Chunks of ``min_chunk_size`` tokens or fewer are dropped.
    """
    if not isinstance(scene, str):  # null / NaN cells from Spark
        return []
    chunks = []
    pending = ""
    merge_limit = max_chunk_size / 2
    for piece in text_splitter.split_text(scene):
        piece = piece.strip()
        if not piece and not pending:
            continue
        combined = (pending + "\n" + piece).strip() if pending else piece
        if len(tokenizer.encode(combined)) <= merge_limit:
            pending = combined
        else:
            chunks.extend(text_splitter.split_text(pending.strip()))
            pending = piece
    if pending:
        chunks.extend(text_splitter.split_text(pending.strip()))
    return [chunk for chunk in chunks if len(tokenizer.encode(chunk)) > min_chunk_size]


@pandas_udf(ArrayType(StringType()))
def chunk_scenes_pandas_udf(scenes: pd.Series) -> pd.Series:
    """Vectorized (Series -> Series) Spark UDF that chunks scene texts.

    Args:
        scenes: one scene text per element (null allowed).

    Returns:
        A Series of lists of chunk strings, aligned with ``scenes``; null scenes map
        to an empty list. Chunk limits come from the module-level ``min_chunk_size``
        and ``max_chunk_size`` so they stay consistent with the token-count filter.
    """
    # The Series type hints select a scalar pandas UDF; PandasUDFType is deprecated.
    return scenes.map(lambda scene: _chunk_scene(scene, min_chunk_size, max_chunk_size))

@udf("int")
def count_tokens(text):
    return len(tokenizer.encode(text))


In [0]:
# Attach per-scene token counts and drop scenes too short to yield a usable chunk.
exploded_scenes_df_with_tokens = (
    exploded_scenes_df
    .withColumn("token_count", count_tokens(col("scene_text")))
    .filter(col("token_count") > min_chunk_size)
)

In [0]:
# Movie titles keyed by unique_movie_id, used to label the scenes.
movie_metadata = spark.read.table("movie_metadata").select("unique_movie_id", "title")
display(movie_metadata)

In [0]:
# Add the movie title to every scene (left join keeps scenes without metadata).
joined_df = (
    exploded_scenes_df_with_tokens
    .join(movie_metadata, on="unique_movie_id", how="left")
    .select("unique_movie_scene_id", "unique_movie_id", "title", "scene_number", "scene_text")
)

In [0]:
# Spot-check a few joined scene rows (scene text plus movie title).
display(joined_df.limit(5))

In [0]:
# Number of scenes that will be chunked; the bare last expression is rendered as the
# cell output (display() is meant for DataFrames, not plain ints).
joined_df.count()

In [0]:
# Fully-qualified name of the chunked content table, exposed as a widget so the
# following SQL cell can reference it.
content_table_name = ".".join([CATALOG, SCHEMA, "movie_scripts_content"])
dbutils.widgets.text("CONTENT_TABLE_NAME", content_table_name)

In [0]:
%sql
-- Create the chunked scene content table (name supplied by the CONTENT_TABLE_NAME widget)
-- if it does not already exist. Optimized writes and auto-compaction are enabled to
-- keep file sizes reasonable.
CREATE TABLE IF NOT EXISTS IDENTIFIER('${CONTENT_TABLE_NAME}') (
  unique_movie_scene_id STRING,
  unique_movie_id LONG,
  title STRING,
  scene_number INT,
  scene_text STRING
)
USING DELTA
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);

In [0]:
from pyspark.sql.functions import col, concat, lit, posexplode

# A scene can produce several chunks. posexplode exposes each chunk's position so the
# id becomes "<movie>_<scene>_<chunk>" and stays unique per row; plain explode repeated
# the scene id on every chunk, which breaks the table's primary key and the Vector
# Search index's primary_key.
movie_scripts_content = (
    joined_df
    .select("*", posexplode(chunk_scenes_pandas_udf(col("scene_text"))).alias("chunk_index", "chunk_text"))
    .select(
        concat(col("unique_movie_scene_id"), lit("_"), col("chunk_index")).alias("unique_movie_scene_id"),
        col("unique_movie_id"),
        col("title"),
        col("scene_number"),
        col("chunk_text").alias("scene_text"),
    )
)
movie_scripts_content.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(content_table_name)

In [0]:
# Read back the written table's identifying columns (chunk text excluded).
content_id_columns = ["unique_movie_scene_id", "unique_movie_id", "title", "scene_number"]
movie_scripts_content = spark.read.table("movie_scripts_content").select(*content_id_columns)

In [0]:
%sql
-- Enable Change Data Feed on the source table; a Delta Sync vector search index
-- reads table changes through it.
ALTER TABLE movie_scripts_content
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

In [0]:
%sql
-- Primary-key columns must be non-nullable, so tighten the column first.
ALTER TABLE movie_scripts_content
ALTER COLUMN unique_movie_scene_id SET NOT NULL;

-- Declare unique_movie_scene_id as the primary key (the same column is used as the
-- vector search index primary_key).
-- NOTE(review): Databricks primary keys are informational, so uniqueness depends on the
-- code that writes this table. Re-running this cell likely fails once the constraint
-- already exists — consider guarding it; confirm against the ALTER TABLE docs.
ALTER TABLE movie_scripts_content
ADD CONSTRAINT pk_unique_movie_scene_id PRIMARY KEY (unique_movie_scene_id);

## Indexing
In this section, we use the cleaned Delta table to create a Vector Search index, which serves our script embeddings and enables the retrieval component of our RAG agent.

In [0]:
# vector search config
from databricks.vector_search.client import VectorSearchClient
from databricks.sdk import WorkspaceClient
import databricks.sdk.service.catalog as c

# Vector Search endpoint that serves the index; the next cell creates it if missing.
VECTOR_SEARCH_ENDPOINT_NAME = "one-env-shared-endpoint-10"

In [0]:
# Reuse the Vector Search endpoint if it exists; otherwise create a STANDARD one.
vsc = VectorSearchClient()

endpoint_missing = not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME)
if endpoint_missing:
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD")

# Wait (via the init-notebook helper) until the endpoint is ready, new or existing.
wait_for_vs_endpoint_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME)
print(f"Endpoint named {VECTOR_SEARCH_ENDPOINT_NAME} is ready.")

In [0]:
# Source Delta table to index, and the Unity Catalog name of the index itself.
source_table_fullname = f"{CATALOG}.{SCHEMA}.movie_scripts_content"
vs_index_fullname = f"{CATALOG}.{SCHEMA}.movie_scripts_content_vs"

if index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname):
    # Make sure the existing index is ready, then trigger a sync so it picks up the
    # rows just written to the source table.
    # NOTE(review): sync() presumably runs asynchronously, so the final message may
    # print before the sync finishes — confirm.
    wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
    vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname).sync()
else:
    print(f"Creating index {vs_index_fullname} on endpoint {VECTOR_SEARCH_ENDPOINT_NAME}...")
    vsc.create_delta_sync_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
        index_name=vs_index_fullname,
        source_table_name=source_table_fullname,
        pipeline_type="TRIGGERED",
        primary_key="unique_movie_scene_id",
        embedding_source_column="scene_text",  # column whose text is embedded
        embedding_model_endpoint_name="databricks-gte-large-en",  # embedding model endpoint
    )
    # Wait for the index to be ready and the embeddings to be created and indexed.
    wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)

print(f"index {vs_index_fullname} on table {source_table_fullname} is ready")

In [0]:
import mlflow.deployments
deploy_client = mlflow.deployments.get_deploy_client("databricks")

question = "Look for a scene that targets 18-34 males for a footwear brands"

# Retrieve the single closest chunk; only its title and scene number are returned.
scripts_index = vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
results = scripts_index.similarity_search(
    query_text=question,
    columns=["title", "scene_number"],
    num_results=1,
)
docs = results.get("result", {}).get("data_array", [])
docs