SILVER TEST

In [0]:
%run /Workspace/Users/theoknowles96@gmail.com/dbx_free/Utility/utils

In [0]:
# Databricks notebook source
from pyspark.sql import functions as F

# ╔═══════════════════════════════════════════════════════════════════╗
# ║ 1. Widgets: to make notebook reusable                             ║
# ╚═══════════════════════════════════════════════════════════════════╝

# Notebook parameters. Each entry maps a text widget name to the default value
# used when the caller does not override it.
widget_defaults = {
    "bronze_catalog": "bronze",
    "bronze_schema": "tpcds_sf1",
    "silver_catalog": "silver",
    "silver_schema": "tpcds_sf1",
}

# Register all four widgets before reading any of them back.
for widget_name, default_value in widget_defaults.items():
    dbutils.widgets.text(widget_name, default_value)

# Resolve the effective values (dict order matches the unpacking order below).
bronze_catalog, bronze_schema, silver_catalog, silver_schema = [
    dbutils.widgets.get(widget_name) for widget_name in widget_defaults
]

print(f"Reading from {bronze_catalog}.{bronze_schema} and writing to {silver_catalog}.{silver_schema}")


In [0]:
# ╔═══════════════════════════════════════════════════════════════════╗
# ║ 2. Get all tables in the Bronze schema                            ║
# ╚═══════════════════════════════════════════════════════════════════╝

# List what is registered under the Bronze schema and keep only the table names.
bronze_namespace = f"{bronze_catalog}.{bronze_schema}"
tables_df = spark.sql(f"SHOW TABLES IN {bronze_namespace}")

bronze_tables = []
for table_row in tables_df.collect():
    bronze_tables.append(table_row["tableName"])

print("Tables found in Bronze:")
print(bronze_tables)


In [0]:
# Load the Bronze call_center table and render it for a quick visual check.
call_center_source = f"{bronze_catalog}.{bronze_schema}.call_center"
call_center_df = spark.read.table(call_center_source)
display(call_center_df)

In [0]:
# Try out add_surrogate_key (brought in by the %run of the utils notebook) on
# the call_center sample and render the result.
# NOTE(review): the second argument is presumably the name of the key column
# to add -- confirm against the helper's definition.
surrogate_key_arg = "sk"
new_df = add_surrogate_key(call_center_df, surrogate_key_arg)
display(new_df)

In [0]:
# Merge (upsert) key columns for every TPC-DS table, keyed by table name.
# The upsert loop looks tables up here with `.get()`, so a table that is not
# listed resolves to None.
#
# Each key list must uniquely identify a row, otherwise several source rows
# match the same target row during a merge.
merge_key_map = {
    "call_center": ["cc_call_center_sk"],
    "catalog_returns": ["cr_item_sk", "cr_order_number"],
    "catalog_sales": ["cs_item_sk", "cs_order_number"],
    "customer": ["c_customer_sk"],
    "customer_address": ["ca_address_sk"],
    "customer_demographics": ["cd_demo_sk"],
    "date_dim": ["d_date_sk"],
    "household_demographics": ["hd_demo_sk"],
    "income_band": ["ib_income_band_sk"],
    # inventory is a periodic snapshot: the same (item, warehouse) pair appears
    # once per snapshot date, so the date key is part of the primary key.
    "inventory": ["inv_date_sk", "inv_item_sk", "inv_warehouse_sk"],
    "item": ["i_item_sk"],
    "promotion": ["p_promo_sk"],
    "reason": ["r_reason_sk"],
    "ship_mode": ["sm_ship_mode_sk"],
    "store": ["s_store_sk"],
    "store_returns": ["sr_item_sk", "sr_ticket_number"],
    "store_sales": ["ss_item_sk", "ss_ticket_number"],
    "time_dim": ["t_time_sk"],
    "warehouse": ["w_warehouse_sk"],
    "web_page": ["wp_web_page_sk"],
    "web_returns": ["wr_item_sk", "wr_order_number"],
    "web_sales": ["ws_item_sk", "ws_order_number"],
    "web_site": ["web_site_sk"],
    "catalog_page": ["cp_catalog_page_sk"],
}


In [0]:
# Names of the Bronze tables that the upsert loop will process.
# SHOW TABLES also returns session-local temporary views regardless of the
# schema requested; those do not exist under the Bronze namespace, so they are
# excluded via the isTemporary column of the result.
tables = [
    row.tableName
    for row in spark.sql(f"SHOW TABLES IN {bronze_catalog}.{bronze_schema}").collect()
    if not row.isTemporary
]
print(f"Schema {bronze_schema} → {len(tables)} tables")

In [0]:
# Make sure the Silver target schema exists before any table is written.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_catalog}.{silver_schema}")

# Upsert every Bronze table into the same-named table in the Silver schema.
for table in tables:
    source_table = f"{bronze_catalog}.{bronze_schema}.{table}"
    target_table = f"{silver_catalog}.{silver_schema}.{table}"

    # Table-specific merge keys. There is no fallback key list: a table that
    # is missing from merge_key_map resolves to None.
    keys = merge_key_map.get(table)
    if keys is None:
        # Surface the gap instead of passing None silently.
        # NOTE(review): how upsert_table treats keys=None is defined in the
        # utils notebook -- confirm that is the intended handling.
        print(f"WARNING: no merge keys configured for '{table}'; calling upsert_table with keys=None")

    # upsert_table comes from the utils notebook pulled in via %run.
    upsert_table(source_table, target_table, keys)
