In [0]:
%sql
-- Make sure the schema that stores validation results exists before anything writes to it.
CREATE SCHEMA IF NOT EXISTS retail_migration.monitoring;

In [0]:
from pyspark.sql import SparkSession, functions as F
import uuid
import time

# Reuse the active Spark session, or create one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Source schema (catalog.schema) whose table names drive the validation.
source_catalog = "samples.tpch"

# Collect the table names to validate. listTables() also returns the session's
# temporary views, which have no bronze/silver counterpart, so they are skipped
# together with internal objects whose names start with an underscore.
tables = [
    t.name
    for t in spark.catalog.listTables(source_catalog)
    if not t.isTemporary and not t.name.startswith("_")
]
print("Tables to validate:", tables)

# DDL for the table that accumulates one row per validated table per run.
# IF NOT EXISTS keeps this cell safe to re-run.
monitoring_table_ddl = """
CREATE TABLE IF NOT EXISTS retail_migration.monitoring.validation_results (
    table_name STRING,
    bronze_count BIGINT,
    silver_count BIGINT,
    match BOOLEAN,
    validation_time TIMESTAMP
)
"""
spark.sql(monitoring_table_ddl)

# Validate row counts between bronze and silver layers
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql.types import (
    BooleanType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Explicit schema mirroring retail_migration.monitoring.validation_results, so
# the written column types do not depend on inference from Python values.
results_schema = StructType([
    StructField("table_name", StringType()),
    StructField("bronze_count", LongType()),
    StructField("silver_count", LongType()),
    StructField("match", BooleanType()),
    StructField("validation_time", TimestampType()),
])

results = []

# Compare the row count of each table in the bronze layer with its silver
# counterpart and record one result row per table.
for t in tables:
    bronze_count = spark.table(f"retail_migration.bronze.{t}").count()
    silver_count = spark.table(f"retail_migration.silver.{t}").count()
    match = bronze_count == silver_count
    print(f"{t}: bronze={bronze_count}, silver={silver_count}, match={match}")
    results.append(Row(table_name=t, bronze_count=bronze_count, silver_count=silver_count,
                       match=match, validation_time=datetime.now()))

# Save results into the monitoring table. With no tables to validate there is
# nothing to append, and createDataFrame cannot infer a schema from an empty
# list, so the write is skipped in that case.
if results:
    (
        spark.createDataFrame(results, schema=results_schema)
        .write.mode("append")
        .saveAsTable("retail_migration.monitoring.validation_results")
    )
else:
    print("No tables found to validate; nothing written to the monitoring table.")

# Optional: show the accumulated validation history, most recent run first.
validation_history = spark.table("retail_migration.monitoring.validation_results")
display(validation_history.orderBy("validation_time", ascending=False))
