In [None]:
%pip install 'databricks-sdk==0.61.0' 'databricks-sdk[notebook]'

In [None]:
# Notebook/job parameters as (widget name, default value); widgets are created in this order.
for _widget_name, _default_value in (
    ("source_table_catalog", "tanner_wendland_postgres_adtech"),
    ("source_table_database", "public"),
    ("sink_catalog", "tanner_wendland"),
    ("sink_schema", "default"),
    ("vector_search_endpoint", "tanner_vs_endpoint"),
):
    dbutils.widgets.text(_widget_name, _default_value)

In [None]:
# Resolve every widget once; these module-level names are used by the cells below.
(
    source_table_catalog,
    source_table_database,
    sink_catalog,
    sink_schema,
    vector_search_endpoint,
) = (
    dbutils.widgets.get(widget_name)
    for widget_name in (
        "source_table_catalog",
        "source_table_database",
        "sink_catalog",
        "sink_schema",
        "vector_search_endpoint",
    )
)

# Echo the resolved parameters, one per line.
print(
    source_table_catalog,
    source_table_database,
    sink_catalog,
    sink_schema,
    vector_search_endpoint,
    sep="\n",
)


In [16]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Fully qualified names built from the widget values read in the parameter cell.
source_table_name = f"{source_table_catalog}.{source_table_database}.chat_history"
sink_table_name = f"{sink_catalog}.{sink_schema}.chat_history"

print(f"Source table: {source_table_name}")
print(f"Destination table: {sink_table_name}")

# Read source data (lazy: nothing is scanned until an action such as count() or a write).
print("Reading source data...")
source_df = spark.table(source_table_name)

if spark.catalog.tableExists(sink_table_name):
    print(f"Destination table {sink_table_name} exists. Performing incremental load...")

    existing_df = spark.table(sink_table_name)

    # High-water mark: the newest created_at already in the sink (None when the sink is empty).
    max_timestamp = existing_df.agg(F.max("created_at")).collect()[0][0]

    if max_timestamp is not None:
        print(f"Last processed timestamp: {max_timestamp}")
        # Use >= rather than >: a source row committed after the previous run but carrying the
        # same created_at as the high-water mark would otherwise be skipped forever. Boundary
        # rows that are already in the sink are dropped by the anti-join on the primary key `id`.
        boundary = F.lit(max_timestamp)
        loaded_boundary_ids = existing_df.filter(F.col("created_at") >= boundary).select("id")
        incremental_df = (
            source_df
            .filter(F.col("created_at") >= boundary)
            .join(loaded_boundary_ids, on="id", how="left_anti")
        )
    else:
        # The sink exists but holds no rows, so every source row is new.
        incremental_df = source_df

    # Evaluate the count once: each count() re-runs the source query.
    # NOTE(review): the write below re-runs it as well, so rows that land in the source in
    # between are also appended and the logged count can lag behind by those rows.
    new_record_count = incremental_df.count()
    print(f"Found {new_record_count} new records to process")

    if new_record_count > 0:
        # Append into the existing table; per the PySpark docs, saveAsTable in append mode
        # resolves columns by name, so the join's output column order does not matter.
        incremental_df.write.mode("append").saveAsTable(sink_table_name)
        print(f"Successfully appended {new_record_count} records to {sink_table_name}")
    else:
        print("No new records to process")

else:
    # Destination table doesn't exist, create it with all source data
    print(f"Destination table {sink_table_name} doesn't exist. Creating with full load...")

    # Declare the table explicitly (rather than letting saveAsTable infer it) so it gets NOT NULL
    # columns, the `id` primary key and Change Data Feed; the vector search cell uses this table
    # as its Delta Sync source with primary_key="id".
    spark.sql(f"""
        CREATE TABLE {sink_table_name} (
            id INTEGER NOT NULL,
            chat_id STRING NOT NULL,
            user_name STRING NOT NULL,
            message_type STRING NOT NULL,
            message_content STRING NOT NULL,
            created_at TIMESTAMP NOT NULL,
            message_order INTEGER NOT NULL,
            PRIMARY KEY (id)
        )
        USING DELTA 
        TBLPROPERTIES (delta.enableChangeDataFeed = true)
    """)

    # Fill the freshly created table with the full source snapshot.
    # NOTE(review): relies on Delta overwrite keeping the declared schema (no overwriteSchema option).
    source_df.write.mode("overwrite").saveAsTable(sink_table_name)

    # Count what actually landed in the sink; counting the live source separately could
    # include or miss rows that arrived around the write.
    record_count = spark.table(sink_table_name).count()

    print(f"Successfully created {sink_table_name} with {record_count} records and primary key constraint")

