We could use DSPy to generate embeddings; however, here we take advantage of Databricks' native Vector Search feature, which lets us set up a sync to a Delta table and manages the updates to the embeddings for us.

In [0]:
%pip install databricks-vectorsearch mlflow

# Restart the Python process so the packages installed above are the ones
# imported by the cells that follow.
dbutils.library.restartPython()

In [0]:
from mlflow.models import ModelConfig
from databricks.vector_search.client import VectorSearchClient
from pyspark.sql.utils import AnalysisException

In [0]:
# Relative path to the YAML configuration file that supplies every setting
# read in the next cell.
config_file = "../config.yaml"
# Load that file as the development configuration.
model_config = ModelConfig(development_config=config_file)

In [0]:
# Location of the source table; combined into a three-part
# catalog.schema.table name that is passed as `source_table_name` below.
CATALOG = model_config.get("catalog")
SCHEMA = model_config.get("schema")
TABLE = model_config.get("table")
source_path = f"{CATALOG}.{SCHEMA}.{TABLE}"

# Endpoint that hosts the index, and the index's fully qualified name.
# The index is placed in the same catalog and schema as the source table.
VECTOR_SEARCH_ENDPOINT = model_config.get("vector_search_endpoint")
VECTOR_SEARCH_INDEX = model_config.get("vector_search_index")
index_path = f"{CATALOG}.{SCHEMA}.{VECTOR_SEARCH_INDEX}"

# Name of the embedding model endpoint handed to the index at creation time.
EMBEDDING_ENDPOINT_NAME = model_config.get("embedding_endpoint_name")


In [0]:
# Client used by the cells below to create, fetch and query the index.
client = VectorSearchClient()

In [0]:
# Create the Delta Sync index over `source_path`, embedding the `page_content`
# column with the configured embedding endpoint. The cell is safe to re-run:
# if the index already exists it is fetched and reused instead of failing.
try:
    index = client.create_delta_sync_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT,
        source_table_name=source_path,
        index_name=index_path,
        pipeline_type="TRIGGERED",
        primary_key="unique_chunk_index",
        embedding_source_column="page_content",
        embedding_model_endpoint_name=EMBEDDING_ENDPOINT_NAME,
    )
except Exception as e:
    # The Vector Search client reports REST failures as generic exceptions
    # rather than Spark's AnalysisException, so the error text is inspected to
    # tell the benign "already exists" case apart from real failures.
    error_text = str(e).lower()
    if "already exists" not in error_text and "resource_already_exists" not in error_text:
        # Anything else (bad endpoint, missing table, permissions, ...) must
        # surface here rather than as a confusing failure in a later cell.
        raise
    print(f"Index {index_path} already exists; reusing it.")
    index = client.get_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT,
        index_name=index_path,
    )

In [0]:
import time

# Upper bound on the wait so a stuck build cannot hang the notebook forever.
# Raise it if the initial sync of a large table legitimately takes longer.
INDEX_READY_TIMEOUT_SECONDS = 2 * 60 * 60
INDEX_POLL_INTERVAL_SECONDS = 30

# Fetch a fresh handle so this cell also works when the index was created in
# an earlier session.
index = client.get_index(endpoint_name=VECTOR_SEARCH_ENDPOINT, index_name=index_path)

deadline = time.monotonic() + INDEX_READY_TIMEOUT_SECONDS
while True:
    # Tolerate a missing or empty status payload instead of raising on it.
    status = index.describe().get("status") or {}
    if status.get("ready"):
        break
    # NOTE(review): assumes a failed build is reported through a
    # `detailed_state` value containing "FAILED" - confirm against the
    # Vector Search API. If the field is absent this check never fires.
    detailed_state = str(status.get("detailed_state", ""))
    if "FAILED" in detailed_state.upper():
        raise RuntimeError(f"Index {index_path} failed to become ready: {status}")
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"Index {index_path} was not ready after "
            f"{INDEX_READY_TIMEOUT_SECONDS} seconds; last status: {status}"
        )
    print("Waiting for index to be ready...")
    time.sleep(INDEX_POLL_INTERVAL_SECONDS)

print("Index is ready!")
index.describe()

In [0]:
# Smoke-test the index: fetch the five chunks closest to a sample query,
# returning each hit's key and text.
sample_query = "databricks mlflow"
returned_columns = ["unique_chunk_index", "page_content"]
top_k = 5

results = index.similarity_search(
    query_text=sample_query,
    columns=returned_columns,
    num_results=top_k,
)

In [0]:
# Show the raw similarity-search response as the cell output.
results