# Lakebase — Online Serving Layer

Publishes Gold-layer data to [Lakebase](https://docs.databricks.com/en/database.html) (PostgreSQL-compatible)
for low-latency app and API access, plus Feature Store and Vector Search registration.

```
Gold Delta Tables  ──►  Lakebase Online Tables  ──►  Databricks App / AI Agent
                   ──►  Feature Serving Endpoint  ──►  ML Model Serving
                   ──►  Vector Search Index       ──►  RAG / Product Search
```

**Prereqs**: Run notebooks 00–03 first. Unity Catalog and a SQL Warehouse required.

## 1 — Configuration

In [None]:
CATALOG = spark.catalog.currentCatalog()
GOLD    = f"{CATALOG}.retail_gold"
SILVER  = f"{CATALOG}.retail_silver"
SERVING = f"{CATALOG}.retail_serving"  # schema for online/serving artifacts

print(f"Catalog  : {CATALOG}")
print(f"Gold     : {GOLD}")
print(f"Silver   : {SILVER}")
print(f"Serving  : {SERVING}")

In [None]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SERVING}")
spark.sql(f"COMMENT ON SCHEMA {SERVING} IS 'Serving layer — online tables, feature store, vector search for retail analytics'")
print(f"Schema ready: {SERVING}")

---
## 2 — Prepare Source Tables for Online Serving

Online tables require a **primary key**, so each source below is written to the serving schema as a Delta table with an explicit PK constraint.
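Primary key constraints in Unity Catalog are informational rather than enforced, so it can be worth confirming that a key column is actually unique before declaring it. The sketch below only builds the check query as a string; `duplicate_key_sql` is a hypothetical helper, not part of this notebook, and the table name in the example is made up.

```python
def duplicate_key_sql(table_fqn, key_cols):
    """Build a query that returns key values appearing more than once in a table."""
    if not key_cols:
        raise ValueError("key_cols must contain at least one column")
    keys = ", ".join(key_cols)
    return (
        f"SELECT {keys}, COUNT(*) AS n "
        f"FROM {table_fqn} "
        f"GROUP BY {keys} "
        f"HAVING COUNT(*) > 1"
    )


# Example with a hypothetical fully qualified table name
print(duplicate_key_sql("main.retail_serving.customer_rfm_online", ["customer_key"]))
```

In a notebook cell, passing the resulting string to `spark.sql(...)` and checking that it returns no rows would be one way to validate a key before running `ADD CONSTRAINT`.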

### 2a — Customer RFM Lookup (key: customer_key)

In [None]:
from pyspark.sql import functions as F

# Customer RFM — used by apps and agents for real-time customer lookup
df_rfm = spark.table(f"{GOLD}.gold_customer_rfm").select(
    "customer_key",
    "market_segment",
    "customer_nation",
    "customer_region",
    "recency_days",
    "frequency",
    F.col("monetary").alias("lifetime_value"),
    "avg_order_value",
    "rfm_score",
    "rfm_segment",
    "customer_lifetime_days",
)

tbl_rfm = f"{SERVING}.customer_rfm_online"
df_rfm.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(tbl_rfm)

# Set primary key (required for feature store / online serving)
spark.sql(f"ALTER TABLE {tbl_rfm} ALTER COLUMN customer_key SET NOT NULL")
try:
    spark.sql(f"ALTER TABLE {tbl_rfm} ADD CONSTRAINT pk_customer PRIMARY KEY (customer_key)")
except Exception as e:
    if "already exists" in str(e).lower():
        pass  # constraint already in place
    else:
        raise

print(f"✓ {tbl_rfm} — {spark.table(tbl_rfm).count():,} rows, PK: customer_key")

### 2b — Product Performance Lookup (key: product_id)
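The cell below derives `product_id` by joining four columns with `concat_ws`. Spark's `concat_ws` skips NULL inputs, so two rows that differ only in which column is NULL can end up with the same key. The pure-Python sketch below mimics that behaviour to show the collision; `concat_ws_like` and `null_safe_key` are hypothetical stand-ins, and the row values are made up.

```python
def concat_ws_like(sep, *values):
    """Mimic Spark's concat_ws: join non-null values, silently skipping None."""
    return sep.join(str(v) for v in values if v is not None)


def null_safe_key(sep, *values, null_token="<null>"):
    """Join values but keep a placeholder for None so column positions are preserved."""
    return sep.join(null_token if v is None else str(v) for v in values)


# Two made-up rows that differ only in which column is missing
row_a = ("Brand#13", None, "Manufacturer#1", "mid")
row_b = ("Brand#13", "Manufacturer#1", None, "mid")

# When nulls are skipped, both rows collapse to the same key
print(concat_ws_like("|", *row_a) == concat_ws_like("|", *row_b))
# Keeping a placeholder keeps the two keys distinct
print(null_safe_key("|", *row_a) == null_safe_key("|", *row_b))
```

If the Gold table can contain NULLs in any of the key columns, wrapping each column in `F.coalesce(F.col(c), F.lit("<null>"))` before `concat_ws` would be the Spark-side equivalent of `null_safe_key`.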

In [None]:
# Product performance: keyed by a composite product_id (brand | part_type | manufacturer | price_band)
df_prod = (
    spark.table(f"{GOLD}.gold_product_performance")
    .withColumn("product_id", F.concat_ws("|", "brand", "part_type", "manufacturer", "price_band"))
    .select(
        "product_id",
        "brand",
        "part_type",
        "manufacturer",
        "price_band",
        "net_revenue",
        "total_profit",
        "profit_margin_pct",
        "total_quantity_sold",
        "return_rate_pct",
        "avg_discount",
    )
)

tbl_prod = f"{SERVING}.product_perf_online"
df_prod.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(tbl_prod)

spark.sql(f"ALTER TABLE {tbl_prod} ALTER COLUMN product_id SET NOT NULL")
try:
    spark.sql(f"ALTER TABLE {tbl_prod} ADD CONSTRAINT pk_product PRIMARY KEY (product_id)")
except Exception as e:
    if "already exists" in str(e).lower():
        pass
    else:
        raise

print(f"✓ {tbl_prod} — {spark.table(tbl_prod).count():,} rows, PK: product_id")

### 2c — Supplier Scorecard Lookup (key: supplier_key)

In [None]:
df_supp = spark.table(f"{GOLD}.gold_supplier_scorecard").select(
    "supplier_key",
    "supplier_name",
    "supplier_nation",
    "supplier_region",
    "net_revenue",
    "total_profit",
    "profit_margin_pct",
    "on_time_delivery_pct",
    "avg_delivery_delay_days",
    "return_rate_pct",
    "total_line_items",
)

tbl_supp = f"{SERVING}.supplier_score_online"
df_supp.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(tbl_supp)

spark.sql(f"ALTER TABLE {tbl_supp} ALTER COLUMN supplier_key SET NOT NULL")
try:
    spark.sql(f"ALTER TABLE {tbl_supp} ADD CONSTRAINT pk_supplier PRIMARY KEY (supplier_key)")
except Exception as e:
    if "already exists" in str(e).lower():
        pass
    else:
        raise

print(f"✓ {tbl_supp} — {spark.table(tbl_supp).count():,} rows, PK: supplier_key")

---
## 3 — Publish to Lakebase (PostgreSQL-Compatible Database)

Lakebase provides a **PostgreSQL-wire-compatible** database for low-latency, high-concurrency serving.

We connect via `psycopg2`, create tables, and push Gold data directly into Lakebase.
Any application, BI tool, or AI agent can then query it with a standard PostgreSQL driver.
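One detail worth knowing when using `psycopg2` connections in `with` blocks: the block ends the transaction (commit on success, rollback on error) but does not close the connection. The sketch below uses `sqlite3` purely as a stand-in so it runs without a database server; its connections behave the same way in a `with` block. Wrapping the connection in `contextlib.closing` is one way to make sure it is closed.

```python
import sqlite3
from contextlib import closing

# sqlite3 stands in for psycopg2 here: in both drivers, `with conn:` scopes a
# transaction (commit on success, rollback on error) and leaves the connection open.
conn = sqlite3.connect(":memory:")
with conn:
    conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
    conn.execute("INSERT INTO t VALUES (1)")

# The connection is still usable after the with-block, so it was not closed
still_open = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0] == 1
conn.close()

# closing() guarantees close(); the inner `with` still handles commit/rollback
with closing(sqlite3.connect(":memory:")) as conn2:
    with conn2:
        conn2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")

try:
    conn2.execute("SELECT 1")
    closed = False
except sqlite3.ProgrammingError:
    closed = True

print(still_open, closed)
```

Applied to this notebook, the same shape would be `with closing(get_pg_conn()) as conn:` followed by an inner `with conn:`, which avoids leaving idle connections open against the Lakebase instance.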

> **Connection**: Uses the Lakebase sandbox endpoint. Update `LAKEBASE_HOST` if your endpoint differs.

In [None]:
%pip install psycopg2-binary --quiet

In [None]:
import psycopg2
import pandas as pd
from pyspark.sql import functions as F
from databricks.sdk import WorkspaceClient

# ── Lakebase Connection Config ──────────────────────────────────────────────
# Lakebase uses OAuth access tokens for auth (NOT PATs).
# We extract the OAuth token from the SDK, which handles the auth flow.

# Update this to match your Lakebase sandbox endpoint (Catalog > Database Instances).
LAKEBASE_HOST = "your-endpoint.database.us-east-1.cloud.databricks.com"
LAKEBASE_DB   = "databricks_postgres"
LAKEBASE_USER = spark.sql("SELECT current_user()").collect()[0][0]
LAKEBASE_PORT = 5432

# ── Get OAuth Token ──────────────────────────────────────────────────────────
# Try multiple methods to get a working token for Lakebase.

token = None
method = None

# Method 1: SDK authenticate() returns whatever bearer token the SDK is using.
# If the SDK is configured with a PAT, this will be a PAT, which Lakebase rejects (see note above).
try:
    w = WorkspaceClient()
    headers = w.config.authenticate()
    auth_header = headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header[7:]  # strip 'Bearer ' prefix
        method = "SDK bearer token"
except Exception:
    pass

# Method 2: Notebook context API token
if not token:
    try:
        token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
        method = "Notebook context token"
    except Exception:
        pass

# Method 3: SDK config.token directly
if not token:
    try:
        sdk_token = WorkspaceClient().config.token
        if sdk_token:
            token = sdk_token
            method = "SDK config token"
    except Exception:
        pass

# Method 4: Widget fallback
if not token:
    dbutils.widgets.text("lakebase_token", "", "Paste a Databricks OAuth access token")
    token = dbutils.widgets.get("lakebase_token")
    if token:
        method = "Notebook widget"

if token:
    print(f"✓ Token loaded via: {method}")
    print(f"  Token prefix: {token[:8]}...  (length: {len(token)})")
else:
    print("✗ No token found.")

def get_pg_conn():
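    """Return a psycopg2 connection to Lakebase.

    Note: `with conn:` only scopes a transaction (commit/rollback); it does not
    close the connection, so callers should call conn.close() when finished.
    """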
    return psycopg2.connect(
        host=LAKEBASE_HOST,
        port=LAKEBASE_PORT,
        dbname=LAKEBASE_DB,
        user=LAKEBASE_USER,
        password=token,
        sslmode="require",
    )

# Test the connection
if token:
    try:
        with get_pg_conn() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT version()")
                ver = cur.fetchone()[0]
        print(f"\n✓ Connected to Lakebase!")
        print(f"  Host     : {LAKEBASE_HOST}")
        print(f"  Database : {LAKEBASE_DB}")
        print(f"  User     : {LAKEBASE_USER}")
        print(f"  Version  : {ver[:80]}")
    except Exception as e:
        print(f"\n✗ Connection failed with {method}: {e}")
        print(f"\n  Debugging info:")
        print(f"  Token prefix : {token[:8]}...")
        print(f"  Token length : {len(token)}")
        print(f"  User         : {LAKEBASE_USER}")
        print(f"\n  Next steps:")
        print(f"  1. Try connecting via psql from your local machine to confirm creds work")
        print(f"  2. Check if your user has been granted access to the Lakebase database")
        print(f"  3. In Lakebase UI, check Database Settings → Allowed Users")

In [None]:
# ── Push Gold tables into Lakebase ──────────────────────────────────────────
# Read from serving Delta tables (with PKs) and write into Lakebase PostgreSQL.
# Uses psycopg2.extras.execute_values to send many rows per statement, which is
# typically much faster than issuing one INSERT per row.

from psycopg2.extras import execute_values
import time

def push_to_lakebase(spark_table_fqn, pg_table_name, pk_col, create_sql):
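    """
    Read a Spark table into pandas and push it into a Lakebase PostgreSQL table.
    Uses execute_values for fast batch inserts (up to 1000 rows per statement).

    pk_col is currently informational only; the primary key comes from create_sql.
    The whole table is collected to the driver via toPandas(), so this suits
    small lookup tables rather than large fact tables.
    """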
    t0 = time.time()
    pdf = spark.table(spark_table_fqn).toPandas()
    t_read = time.time() - t0

    with get_pg_conn() as conn:
        with conn.cursor() as cur:
            # Drop and recreate (idempotent full refresh)
            cur.execute(f"DROP TABLE IF EXISTS {pg_table_name} CASCADE")
            cur.execute(create_sql)

            # Fast bulk insert using execute_values (batches of 1000 rows)
            cols = list(pdf.columns)
            col_str = ", ".join(cols)
            insert_sql = f"INSERT INTO {pg_table_name} ({col_str}) VALUES %s"
            rows = [tuple(row) for row in pdf.itertuples(index=False, name=None)]

            t1 = time.time()
            execute_values(cur, insert_sql, rows, page_size=1000)
            t_insert = time.time() - t1

        conn.commit()

    print(f"  ✓ {pg_table_name:<30} {len(pdf):>10,} rows  (read: {t_read:.1f}s, insert: {t_insert:.1f}s)")
    return len(pdf)

print("Push function ready.")
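
One thing to watch when pushing rows that came through `toPandas()`: missing numeric values can arrive as float `NaN`, and the driver would then store `NaN` rather than SQL `NULL`. A minimal sketch of a clean-up step, assuming rows are plain tuples as in `push_to_lakebase`; `nan_to_none` is a hypothetical helper and the sample rows are made up.

```python
import math


def nan_to_none(rows):
    """Replace float NaN with None in each row so the driver sends SQL NULL."""
    def clean(value):
        return None if isinstance(value, float) and math.isnan(value) else value
    return [tuple(clean(v) for v in row) for row in rows]


# Made-up rows shaped like (customer_key, rfm_segment, lifetime_value)
rows = [(1, "Champions", 1250.5), (2, None, float("nan"))]
print(nan_to_none(rows))
```

In `push_to_lakebase`, this could be applied to `rows` just before the `execute_values` call.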

In [None]:
# Re-establish variables in case earlier cells were skipped after pip restart
CATALOG = spark.catalog.currentCatalog()
GOLD    = f"{CATALOG}.retail_gold"
SILVER  = f"{CATALOG}.retail_silver"
SERVING = f"{CATALOG}.retail_serving"

# ── Table 1: Customer RFM ───────────────────────────────────────────────────
push_to_lakebase(
    spark_table_fqn=f"{SERVING}.customer_rfm_online",
    pg_table_name="customer_rfm",
    pk_col="customer_key",
    create_sql="""
        CREATE TABLE customer_rfm (
            customer_key       INTEGER PRIMARY KEY,
            market_segment     VARCHAR(20),
            customer_nation    VARCHAR(30),
            customer_region    VARCHAR(20),
            recency_days       INTEGER,
            frequency          BIGINT,
            lifetime_value     DOUBLE PRECISION,
            avg_order_value    DOUBLE PRECISION,
            rfm_score          INTEGER,
            rfm_segment        VARCHAR(30),
            customer_lifetime_days INTEGER
        )
    """,
)

# ── Table 2: Product Performance ────────────────────────────────────────────
push_to_lakebase(
    spark_table_fqn=f"{SERVING}.product_perf_online",
    pg_table_name="product_performance",
    pk_col="product_id",
    create_sql="""
        CREATE TABLE product_performance (
            product_id           TEXT PRIMARY KEY,
            brand                VARCHAR(20),
            part_type            VARCHAR(80),
            manufacturer         VARCHAR(30),
            price_band           VARCHAR(20),
            net_revenue          DOUBLE PRECISION,
            total_profit         DOUBLE PRECISION,
            profit_margin_pct    DOUBLE PRECISION,
            total_quantity_sold  DOUBLE PRECISION,
            return_rate_pct      DOUBLE PRECISION,
            avg_discount         DOUBLE PRECISION
        )
    """,
)

# ── Table 3: Supplier Scorecard ─────────────────────────────────────────────
push_to_lakebase(
    spark_table_fqn=f"{SERVING}.supplier_score_online",
    pg_table_name="supplier_scorecard",
    pk_col="supplier_key",
    create_sql="""
        CREATE TABLE supplier_scorecard (
            supplier_key            INTEGER PRIMARY KEY,
            supplier_name           VARCHAR(30),
            supplier_nation         VARCHAR(30),
            supplier_region         VARCHAR(20),
            net_revenue             DOUBLE PRECISION,
            total_profit            DOUBLE PRECISION,
            profit_margin_pct       DOUBLE PRECISION,
            on_time_delivery_pct    DOUBLE PRECISION,
            avg_delivery_delay_days DOUBLE PRECISION,
            return_rate_pct         DOUBLE PRECISION,
            total_line_items        BIGINT
        )
    """,
)

In [None]:
# ── Verify: Query Lakebase directly ─────────────────────────────────────────
print("Lakebase table verification:\n")

verify_queries = [
    ("customer_rfm",        "SELECT COUNT(*) AS cnt FROM customer_rfm"),
    ("product_performance", "SELECT COUNT(*) AS cnt FROM product_performance"),
    ("supplier_scorecard",  "SELECT COUNT(*) AS cnt FROM supplier_scorecard"),
]

with get_pg_conn() as conn:
    with conn.cursor() as cur:
        print(f"  {'Table':<25} {'Rows':>10}")
        print(f"  {'='*40}")
        for tbl, sql in verify_queries:
            cur.execute(sql)
            cnt = cur.fetchone()[0]
            print(f"  {tbl:<25} {cnt:>10,}")

        # Sample point lookup — the real value of Lakebase
        print(f"\n  Sample point lookup (customer_key = 42):")
        cur.execute("SELECT customer_key, rfm_segment, lifetime_value, rfm_score FROM customer_rfm WHERE customer_key = 42")
        row = cur.fetchone()
        if row:
            print(f"    Key: {row[0]}, Segment: {row[1]}, LTV: ${row[2]:,.2f}, RFM: {row[3]}")
        else:
            # Try first available customer
            cur.execute("SELECT customer_key, rfm_segment, lifetime_value, rfm_score FROM customer_rfm LIMIT 1")
            row = cur.fetchone()
            print(f"    Key: {row[0]}, Segment: {row[1]}, LTV: ${row[2]:,.2f}, RFM: {row[3]}")

print(f"\n✓ Lakebase serving layer ready!")
print(f"  Connect from any app: psql 'postgresql://{LAKEBASE_USER}@{LAKEBASE_HOST}/{LAKEBASE_DB}?sslmode=require'")

---
## 4 — Feature Engineering & Feature Store

Register customer features in the **Databricks Feature Store** (Unity Catalog) for use in ML models and real-time serving.

In [None]:
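%pip install databricks-feature-engineering --quiet

In [None]:
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# %pip resets the Python state, so re-establish the variables used below
CATALOG = spark.catalog.currentCatalog()
SERVING = f"{CATALOG}.retail_serving"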

fe = FeatureEngineeringClient()

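# The customer_rfm_online table is already a great feature table.
# Register it as a Feature Table so ML models can look up features at inference time.
# Because the table was already created in section 2a, create_table may report that it
# exists; the handler below then refreshes the data instead.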

rfm_table = f"{SERVING}.customer_rfm_online"

try:
    fe.create_table(
        name=rfm_table,
        primary_keys=["customer_key"],
        description="Customer RFM features: recency, frequency, monetary value, segment scores",
        df=spark.table(rfm_table),
    )
    print(f"✓ Feature table created: {rfm_table}")
except Exception as e:
    if "already exists" in str(e).lower():
        print(f"○ Feature table already exists: {rfm_table}")
        # Update the feature table data
        fe.write_table(
            name=rfm_table,
            df=spark.table(rfm_table),
            mode="overwrite",
        )
        print(f"  ✓ Feature data refreshed")
    else:
        print(f"⚠ Feature store: {e}")

## 5 — Vector Search Index (for Product Semantic Search)

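Create a vector search index on a concatenated product description (`search_text`, built from name, brand, manufacturer, type, container, and price band) so the AI Agent can do **semantic product lookup** (e.g., "find products similar to polished brass").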
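As a rough mental model of what a similarity index does, the toy sketch below ranks items by cosine similarity between a query vector and stored vectors. The three-dimensional vectors and product strings are made up for illustration; a real index uses embeddings from a model and approximate nearest-neighbour search, and the helper names `cosine` and `top_k` are hypothetical.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, index, k=2):
    """Return the k item ids whose vectors are most similar to the query vector."""
    scored = sorted(index.items(), key=lambda kv: cosine(query_vec, kv[1]), reverse=True)
    return [item_id for item_id, _ in scored[:k]]


# Made-up 3-dimensional "embeddings"; real embedding models use many more dimensions
toy_index = {
    "polished brass bolt": [0.9, 0.1, 0.0],
    "burnished copper nut": [0.7, 0.3, 0.1],
    "plastic storage bin": [0.0, 0.1, 0.9],
}
print(top_k([1.0, 0.0, 0.0], toy_index))
```

The managed index in this section handles the embedding and ranking steps itself; the sketch is only meant to show why items with nearby vectors surface together.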

In [None]:
from pyspark.sql import functions as F

CATALOG = spark.catalog.currentCatalog()
SILVER  = f"{CATALOG}.retail_silver"
SERVING = f"{CATALOG}.retail_serving"

# Prepare a product catalog table with a text column for embedding
df_product_catalog = (
    spark.table(f"{SILVER}.dim_part")
    .select(
        F.col("part_key").alias("product_id"),
        "part_name",
        "brand",
        "manufacturer",
        "part_type",
        "container",
        "price_band",
        "retail_price",
        # Concatenated text field for embedding
        F.concat_ws(" | ",
            F.col("part_name"),
            F.col("brand"),
            F.col("manufacturer"),
            F.col("part_type"),
            F.col("container"),
            F.col("price_band"),
        ).alias("search_text"),
    )
)

tbl_catalog = f"{SERVING}.product_catalog"
df_product_catalog.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(tbl_catalog)

spark.sql(f"ALTER TABLE {tbl_catalog} ALTER COLUMN product_id SET NOT NULL")
try:
    spark.sql(f"ALTER TABLE {tbl_catalog} ADD CONSTRAINT pk_prod_catalog PRIMARY KEY (product_id)")
except Exception as e:
    if "already exists" in str(e).lower():
        pass  # constraint already in place
    else:
        raise

# Enable Change Data Feed (required for vector search sync)
spark.sql(f"ALTER TABLE {tbl_catalog} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

print(f"✓ {tbl_catalog} — {spark.table(tbl_catalog).count():,} products ready for vector search")

In [None]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.vectorsearch import EndpointType

w = WorkspaceClient()

VS_ENDPOINT = "retail_vs_endpoint"
VS_INDEX    = f"{SERVING}.product_catalog_index"

# Step 1: Create vector search endpoint (if not exists)
# The SDK expects an EndpointType enum here rather than a plain string.
try:
    w.vector_search_endpoints.create_endpoint(name=VS_ENDPOINT, endpoint_type=EndpointType.STANDARD)
    print(f"✓ Vector search endpoint created: {VS_ENDPOINT}")
except Exception as e:
    if "already exists" in str(e).lower():
        print(f"○ Endpoint already exists: {VS_ENDPOINT}")
    else:
        print(f"⚠ Endpoint creation: {str(e)[:200]}")

In [None]:
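# Step 2: Create vector search index with auto-embedding
# Uses Databricks' built-in embedding model (databricks-gte-large-en)
# The SDK expects typed request objects and enums here rather than dicts and strings.
from databricks.sdk.service.vectorsearch import (
    DeltaSyncVectorIndexSpecRequest,
    EmbeddingSourceColumn,
    PipelineType,
    VectorIndexType,
)

try:
    w.vector_search_indexes.create_index(
        name=VS_INDEX,
        endpoint_name=VS_ENDPOINT,
        primary_key="product_id",
        index_type=VectorIndexType.DELTA_SYNC,
        delta_sync_index_spec=DeltaSyncVectorIndexSpecRequest(
            source_table=tbl_catalog,
            pipeline_type=PipelineType.TRIGGERED,
            embedding_source_columns=[
                EmbeddingSourceColumn(
                    name="search_text",
                    embedding_model_endpoint_name="databricks-gte-large-en",
                )
            ],
        ),
    )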
    print(f"✓ Vector index created: {VS_INDEX}")
    print(f"  Embedding column: search_text")
    print(f"  Model: databricks-gte-large-en")
    print(f"  Sync mode: TRIGGERED (manual refresh)")
except Exception as e:
    if "already exists" in str(e).lower():
        print(f"○ Index already exists: {VS_INDEX}")
    else:
        print(f"⚠ Index creation: {str(e)[:200]}")
        print(f"  If vector search is not available, the AI Agent will use SQL-based search instead.")

## 6 — Verify Serving Layer

In [None]:
serving_tables = [
    "customer_rfm_online",
    "product_perf_online",
    "supplier_score_online",
    "product_catalog",
]

print(f"{'Serving Table':<30} {'Rows':>12}  {'Columns':>8}")
print("=" * 55)
for t in serving_tables:
    df = spark.table(f"{SERVING}.{t}")
    print(f"{t:<30} {df.count():>12,}  {len(df.columns):>8}")

In [None]:
# Quick test: look up a specific customer
sample_cust = spark.table(f"{SERVING}.customer_rfm_online").limit(1).collect()[0]
print(f"Sample customer lookup:")
print(f"  Key:       {sample_cust['customer_key']}")
print(f"  Segment:   {sample_cust['rfm_segment']}")
print(f"  LTV:       ${sample_cust['lifetime_value']:,.2f}")
print(f"  RFM Score: {sample_cust['rfm_score']}")
print(f"  Region:    {sample_cust['customer_region']}")

---
Lakebase tables, Feature Store entries, and the Vector Search index are now in place (a newly created index may still be syncing).

Continue with `06_ai_ml_models.ipynb`.