In [0]:
# 1. Install SDKs (run once per cluster)
%pip install databricks-sdk[openai]
%pip install databricks-sdk[openai] databricks-vectorsearch

# 2. Imports and client initialization
from pyspark.sql.functions import col
from databricks.sdk import WorkspaceClient
from databricks.vector_search.client import VectorSearchClient

# 3. Clients
# Workspace SDK client, constructed with no explicit arguments.
wclient = WorkspaceClient()
# OpenAI-compatible client obtained from the workspace's serving endpoints;
# embed_texts() uses it to request embeddings.
openai_client = wclient.serving_endpoints.get_open_ai_client()
# Vector Search client used for the index upserts later in the notebook.
vs_client = VectorSearchClient()

# 4. Parameters
# Source table that is scanned for new or updated rows.
DELTA_TABLE = "prod.storyspark.book_inventory"
# Endpoint name passed to the vector search client when upserting.
VECTOR_ENDPOINT = "storyspark_book_inventory_endpoint"
# Target passed alongside the endpoint when upserting vectors.
VECTOR_NAMESPACE = "prod.storyspark.book_inventory_index"
# Model name sent with every embedding request.
MODEL_NAME = "databricks-gte-small-en"
# One-row control table holding the last processed `last_updated_at` value.
WATERMARK_TABLE = "prod.storyspark.book_inventory_watermark"
# Number of records sent per upsert call.
BATCH_SIZE = 100

In [0]:
%sql
-- Watermark control table.
-- Holds the newest `last_updated_at` value the sync has already processed.
-- Created only when it does not exist yet.
CREATE TABLE IF NOT EXISTS prod.storyspark.book_inventory_watermark (
  last_timestamp TIMESTAMP
)
USING DELTA;

-- Seed a baseline row (the Unix epoch) only when the table has no rows,
-- so re-running this cell never adds a second row.
INSERT INTO prod.storyspark.book_inventory_watermark
SELECT to_timestamp('1970-01-01 00:00:00') AS last_timestamp
WHERE NOT EXISTS (
  SELECT 1
  FROM prod.storyspark.book_inventory_watermark
);

In [0]:
# Load Last Processed Timestamp
#
# The watermark table is expected to hold a single row. Aggregating with MAX()
# instead of taking an arbitrary LIMIT 1 row keeps the result deterministic if
# extra rows ever appear, and always yields exactly one result row (NULL when
# the table is empty).
watermark_df = spark.table(WATERMARK_TABLE).selectExpr(
    "max(last_timestamp) AS last_timestamp"
)
last_timestamp = watermark_df.first()["last_timestamp"]

if last_timestamp is None:
    # Comparing `last_updated_at` against a NULL watermark would match no rows,
    # silently skipping the whole sync; fail loudly instead.
    raise ValueError(
        f"No watermark value found in {WATERMARK_TABLE}; "
        "run the watermark seeding cell first."
    )

print(f"Last sync time: {last_timestamp}")


In [0]:
# Read New or Updated Rows
#
# Keep only the source rows whose `last_updated_at` is strictly later than the
# watermark loaded above.
source_df = spark.table(DELTA_TABLE)
new_rows_df = source_df.where(col("last_updated_at") > last_timestamp)

# Counting triggers a Spark job; the result is only used for the log line.
count = new_rows_df.count()
print(f"Found {count} new/updated rows")


In [0]:
# Define Embedding Helper

def embed_texts(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts with a single embeddings request.

    Args:
        texts: Strings to embed. All of them are sent in one
            ``openai_client.embeddings.create`` call using ``MODEL_NAME``.

    Returns:
        The ``embedding`` of each item in the response's ``data`` list, in
        the order the response provides them. An empty ``texts`` returns
        ``[]`` without making a request.
    """
    # Nothing to embed: skip the network round trip entirely.
    if not texts:
        return []

    # Batch call to reduce request overhead
    resp = openai_client.embeddings.create(
        model=MODEL_NAME,
        input=texts,
    )
    return [item.embedding for item in resp.data]


In [0]:
# Build & Upsert Vector Batches
#
# 1. Pull the new/updated rows to the driver.
# 2. Embed their `relevant_text`, BATCH_SIZE rows per embedding request.
# 3. Upsert the resulting records into the vector index, BATCH_SIZE per call.
rows = new_rows_df.collect()
vectors_payload = []

for start in range(0, len(rows), BATCH_SIZE):
    chunk = rows[start : start + BATCH_SIZE]

    # One embedding request per chunk instead of one per row.
    embeddings = embed_texts([row.relevant_text for row in chunk])
    if len(embeddings) != len(chunk):
        # zip() below would silently drop rows on a short response.
        raise RuntimeError(
            f"Embedding count mismatch: sent {len(chunk)} texts, "
            f"received {len(embeddings)} embeddings"
        )

    for row, embedding in zip(chunk, embeddings):
        vectors_payload.append({
            "id": row.book_id,
            "values": embedding,
            "metadata": {
                "owner_id": row.owner_id,
                # Stringified so the value is plain text in the payload.
                "last_read": str(row.last_read),
            },
        })

# Upsert in batches
if vectors_payload:
    # Writes go through an index handle: the client looks the index up by
    # endpoint and index name, and the handle exposes upsert().
    # NOTE(review): upsert() expects records whose keys match the index schema;
    # confirm that "id" / "values" / "metadata" are the index's actual columns.
    vector_index = vs_client.get_index(
        endpoint_name=VECTOR_ENDPOINT,
        index_name=VECTOR_NAMESPACE,
    )
    for start in range(0, len(vectors_payload), BATCH_SIZE):
        batch = vectors_payload[start : start + BATCH_SIZE]
        vector_index.upsert(batch)

In [0]:
# Update Watermark

from pyspark.sql.types import StructType, StructField, TimestampType

# Single-column schema for the one-row watermark DataFrame.
last_timestamp_schema = StructType(
    [StructField("last_timestamp", TimestampType(), nullable=False)]
)

# Advance to the newest `last_updated_at` among the rows just processed; when
# nothing was processed, carry the previous watermark forward unchanged.
new_max_timestamp = (
    max(r["last_updated_at"] for r in rows) if rows else last_timestamp
)

# Replace the watermark table's contents with the single new value.
watermark_row = [(new_max_timestamp,)]
(
    spark.createDataFrame(watermark_row, last_timestamp_schema)
    .write
    .mode("overwrite")
    .saveAsTable(WATERMARK_TABLE)
)

print(f"Watermark updated to: {new_max_timestamp}")