In [0]:
dbutils.widgets.text(name="env", defaultValue='', label='Enter the environment in lower case')
env = dbutils.widgets.get("env")

In [0]:
%run "./commons"

In [0]:
from pyspark.sql.functions import current_timestamp, col

## Read 5 dimension tables (Product, Customer, Warehouse, Region, Employee)

In [0]:
def read_bronze_table(environment, table_name):
    """Open a Bronze table as a streaming DataFrame.

    Args:
        environment: environment prefix; the table is read from `<environment>_catalog`.
        table_name: table name inside the `bronze` schema (e.g. "raw_product").

    Returns:
        The streaming DataFrame returned by ``spark.readStream.table``. The stream is
        only defined here; rows are consumed once a writeStream query is started.
    """
    print(f"Reading Bronze table {table_name}: ", end='')
    # Quote every identifier part (as write_to_silver does) so table names that
    # contain special characters resolve correctly.
    df = spark.readStream.table(f"`{environment}_catalog`.`bronze`.`{table_name}`")
    print("Success!")
    return df

In [0]:
# Open each Bronze dimension table as a stream, in the order
# product, customer, warehouse, employee, region.
(
    df_bronze_product,
    df_bronze_customer,
    df_bronze_warehouse,
    df_bronze_employee,
    df_bronze_region,
) = [
    read_bronze_table(env, raw_table)
    for raw_table in ("raw_product", "raw_customer", "raw_warehouse", "raw_employee", "raw_region")
]

## Clean & Transform 5 dimension tables (Product, Customer, Warehouse, Region, Employee)

In [0]:
# Transform Warehouse + Region into a silver_warehouse dimension-like table
def join_Warehouse_Region(df_wh, df_reg):
    """Left-join warehouses to regions and flatten the result.

    Args:
        df_wh: warehouse DataFrame exposing WarehouseID, WarehouseName,
            WarehouseAddress and RegionID (the streaming side in this notebook).
        df_reg: region DataFrame exposing RegionID, RegionName, CountryName,
            State, City and PostalCode (a static table in this notebook).

    Returns:
        DataFrame with the four warehouse columns followed by the five region
        columns. Being a left join, every warehouse row survives; region columns
        are null when no region matches, and a RegionID present several times in
        ``df_reg`` repeats the warehouse row once per match.
    """
    print("Joining Warehouse and Region: ", end='')
    warehouse_fields = ("WarehouseID", "WarehouseName", "WarehouseAddress", "RegionID")
    region_fields = ("RegionName", "CountryName", "State", "City", "PostalCode")

    on_region = col("w.RegionID") == col("r.RegionID")
    projection = [col(f"w.{name}") for name in warehouse_fields]
    projection += [col(f"r.{name}") for name in region_fields]

    flattened = df_wh.alias("w").join(df_reg.alias("r"), on_region, "left")
    flattened = flattened.select(*projection)
    print("Success!")
    return flattened

In [0]:
def add_Transformed_Time(df):
    """Stamp each row with the time it was processed.

    Returns a new DataFrame with a ``Transformed_Time`` column set to
    ``current_timestamp()`` (replacing any existing column of that name);
    ``df`` itself is not modified.
    """
    print("Adding Transformed_Time column: ", end='')
    audit_column, audit_value = "Transformed_Time", current_timestamp()
    stamped_df = df.withColumn(audit_column, audit_value)
    print("Success!")
    return stamped_df

In [0]:
# generic writer to silver tables
def write_to_silver(streaming_df, environment, table_name, chk_subdir, checkpoint_root=None):
    """Append a streaming DataFrame to a Silver Delta table and block until it finishes.

    Uses an ``availableNow`` trigger, so the query processes the data available at
    start-up and then stops; ``awaitTermination`` returns once that backlog is written.

    Args:
        streaming_df: streaming DataFrame to persist.
        environment: environment prefix; writes go to `<environment>_catalog`.`silver`.
        table_name: target table name inside the `silver` schema.
        chk_subdir: per-table checkpoint sub-directory
            (final path is ``<checkpoint_root>/<chk_subdir>/Checkpt``).
        checkpoint_root: base path for checkpoints. When omitted, the notebook-global
            ``checkpoint`` is used (presumably defined by ./commons — TODO confirm).

    Returns:
        The terminated StreamingQuery, e.g. for inspecting ``lastProgress``.
    """
    print(f"Writing Silver table {table_name}: ", end='')
    # Resolve the hidden global lazily so callers can pass an explicit root instead.
    root = checkpoint if checkpoint_root is None else checkpoint_root
    query = (streaming_df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{root}/{chk_subdir}/Checkpt")
        .outputMode("append")
        .queryName(f"Silver_{table_name}_WriteStream")
        .trigger(availableNow=True)
        .toTable(f"`{environment}_catalog`.`silver`.`{table_name}`"))
    query.awaitTermination()
    print("Success!")
    return query

In [0]:
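# # MANUAL RESET (destructive) — uncomment only to rebuild the Silver layer from scratch.
# # It stops all active streams, deletes the Silver checkpoint directories listed below,
# # and drops the five Silver tables.
# # NOTE(review): checkpoint_root below is hardcoded to a dev bucket; confirm it matches
# # the `checkpoint` path used by write_to_silver before running it in another environment.
# # Stop all active streams (safest reset)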
# for s in spark.streams.active:
#     print("Stopping stream:", s.name, s.id)
#     s.stop()

# checkpoint_root = "gs://bkt-dev-120125/checkpoints"

# paths = [
#     "SilverProductLoad/Checkpt",
#     "SilverCustomerLoad/Checkpt",
#     "SilverRegionLoad/Checkpt",
#     "SilverWarehouseLoad/Checkpt",
#     "SilverEmployeeLoad/Checkpt"
# ]

# for p in paths:
#     full = f"{checkpoint_root}/{p}"
#     print("Removing:", full)
#     dbutils.fs.rm(full, True)


# for tbl in ["silver_product", "silver_customer", "silver_region", "silver_warehouse", "silver_employee"]:
#     fq_name = f"`{env}_catalog`.`silver`.`{tbl}`"
#     print("Dropping table if exists:", fq_name)
#     spark.sql(f"DROP TABLE IF EXISTS {fq_name}")






In [0]:
# 1. silver_product
# remove_Dups / handle_NULLs are presumably provided by ./commons (not visible here).
df_prod_no_dups = remove_Dups(df_bronze_product)
df_prod_clean = handle_NULLs(df_prod_no_dups, df_prod_no_dups.columns)
df_prod_final = add_Transformed_Time(df_prod_clean)

write_to_silver(
    streaming_df=df_prod_final,
    environment=env,
    table_name="silver_product",
    chk_subdir="SilverProductLoad",
)

In [0]:
# 2. silver_customer
# Same pipeline as silver_product: remove_Dups -> handle_NULLs -> audit timestamp -> append.
df_cust_no_dups = remove_Dups(df_bronze_customer)
df_cust_clean = handle_NULLs(df_cust_no_dups, df_cust_no_dups.columns)
df_cust_final = add_Transformed_Time(df_cust_clean)

write_to_silver(
    streaming_df=df_cust_final,
    environment=env,
    table_name="silver_customer",
    chk_subdir="SilverCustomerLoad",
)

In [0]:
# 3. silver_region
# Required, not optional: cell 4 reads `silver_region` back as the static side of
# the warehouse join, so this cell must run (and its write must finish) first.
df_reg_no_dups = remove_Dups(df_bronze_region)
df_reg_clean = handle_NULLs(df_reg_no_dups, df_reg_no_dups.schema.names)
df_reg_final = add_Transformed_Time(df_reg_clean)

write_to_silver(df_reg_final, env, "silver_region", "SilverRegionLoad")

In [0]:
# 4. silver_warehouse (warehouse + region, flattened into one dimension table)
# A stream-stream LEFT OUTER join would require watermarks on both inputs plus a
# time-range join condition, none of which this notebook defines. Instead the
# warehouse stream is joined to a static (batch) read of the region table.

# Streaming side: warehouse from bronze
df_wh_no_dups = remove_Dups(df_bronze_warehouse)
df_wh_clean = handle_NULLs(df_wh_no_dups, df_wh_no_dups.schema.names)

# Static side: region from the Silver table written by cell 3, which must have run first.
# NOTE(review): if silver_region can hold several rows per RegionID (e.g. across
# appended runs), each warehouse row is repeated once per match — confirm that
# remove_Dups prevents this.
df_region_dim = spark.table(f"`{env}_catalog`.`silver`.`silver_region`")

# Stream (warehouse) LEFT JOIN static (region): every warehouse row is kept.
df_wh_joined = join_Warehouse_Region(df_wh_clean, df_region_dim)
df_wh_final = add_Transformed_Time(df_wh_joined)

write_to_silver(df_wh_final, env, "silver_warehouse", "SilverWarehouseLoad")

In [0]:
# 5. silver_employee
# Same pipeline as the other dimensions: remove_Dups -> handle_NULLs -> audit timestamp -> append.
df_emp_no_dups = remove_Dups(df_bronze_employee)
df_emp_clean = handle_NULLs(df_emp_no_dups, df_emp_no_dups.columns)
df_emp_final = add_Transformed_Time(df_emp_clean)

write_to_silver(
    streaming_df=df_emp_final,
    environment=env,
    table_name="silver_employee",
    chk_subdir="SilverEmployeeLoad",
)

## Display sample data in silver tables

In [0]:
# Preview up to 10 rows of each of the five Silver tables loaded by this notebook.
for silver_table in ("silver_product", "silver_customer", "silver_warehouse",
                     "silver_region", "silver_employee"):
    display(spark.sql(f"SELECT * FROM `{env}_catalog`.`silver`.`{silver_table}` LIMIT 10"))

In [0]:
# Row counts for all five Silver tables in a single labelled result (one row per table).
_silver_tables = ("silver_product", "silver_customer", "silver_warehouse",
                  "silver_region", "silver_employee")
_count_sql = "\nUNION ALL\n".join(
    f"SELECT '{tbl}' AS table_name, COUNT(*) AS row_count "
    f"FROM `{env}_catalog`.`silver`.`{tbl}`"
    for tbl in _silver_tables
)
display(spark.sql(_count_sql))