In [0]:
# Drop every temp view whose name starts with "_sqldf" (presumably the
# implicit result views left behind by earlier %sql cells -- confirm).
stale_view_names = [
    entry.name
    for entry in spark.catalog.listTables()
    if entry.name.startswith("_sqldf")
]
for stale_name in stale_view_names:
    spark.catalog.dropTempView(stale_name)

In [0]:
%sql
-- Setup for the monitoring log. Every statement is idempotent
-- (IF NOT EXISTS), so re-running this cell is safe.
-- Make `sandwich` the current catalog so the unqualified `monitoring`
-- schema below resolves inside it.
USE CATALOG sandwich;
CREATE SCHEMA IF NOT EXISTS monitoring;

-- One row per table per reconciliation run; rows are appended by the
-- bronze-vs-silver check cell of this notebook.
CREATE TABLE IF NOT EXISTS monitoring.data_quality_log (
  -- Name of the table that was checked.
  table_name STRING,
  -- Layer the data was read from (the check cell writes "bronze").
  source_layer STRING,
  -- Layer the data was compared against (the check cell writes "silver").
  target_layer STRING,
  -- Time at which the check row was produced.
  load_time TIMESTAMP,
  -- Row count of the table in the source layer.
  source_count BIGINT,
  -- Row count of the table in the target layer.
  target_count BIGINT,
  -- "OK" when both counts are equal, otherwise "MISMATCH".
  row_count_status STRING,
  -- Result of the row-level comparison between the two layers.
  data_match BOOLEAN
);


In [0]:
from datetime import datetime
import pyspark.sql.functions as F

# Bronze -> silver reconciliation.
#
# For every persistent table in the bronze schema, compare it with the
# same-named silver table and append one result row to the monitoring log:
#   * row_count_status -- "OK" when both layers hold the same number of rows,
#     otherwise "MISMATCH";
#   * data_match       -- True only when both layers hold exactly the same
#     rows (duplicates included) over the columns they have in common.
bronze_schema = "sandwich.bronze"
silver_schema = "sandwich.silver"
monitoring_table = "sandwich.monitoring.data_quality_log"

# Explicit schema mirroring monitoring.data_quality_log, so the appended row
# never depends on type inference (inference fails on a NULL target_count).
log_schema = (
    "table_name STRING, source_layer STRING, target_layer STRING, "
    "load_time TIMESTAMP, source_count BIGINT, target_count BIGINT, "
    "row_count_status STRING, data_match BOOLEAN"
)

spark.sql("USE CATALOG sandwich")


def list_schema_tables(schema):
    """Return the names of the persistent tables in `schema`.

    SHOW TABLES also returns session-scoped temporary views (flagged by
    `isTemporary`). They cannot be addressed as `<schema>.<name>`, so they
    are filtered out here.
    """
    rows = spark.sql(f"SHOW TABLES IN {schema}").collect()
    return [row.tableName for row in rows if not row.isTemporary]


tables = list_schema_tables(bronze_schema)
silver_tables = set(list_schema_tables(silver_schema))

for tbl in tables:
    print(f"🔍 Checking table: {tbl}")

    # Backticks keep table names with unusual characters parseable.
    bronze_df = spark.table(f"{bronze_schema}.`{tbl}`")
    bronze_count = bronze_df.count()

    if tbl in silver_tables:
        silver_df = spark.table(f"{silver_schema}.`{tbl}`")
        silver_count = silver_df.count()
        common_cols = [c for c in bronze_df.columns if c in silver_df.columns]
    else:
        # A missing silver table is a finding to log, not a reason to abort
        # the checks for all remaining tables.
        print(f"Table {tbl} not found in {silver_schema}; logging as MISMATCH")
        silver_df, silver_count, common_cols = None, None, []

    row_status = "OK" if bronze_count == silver_count else "MISMATCH"

    if row_status == "OK" and common_cols:
        # exceptAll is a one-directional multiset difference (bronze minus
        # silver). Because the row counts are already known to be equal, an
        # empty difference implies both layers hold the same rows.
        diff = bronze_df.select(common_cols).exceptAll(silver_df.select(common_cols))
        # limit(1) lets Spark stop at the first differing row.
        identical = diff.limit(1).count() == 0
    else:
        # Different row counts, no shared columns, or no silver table:
        # the layers cannot be identical, so skip the expensive comparison.
        identical = False

    log_row_df = spark.createDataFrame(
        [(
            tbl, "bronze", "silver", datetime.now(),
            bronze_count, silver_count, row_status, identical,
        )],
        schema=log_schema,
    )

    # Append per table so results already computed survive a later failure.
    log_row_df.writeTo(monitoring_table).append()

print("Monitoring data logged successfully")


Optional freshness check: record the latest order date so you can confirm the order data isn't stale.

In [0]:
%sql
-- Snapshot of how recent the silver `orders` data is: the latest order date
-- together with the time the check ran. CREATE OR REPLACE overwrites the
-- previous snapshot, so the table only ever holds the latest check.
-- The target is fully qualified so this cell does not depend on the session's
-- current catalog having been set by an earlier cell.
CREATE OR REPLACE TABLE sandwich.monitoring.data_freshness AS
SELECT
  'orders' AS table_name,
  MAX(o_orderdate) AS latest_date,
  CURRENT_TIMESTAMP() AS checked_at
FROM sandwich.silver.orders;

In [0]:
%sql
-- Display every logged check result, most recent first.
SELECT *
FROM sandwich.monitoring.data_quality_log
ORDER BY load_time DESC;