
## Fuzzy Matching Documents

This demo walks through identifying documents that have close (fuzzy) matches in an existing data store.

We do this with a pipeline built on AI Query (`ai_query`) and Mosaic AI Vector Search:

- With AI Query, we run an LLM with batch inference to quickly generate summaries.

- With Mosaic AI Vector Search, we retrieve the most similar existing summaries.

- Finally, we verify the candidate fuzzy matches with AI Query.

In [0]:
%pip install --upgrade --force-reinstall databricks-vectorsearch 
dbutils.library.restartPython()


### Data Setup

In [0]:
# Notebook parameters: which serving endpoints to call and where in Unity Catalog
# the demo tables live. Values are read back into plain Python variables below.
lm_endpoint_choices = [
    "databricks-meta-llama-3-1-8b-instruct",
    "databricks-meta-llama-3-3-70b-instruct",
    "databricks-claude-3-7-sonnet",
]
dbutils.widgets.dropdown(
    "lm_endpoint_name",
    "databricks-meta-llama-3-1-8b-instruct",
    lm_endpoint_choices,
    "LM Endpoint Name",
)
dbutils.widgets.text("embedding_endpoint_name", "databricks-gte-large-en", "Embedding Endpoint Name")
dbutils.widgets.text("catalog_name", "batch", "Data UC Catalog")
dbutils.widgets.text("schema_name", "dpechi", "Data UC Schema")
dbutils.widgets.text("table_name", "star_trek_episode_text", "Data UC Table Base")

lm_endpoint_name, embedding_endpoint_name, catalog_name, schema_name, table_name = (
    dbutils.widgets.get(widget)
    for widget in ("lm_endpoint_name", "embedding_endpoint_name", "catalog_name", "schema_name", "table_name")
)

In [0]:
import pandas as pd

# Load the raw script lines; the CSV is decoded as latin-1 rather than UTF-8.
df = pd.read_csv("TNG.csv", encoding="latin1")

In [0]:
# Peek at the first few raw rows.
df.head(5)

In [0]:
# One document per (episode, act, who): all of that group's lines joined with spaces.
# NOTE(review): `who` is presumably the speaking character — confirm against the CSV.
df_agg = (
    df.groupby(["episode", "act", "who"])["text"]
    .agg(" ".join)
    .reset_index()
)
df_agg

In [0]:
# One document per (episode, act): every line in the act joined with spaces.
df_agg2 = (
    df.groupby(["episode", "act"])["text"]
    .agg(" ".join)
    .reset_index()
)
df_agg2

In [0]:
# Lower-case the column names in place on both pandas frames.
for frame in (df_agg, df_agg2):
    frame.columns = [c.lower() for c in frame.columns]


def _overwrite_uc_table(pdf, full_name):
    """Save a copy of `pdf` as the table `full_name`, replacing its data and schema.

    Returns the Spark DataFrame that was written.
    """
    sdf = spark.createDataFrame(pdf.copy())
    sdf.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(full_name)
    return sdf


_overwrite_uc_table(df_agg, ".".join([catalog_name, schema_name, table_name]))
spark_df = _overwrite_uc_table(df_agg2, ".".join([catalog_name, schema_name, f"{table_name}_act"]))

In [0]:
# Sanity check: row counts of the per-(episode, act, who) table and the per-act table.
# (Alias renamed from the leftover `count_text2` so the pair reads consistently.)
per_who_table = ".".join([catalog_name, schema_name, table_name])
per_act_table = ".".join([catalog_name, schema_name, f"{table_name}_act"])
spark.sql(f"""
  SELECT
    (SELECT count(*) FROM {per_who_table}) AS count_text,
    (SELECT count(*) FROM {per_act_table}) AS count_text_act
""").display()

In [0]:
# Browse the per-(episode, act, who) table that was just written.
spark.table(".".join([catalog_name, schema_name, table_name])).display()


### Summarizing Documents with DBSQL + AI Query

In [0]:
import time

# Summarize every act with the LM endpoint via ai_query and persist the result.
#
# spark.sql(...) is lazy: previously the timer only covered the rows `display`
# fetched, and a later cell reusing `result` re-ran ai_query on every row. That
# doubled the LLM cost and stored different (non-deterministic) summaries than the
# ones shown here. Writing to a table runs the batch exactly once, makes the
# timing cover the whole batch, and lets downstream cells read the same summaries.
act_table = ".".join([catalog_name, schema_name, f"{table_name}_act"])
synopsis_table = ".".join([catalog_name, schema_name, f"{table_name}_act_synopsis"])

# Trailing space keeps the instruction separated from the concatenated text.
synopsis_prompt = "Provide a 2-3 sentence synopsis of the characters lines from the episode: "

command = f"""
    SELECT text,
    ai_query(
        '{lm_endpoint_name}', -- endpoint name
        CONCAT('{synopsis_prompt}', text)
    ) AS act_synopsis
    FROM {act_table}
"""

start_time = time.perf_counter()
spark.sql(command).write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(synopsis_table)
execution_time = time.perf_counter() - start_time