Source table: tanner_wendland_postgres_adtech.public.chat_history
Destination table: tanner_wendland.default.chat_history
Reading source data...
Destination table tanner_wendland.default.chat_history exists. Performing incremental load...


HBox(children=(IntProgress(value=0, bar_style='success'), Label(value='')))

Last processed timestamp: 2025-07-30 17:13:00.622155


HBox(children=(IntProgress(value=0, bar_style='success'), Label(value='')))

Found 2 new records to process


HBox(children=(IntProgress(value=0, bar_style='success'), Label(value='')))

HBox(children=(IntProgress(value=0, bar_style='success'), Label(value='')))

HBox(children=(IntProgress(value=0, bar_style='success'), Label(value='')))

Successfully appended 2 records to tanner_wendland.default.chat_history


In [14]:
# Display final table info
final_df = spark.table(sink_table_name)
print(f"\nFinal table {sink_table_name} contains {final_df.count()} total records")
print("\nSample of latest records:")
display(final_df.orderBy("created_at", ascending=False).limit(5))

HBox(children=(IntProgress(value=0, bar_style='success'), Label(value='')))


Final table tanner_wendland.default.chat_history contains 4 total records

Sample of latest records:


HBox(children=(IntProgress(value=0, bar_style='success'), Label(value='')))

Unnamed: 0,id,chat_id,user_name,message_type,message_content,created_at,message_order
0,4,c7cd4361-5145-4ddf-92f3-4704931a1864,tanner.wendland@databricks.com,ASSISTANT,"Yes, I'm working now! How can I help you today?",2025-07-30 22:13:00.622155,4
1,3,c7cd4361-5145-4ddf-92f3-4704931a1864,tanner.wendland@databricks.com,USER,Are you working now?,2025-07-30 22:12:58.658630,3
2,2,c7cd4361-5145-4ddf-92f3-4704931a1864,tanner.wendland@databricks.com,ASSISTANT,Error calling model serving endpoint: INVALID_PARAMETER_VALUE: Missing required Chat parameter: 'messages',2025-07-30 22:06:36.518033,2
3,1,c7cd4361-5145-4ddf-92f3-4704931a1864,tanner.wendland@databricks.com,USER,test chat,2025-07-30 22:06:36.082661,1


## Vector Search

In [17]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.vectorsearch import VectorIndexType, DeltaSyncVectorIndexSpecRequest, PipelineType, EmbeddingSourceColumn, EndpointType

# Initialize the Workspace client
w = WorkspaceClient()
# The index is named after its source table: <catalog>.<schema>.chat_history_index
index_name = f"{sink_table_name}_index"

vs_exists = any(vs.name == vector_search_endpoint for vs in w.vector_search_endpoints.list_endpoints())

if not vs_exists:
    print(f"Creating vector search endpoint {vector_search_endpoint}...")
    # Endpoint provisioning is asynchronous. Block until it is ONLINE; otherwise the
    # list_indexes/create_index calls below run against an endpoint that is still provisioning.
    w.vector_search_endpoints.create_endpoint_and_wait(
        name=vector_search_endpoint,
        endpoint_type=EndpointType.STANDARD,
    )
    print(f"Vector search endpoint {vector_search_endpoint} is ready")
else:
    print(f"Vector search endpoint {vector_search_endpoint} already exists")

index_exists = any(index.name == index_name for index in w.vector_search_indexes.list_indexes(vector_search_endpoint))

if not index_exists:
    # Delta Sync index over the sink table: embeddings are computed from message_content
    # by the named model serving endpoint, keyed on the table's `id` primary key.
    index_spec = DeltaSyncVectorIndexSpecRequest(
        source_table=sink_table_name,
        # NOTE(review): CONTINUOUS vs TRIGGERED sync has cost/latency trade-offs; see the
        # Databricks Vector Search docs before changing it.
        pipeline_type=PipelineType.CONTINUOUS,
        embedding_source_columns=[EmbeddingSourceColumn(name="message_content", embedding_model_endpoint_name="databricks-gte-large-en")]
    )

    index = w.vector_search_indexes.create_index(
        endpoint_name=vector_search_endpoint,
        name=index_name,
        index_type=VectorIndexType.DELTA_SYNC,
        delta_sync_index_spec=index_spec,
        primary_key="id"
    )
    print(f"Created vector search index {index_name}")
else:
    print(f"Vector search index {index_name} already exists")


Vector search endpoint one-env-shared-endpoint-7 already exists
Vector search index tanner_wendland.default.chat_history_index already exists
