Initialize Data

In [0]:
%run "../00-init/load-data"

Imports and Setup

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import DatabaseInstance, SyncedDatabaseTable, SyncedTableSpec, SyncedTableSchedulingPolicy, DatabaseTable, DatabaseCatalog
from databricks.sdk.service.catalog import *
from databricks.sdk.errors import NotFound
import psycopg2
import uuid
import time

# Workspace client built with its default (no-argument) configuration.
w = WorkspaceClient()
# First email address on the current user's profile; it is used later as the
# grantee for catalog privileges and as the Postgres login name.
me = (
    w.current_user.me()
    .emails[0]
    .value
)

Delta Table Setup

In [0]:
def prepare_source_table_schema(table_name):
    """Ensure the source Delta table has the columns this demo relies on.

    Adds any of the following columns that are missing from ``table_name``:

    * ``en_route``  BOOLEAN, backfilled to False
    * ``delivered`` BOOLEAN, backfilled to False
    * ``cookies``   INT, left NULL

    Args:
        table_name: Fully qualified table name as accepted by ``spark.table``.

    Raises:
        Exception: any Spark error is printed and then re-raised unchanged.
    """
    print(f"Ensuring schema columns exist for {table_name}...")
    # (column name, SQL type, backfill literal). A backfill of None means no
    # UPDATE is issued: a freshly added column is already NULL for every row.
    required_columns = [
        ("en_route", "BOOLEAN", "False"),
        ("delivered", "BOOLEAN", "False"),
        ("cookies", "INT", None),
    ]
    try:
        current_schema = spark.table(table_name).columns
        columns_to_add = [
            spec for spec in required_columns if spec[0] not in current_schema
        ]

        for column_name, column_type, backfill in columns_to_add:
            spark.sql(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}")
            if backfill is not None:
                spark.sql(f"UPDATE {table_name} SET {column_name} = {backfill}")

        if columns_to_add:
            print("✓ Schema meets requirements.")
        else:
            print("✓ Schema already meets requirements.")
    except Exception as e:
        print(f"Error modifying source table. Error: {e}")
        raise

source_table_name = "main.dbrx_12daysofdemos.gift_requests"
prepare_source_table_schema(source_table_name)

In [0]:
# Desired state for the demo: a database instance, a catalog that exposes it,
# and a synced table mirroring the Delta source table prepared above.
db_config = DatabaseInstance(
    name="demodb",
    capacity="CU_1"
)

catalog_config = DatabaseCatalog(
    name="demo_pg_catalog",
    database_instance_name=db_config.name,
    database_name="databricks_postgres"
)

table_config = SyncedDatabaseTable(
    name=catalog_config.name + ".public.gift_requests_pg",
    spec=SyncedTableSpec(
        create_database_objects_if_missing=True,
        primary_key_columns=["request_id"],
        source_table_full_name=source_table_name,
        timeseries_key="timestamp",
        scheduling_policy=SyncedTableSchedulingPolicy.SNAPSHOT
    )
)

# Polling limits for a newly created instance; raise instead of spinning forever.
DB_READY_TIMEOUT_S = 20 * 60
DB_POLL_INTERVAL_S = 5

db_instances_list = w.database.list_database_instances()

if any(instance.name == db_config.name for instance in db_instances_list):
    print("✓ Demo database already exists!")
else:
    print("Demo database not found. Creating database instance...")
    w.database.create_database_instance(db_config)
    deadline = time.monotonic() + DB_READY_TIMEOUT_S
    db_state = 'NOT READY'
    while db_state != "AVAILABLE":
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"Database instance {db_config.name!r} not AVAILABLE after "
                f"{DB_READY_TIMEOUT_S}s (last state: {db_state})"
            )
        time.sleep(DB_POLL_INTERVAL_S)
        state = w.database.get_database_instance(db_config.name).state
        # Guard against an unset state so polling continues instead of
        # failing with AttributeError on `.value`.
        db_state = state.value if state is not None else 'NOT READY'
    print("✓ Database created!")

# Only a genuine "not found" triggers creation; auth, permission, or network
# errors propagate instead of being mistaken for a missing resource.
try:
    w.database.get_database_catalog(name=catalog_config.name)
    print("✓ Catalog already exists!")
except NotFound:
    print("Catalog not found. Creating catalog...")
    w.database.create_database_catalog(catalog_config)
    print("✓ Catalog created!")

spark.sql(f"GRANT ALL PRIVILEGES ON CATALOG {catalog_config.name} TO `{me}`;")

try:
    w.database.get_synced_database_table(name=table_config.name)
    print("✓ Synced Table already exists!")
except NotFound:
    print("Synced Table not found. Creating table...")
    w.database.create_synced_database_table(table_config)
    print("✓ Synced Table created!")

In [0]:

# Read the synced copy of the gift requests directly from Postgres.
instance_name = db_config.name
instance = w.database.get_database_instance(name=instance_name)
# Credential generated for this instance; its token is used as the Postgres password.
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

conn = psycopg2.connect(
    host=instance.read_write_dns,
    dbname=catalog_config.database_name,
    user=me,
    password=cred.token,
    sslmode="require"
)

# Strip the UC catalog prefix: "demo_pg_catalog.public.gift_requests_pg"
# becomes "public.gift_requests_pg" (schema.table inside Postgres).
synced_table = table_config.name.split(".", 1)[1]

pg_sql_query = f"""
    SELECT * FROM {synced_table} WHERE delivered IS False LIMIT 5;
"""

try:
    with conn.cursor() as cur:
        cur.execute(pg_sql_query)
        # Fetch and show the rows; previously the query ran but its result was discarded.
        for row in cur.fetchall():
            print(row)
except Exception as e:
    print(f"Error: {e}")
finally:
    conn.close()

In [0]:

# Sample a few rows from the synced table on the instance provisioned above
# (previously hard-coded to an unrelated 'tempdb' instance).
instance_name = db_config.name
instance = w.database.get_database_instance(name=instance_name)
# Credential generated for this instance; its token is used as the Postgres password.
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

conn = psycopg2.connect(
    host=instance.read_write_dns,
    dbname=catalog_config.database_name,
    user=me,
    password=cred.token,
    sslmode="require"
)

# "demo_pg_catalog.public.gift_requests_pg" -> "public.gift_requests_pg"
synced_table = table_config.name.split(".", 1)[1]

pg_sql_query = f"""
    SELECT * FROM {synced_table} LIMIT 5;
"""

try:
    with conn.cursor() as cur:
        cur.execute(pg_sql_query)
        rows = cur.fetchall()
        for row in rows:
            print(row)
except Exception as e:
    print(f"Error: {e}")
finally:
    # Release the connection even if the query fails.
    conn.close()