# Same columns as before (text, act_synopsis), now backed by a materialized table.
result = spark.table(synopsis_table)
display(result)

print(f"Execution time: {execution_time} seconds")


### Building our Vector Store with Embedded Document Summaries

In [0]:
# Names for the vector-search artifacts built below; the tables and index share a UC prefix.
uc_base = f"{catalog_name}.{schema_name}.{table_name}"
delta_table_name = f"{uc_base}_delta"
vs_index_fullname = f"{uc_base}_index"
vector_search_endpoint_name = "synopsis_endpoint"

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Add a unique (not necessarily consecutive) `id` column, used as the index primary key,
# and enable Change Data Feed on the source table for the Delta Sync index.
result_with_id = result.withColumn("id", monotonically_increasing_id())
(
    result_with_id.write
    .format("delta")
    .mode("overwrite")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(delta_table_name)
)

In [0]:
from databricks.vector_search.client import VectorSearchClient

# Build (or reuse) the vector store. Calling create_endpoint / create_delta_sync_index
# unconditionally raises once those resources exist, so the notebook could not be re-run.
vsc = VectorSearchClient()

existing_endpoints = {ep["name"] for ep in vsc.list_endpoints().get("endpoints", [])}
if vector_search_endpoint_name not in existing_endpoints:
    vsc.create_endpoint(
        name=vector_search_endpoint_name,
        endpoint_type="STANDARD"
    )
# Block until the endpoint is online before creating/syncing an index on it.
vsc.wait_for_endpoint(vector_search_endpoint_name)

endpoint = vsc.get_endpoint(
  name=vector_search_endpoint_name)

existing_indexes = {
    ix["name"]
    for ix in vsc.list_indexes(name=vector_search_endpoint_name).get("vector_indexes", [])
}
if vs_index_fullname in existing_indexes:
    # The source table was just overwritten; a TRIGGERED index only refreshes on sync().
    index = vsc.get_index(
        endpoint_name=vector_search_endpoint_name,
        index_name=vs_index_fullname,
    )
    index.sync()
else:
    # Embeddings are computed by the index from `act_synopsis` using the embedding endpoint.
    index = vsc.create_delta_sync_index(
      endpoint_name=vector_search_endpoint_name,
      source_table_name=delta_table_name,
      index_name=vs_index_fullname,
      pipeline_type='TRIGGERED',
      primary_key="id",
      embedding_source_column="act_synopsis",
      embedding_model_endpoint_name=embedding_endpoint_name
    )
index.describe()


### Embedding our new documents and retrieving similar ones from the vector store

In [0]:
# Embed every synopsis with the embedding endpoint so it can be used as a query vector.
# CREATE OR REPLACE (not IF NOT EXISTS): the synopsis table is overwritten on each run,
# and a table created only "if not exists" would silently keep stale embeddings.
spark.sql(f"""
  CREATE OR REPLACE TABLE {delta_table_name}_emb AS
  SELECT ai_query('{embedding_endpoint_name}', act_synopsis) AS search_input,
         act_synopsis
  FROM {delta_table_name}
""")

In [0]:
# Preview a few embedded rows. spark.sql alone returns an unevaluated DataFrame whose
# repr shows only the schema, so display() is needed to see the data.
spark.sql(f"SELECT * FROM {delta_table_name}_emb LIMIT 5").display()

In [0]:
# For each embedded synopsis, look up its nearest neighbours in the vector index.
# Every query synopsis is also stored in the index, so with num_results => 1 the hit
# was the query itself. Ask for 2 results and drop the exact self-match, keeping the
# closest *other* synopsis (a query whose top-2 contains no self-hit keeps both rows).
emb_table = f"{delta_table_name}_emb"
final_matches_init = spark.sql(f"""
SELECT emb.search_input AS search_input,
       emb.act_synopsis AS search_input_text,
       search.act_synopsis AS search_result
FROM {emb_table} AS emb,
LATERAL (
  SELECT * FROM vector_search(
    index => '{vs_index_fullname}',
    query_vector => emb.search_input,
    num_results => 2
  )
) AS search
WHERE search.act_synopsis <> emb.act_synopsis
""")

final_matches_init.display()


### Validating that these are fuzzy matches with AI Query

In [0]:
# Ask the LM to confirm each candidate pair. Labelled, space-separated sections keep the
# two summaries distinguishable; the previous CONCAT ran the prompt, labels and texts
# together ("...yes or no.Summary 1:<text>Summary 2:<text>").
matching_prompt = "Validate that these 2 summaries are matches. Provide only a yes or no."
max_pairs_to_validate = 100  # cap the number of LLM calls for the demo

# The prompt is inlined into a SQL string literal below; a single quote would break it.
assert "'" not in matching_prompt, "matching_prompt must not contain a single quote"

final_matches = final_matches_init.limit(max_pairs_to_validate).selectExpr(
    "search_input_text",
    "search_result",
    f"ai_query('{lm_endpoint_name}', CONCAT('{matching_prompt}', ' Summary 1: ', search_input_text, ' Summary 2: ', search_result)) as output",
)

final_matches.display()