# In [0]:
from datetime import datetime, timedelta

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Initialize Spark
spark = SparkSession.builder.appName("DataQualityAudit").getOrCreate()

# --- 1. SETUP: Create Sample Data with Anomalies ---
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("cust_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("email", StringType(), True),
    StructField("updated_at", TimestampType(), True)
])

# Sample rows seeding the six row-level issues caught by the audit:
# Duplicates, NULLs, Outliers, Bad Patterns, Stale data, and Missing Parents.
# (The 7th check, Volume, looks at the total row count rather than any one row.)
# All rows share one reference timestamp so the duplicate pair is an exact copy
# and every row's age is measured from the same instant.
ref_time = datetime.now()
data = [
    ("ORD01", "C1", 100.0, "a@b.com", ref_time),
    ("ORD01", "C1", 100.0, "a@b.com", ref_time),                     # Check 5: Duplicate
    ("ORD02", None, 50.0, "c@d.com", ref_time),                      # Check 3: NULL (critical)
    ("ORD03", "C2", -50.0, "e@f.com", ref_time),                     # Check 4: Negative amount
    ("ORD04", "C3", 75.0, "invalid_email", ref_time),                # Check 7: Pattern mismatch
    ("ORD05", "C1", 20.0, "g@h.com", ref_time - timedelta(days=5)),  # Check 1: Stale
    ("ORD06", "C99", 30.0, "i@j.com", ref_time),                     # Check 6: No parent C99
]
df = spark.createDataFrame(data, schema)
df.display()

# Parent Table for Referential Integrity: the set of valid cust_id values.
df_parents = spark.createDataFrame([("C1",), ("C2",), ("C3",)], ["cust_id"])
df_parents.display()

# --- 2. THE 7 QUALITY CHECKS & FIXES ---

print("Starting Quality Audit...")

# 1. Freshness Check (Threshold: 24h)
# Rows last updated before `limit` are stale. A NULL updated_at cannot be shown
# to be fresh, so it is counted as stale too: `updated_at < limit` alone
# evaluates to NULL for such rows, and they would be dropped by the fix below
# without ever being reported.
# NOTE(review): `limit` is a naive local-time datetime, matching how the sample
# data is built; confirm the timezone convention for real sources.
FRESHNESS_HOURS = 24
limit = datetime.now() - timedelta(hours=FRESHNESS_HOURS)
updated = F.col("updated_at")
stale = df.filter(updated.isNull() | (updated < limit))
print(f"Check 1 (Freshness): {stale.count()} stale rows found.")
df_clean = df.filter(updated >= limit)  # FIX: Remove stale (and NULL-timestamp) rows
df_clean.display()

# 2. Volume Test (Expectation: Min 5 rows)
# Alert-only check: a low row count is reported, but no rows are removed.
MIN_EXPECTED_ROWS = 5
row_count = df_clean.count()
if row_count < MIN_EXPECTED_ROWS:
    print(
        f"Check 2 (Volume): ALERT! Low data volume detected "
        f"({row_count} rows, expected at least {MIN_EXPECTED_ROWS})."
    )
else:
    print(f"Check 2 (Volume): {row_count} rows, OK.")

# 3. NULL Values Check
# cust_id is a critical key: an order without it cannot be attributed to anyone.
missing_cust = F.col("cust_id").isNull()
nulls = df_clean.where(missing_cust)
print(f"Check 3 (NULLs): {nulls.count()} rows with missing Customer IDs.")
# FIX: Drop critical nulls (keep only rows where cust_id is present).
df_clean = df_clean.where(~missing_cust)
df_clean.display()

# 4. Numeric Distribution (Outlier Detection)
# Two disjoint conditions are tracked separately so the report matches the fix:
#   * invalid: a missing or non-positive amount is a logical error -> removed.
#   * high:    an amount above MAX_EXPECTED_AMOUNT is suspicious but may be
#              legitimate -> reported for review, NOT removed.
MAX_EXPECTED_AMOUNT = 1000.0
amount = F.col("amount")
invalid_amount = amount.isNull() | (amount <= 0)
high_amount = amount > MAX_EXPECTED_AMOUNT
outliers = df_clean.filter(invalid_amount | high_amount)  # all flagged rows, for inspection
n_invalid = df_clean.filter(invalid_amount).count()
n_high = df_clean.filter(high_amount).count()
print(
    f"Check 4 (Numeric): {n_invalid + n_high} outliers found "
    f"({n_invalid} NULL/non-positive removed, "
    f"{n_high} above {MAX_EXPECTED_AMOUNT:g} kept for review)."
)
df_clean = df_clean.filter(~invalid_amount)  # FIX: Filter logical errors
df_clean.display()

# 5. Uniqueness (Duplicate Check)
# order_id is expected to identify exactly one row.
dupes = df_clean.groupBy("order_id").count().filter("count > 1")
print(f"Check 5 (Uniqueness): {dupes.count()} duplicate IDs found.")
# FIX: De-duplicate, keeping the latest row per order_id by updated_at.
# The remaining columns act as secondary sort keys so the surviving row is
# deterministic when copies share the same updated_at; row_number() over a
# tied ordering would otherwise pick one arbitrarily.
latest_first = Window.partitionBy("order_id").orderBy(
    F.col("updated_at").desc(), "cust_id", "amount", "email"
)
df_clean = (
    df_clean.withColumn("rn", F.row_number().over(latest_first))
    .filter(F.col("rn") == 1)
    .drop("rn")
)
df_clean.display()

# 6. Referential Integrity (Join Check)
# Orders whose cust_id has no match in df_parents are orphans.
# left_anti and left_semi are exact complements on the join key, so the rows
# reported here are precisely the rows the fix removes. Unlike an inner join,
# a semi-join never duplicates an order when df_parents repeats a cust_id,
# adds no parent-side columns, and preserves df_clean's column order.
orphans = df_clean.join(df_parents, "cust_id", "left_anti")
print(f"Check 6 (Integrity): {orphans.count()} orphaned records found.")
df_clean = df_clean.join(df_parents, "cust_id", "left_semi")  # FIX: Keep only valid relationships
df_clean.display()

# 7. String Pattern (Regex Validation)
# Basic email shape: local part, "@", domain, ".", alphabetic TLD of 2+ chars.
# Inside a character class "|" is a literal, so the TLD class is [A-Za-z]
# (writing [A-Z|a-z] would also accept a pipe character).
email_p = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"
# A NULL email can never match. It is counted as bad explicitly, because
# `~rlike(...)` on NULL yields NULL, and those rows would otherwise be removed
# by the fix without being reported.
email = F.col("email")
valid_email = email.isNotNull() & email.rlike(email_p)
bad_strings = df_clean.filter(~valid_email)
print(f"Check 7 (Patterns): {bad_strings.count()} bad email formats.")
df_clean = df_clean.filter(valid_email)  # FIX: Enforce format
df_clean.display()

# --- FINAL OUTPUT ---
# Compare the raw input size with what survived all seven checks.
original_rows = df.count()
cleaned_rows = df_clean.count()
print(f"\nAudit Complete. Original: {original_rows} rows | Cleaned: {cleaned_rows} rows")
df_clean.display()