In [0]:
# Load the raw customers table from the bronze layer through a Delta reader.
bronze_reader = spark.read.format("delta")
bronze_df = bronze_reader.table("bronze.customers")

In [0]:
# Preview at most 50 bronze rows rather than rendering the whole table.
bronze_preview = bronze_df.limit(50)
display(bronze_preview)

In [0]:
# Import the Spark SQL functions under the `F` alias instead of importing
# `sum` by name: a bare `from pyspark.sql.functions import sum` replaces
# Python's builtin `sum` for every later cell in the kernel.
from pyspark.sql import functions as F
from pyspark.sql.functions import col  # kept importable by name for other cells

# Count NULLs per column: isNull() yields a boolean, which is cast to 0/1 and
# summed. The result is a single-row frame with one count per bronze column.
null_counts = bronze_df.select([
    F.sum(col(c).isNull().cast("int")).alias(c)
    for c in bronze_df.columns
])

null_counts.show()

In [0]:
# Build the silver customers frame from the bronze table:
#   1. keep only rows that have a customer_unique_id,
#   2. keep one row per customer_unique_id, then one row per customer_id
#      (no ordering is applied, so which duplicate survives is not controlled),
#   3. drop the `_rescued_data` column.
bronze_customers = spark.read.table("bronze.customers")
customers_with_key = bronze_customers.where("customer_unique_id IS NOT NULL")
customers_deduped = (
    customers_with_key
    .drop_duplicates(["customer_unique_id"])
    .drop_duplicates(["customer_id"])
)
silver_df = customers_deduped.drop("_rescued_data")

display(silver_df)


In [0]:
# Print the column names and types of the silver customers frame.
silver_df.printSchema()

In [0]:
# Load the cleaned geolocation table from the silver layer.
geo_df = spark.read.table("silver.geolocations_cleaned")
    

In [0]:
# Reduce the geolocation table to the join key plus its surrogate key, and keep
# a single row per zip prefix. Two reasons:
#   * a left join against a lookup with repeated zip prefixes would emit the
#     same customer more than once, breaking the one-row-per-customer_id grain
#     and making a later MERGE keyed on customer_id fail on multiple matches;
#   * projecting only the needed columns keeps the name-based select below
#     unambiguous even if the geolocation table shares a column name with
#     the customers frame.
# NOTE(review): if the geolocation table is already unique per zip prefix this
# dedupe changes nothing; if it is not, the surviving geolocation_sk is
# arbitrary - confirm the intended tie-break.
geo_lookup = (
    geo_df
    .select("geolocation_zip_code_prefix", "geolocation_sk")
    .dropDuplicates(["geolocation_zip_code_prefix"])
)

# Attach geolocation_sk to every customer; customers without a matching zip
# prefix are kept (left join) with a NULL geolocation_sk.
customer_geo_df = (
    silver_df
    .join(
        geo_lookup,
        silver_df.customer_zip_code_prefix == geo_lookup.geolocation_zip_code_prefix,
        "left",
    )
    .select(*silver_df.columns, geo_lookup.geolocation_sk)
)

customer_geo_df.display()

In [0]:
# Register the joined customers frame as the temp view `silver_updates`
# (replacing any existing view of that name) so SQL can reference it.
customer_geo_df.createOrReplaceTempView("silver_updates")

In [0]:
# Persist the joined customers into silver.customers_cleaned.
# If the table already exists, upsert by customer_id through a MERGE;
# otherwise create it from the current frame.
target_exists = spark.catalog.tableExists("silver.customers_cleaned")

if target_exists:
    # The MERGE statement reads its source rows from this temp view.
    customer_geo_df.createOrReplaceTempView("silver_updates")
    spark.sql("""
    MERGE INTO silver.customers_cleaned AS target
    USING silver_updates AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """)
else:
    # First run: materialize the frame as a new Delta table.
    delta_writer = customer_geo_df.write.format("delta").mode("overwrite")
    delta_writer.saveAsTable("silver.customers_cleaned")

In [0]:
%sql SELECT * FROM silver.customers_cleaned LIMIT 10;