In [0]:
# Cell 1: Read Bronze policies and check schema
bronze_policies = spark.table("smart_claims_dev.bronze.policies_raw")

print(f"üìä Bronze Policies - Row Count: {bronze_policies.count():,}")
print("\n" + "="*80)
print("üîç Schema:")
bronze_policies.printSchema()

In [0]:
# Cell 2: Check for duplicates and basic stats in Policies
from pyspark.sql.functions import count, countDistinct

# Get counts
total_rows = bronze_policies.count()
unique_policies = bronze_policies.select(countDistinct("POLICY_NO")).collect()[0][0]
duplicate_count = total_rows - unique_policies

# Display results
print(f"üìä BRONZE POLICIES ANALYSIS")
print("=" * 80)
print(f"Total Rows:              {total_rows:,}")
print(f"Unique POLICY_NO:        {unique_policies:,}")
print(f"Duplicate Records:       {duplicate_count:,}")
print("=" * 80)

# Show sample data
print("\nüîç SAMPLE DATA (First 5 rows):")
bronze_policies.show(5, truncate=False)


In [0]:
# Cell 3: Check for NULL values in critical columns
from pyspark.sql.functions import col

# Define critical columns
critical_columns = [
    'POLICY_NO',
    'CUST_ID',
    'POL_EFF_DATE',
    'POL_EXPIRY_DATE',
    'SUM_INSURED'
]

print("üîç NULL VALUE ANALYSIS - Critical Columns")
print("=" * 80)
print(f"{'Column':<20} | {'Null Count':>12} | {'Null %':>10}")
print("-" * 80)

# Check each column for nulls
for column in critical_columns:
    null_count = bronze_policies.filter(col(column).isNull()).count()
    null_percentage = (null_count / total_rows) * 100
    print(f"{column:<20} | {null_count:>12,} | {null_percentage:>9.2f}%")

print("=" * 80)


In [0]:
# Cell 4: Business rule validation for Policies (safe casting with SQL CASE WHEN)
from pyspark.sql.functions import col, current_date, expr

print("üîç BUSINESS RULE VALIDATION")
print("=" * 80)

# Check 1: POL_EFF_DATE should not be in future
future_eff_date = bronze_policies.filter(col("POL_EFF_DATE") > current_date()).count()
print(f"‚ùå Effective dates in future:         {future_eff_date:>10,}")

# Check 2: POL_EXPIRY_DATE should not be in future
future_expiry_date = bronze_policies.filter(col("POL_EXPIRY_DATE") > current_date()).count()
print(f"‚ùå Expiry dates in future:            {future_expiry_date:>10,}")

# Check 3: POL_EFF_DATE <= POL_EXPIRY_DATE
invalid_date_logic = bronze_policies.filter(col("POL_EFF_DATE") > col("POL_EXPIRY_DATE")).count()
print(f"‚ùå Effective date > Expiry date:      {invalid_date_logic:>10,}")

# Check 4: SUM_INSURED > 0
invalid_amount = bronze_policies.filter(col("SUM_INSURED") <= 0).count()
print(f"‚ùå SUM_INSURED <= 0:                  {invalid_amount:>10,}")

# Check 5: PREMIUM >= 0
invalid_premium = bronze_policies.filter(col("PREMIUM") < 0).count()
print(f"‚ùå PREMIUM < 0:                       {invalid_premium:>10,}")

# Check 6: DEDUCTABLE >= 0
invalid_deductible = bronze_policies.filter(col("DEDUCTABLE") < 0).count()
print(f"‚ùå DEDUCTABLE < 0:                    {invalid_deductible:>10,}")

# Check 7: MODEL_YEAR reasonable - safe casting (handles "2015.0", "null" strings)
policies_with_year = bronze_policies.withColumn(
    "MODEL_YEAR_INT",
    expr("""
        CASE 
            WHEN MODEL_YEAR IS NULL OR MODEL_YEAR = 'null' THEN NULL
            ELSE CAST(CAST(MODEL_YEAR AS DOUBLE) AS INT)
        END
    """)
)

invalid_model_year = policies_with_year.filter(
    (col("MODEL_YEAR_INT").isNull()) |
    (col("MODEL_YEAR_INT") < 1980) | 
    (col("MODEL_YEAR_INT") > 2025)
).count()
print(f"‚ùå MODEL_YEAR outside 1980-2025:     {invalid_model_year:>10,}")

# Check 8: Duplicate POLICY_NO
duplicate_policies = 1
print(f"‚ùå Duplicate POLICY_NO:               {duplicate_policies:>10,}")

print("=" * 80)

# Summary
total_invalid = (future_eff_date + future_expiry_date + invalid_date_logic + 
                 invalid_amount + invalid_premium + invalid_deductible + 
                 invalid_model_year + duplicate_policies)
print(f"\nüìä TOTAL INVALID RECORDS: {total_invalid:,}")
print(f"üìä VALID RECORDS: {total_rows - total_invalid:,} ({((total_rows - total_invalid)/total_rows)*100:.2f}%)")

In [0]:
# Cell 5: Transform Bronze to Silver - Apply all quality rules + deduplicate
from pyspark.sql.functions import current_timestamp, row_number, col, lit
from pyspark.sql.window import Window

print("üîß APPLYING TRANSFORMATIONS...")
print("=" * 80)

# Start with policies that have MODEL_YEAR_INT calculated
policies_silver = policies_with_year

# Filter 1: Remove policies with negative PREMIUM
policies_silver = policies_silver.filter(col("PREMIUM") >= 0)
print(f"‚úÖ Filter 1: Remove negative PREMIUM")

# Filter 2: Remove policies with invalid MODEL_YEAR
policies_silver = policies_silver.filter(
    (col("MODEL_YEAR_INT").isNull()) | 
    ((col("MODEL_YEAR_INT") >= 1980) & (col("MODEL_YEAR_INT") <= 2025))
)
print(f"‚úÖ Filter 2: Remove invalid MODEL_YEAR")

# Filter 3: Remove policies with logical date errors
policies_silver = policies_silver.filter(col("POL_EFF_DATE") <= col("POL_EXPIRY_DATE"))
print(f"‚úÖ Filter 3: Remove date logic errors")

# Filter 4: Deduplicate by POLICY_NO - keep latest (by POL_EXPIRY_DATE descending)
window_spec = Window.partitionBy("POLICY_NO").orderBy(col("POL_EXPIRY_DATE").desc())
policies_silver = policies_silver.withColumn("row_num", row_number().over(window_spec)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")
print(f"‚úÖ Filter 4: Deduplicate by POLICY_NO (keep latest)")

# Drop temporary columns
policies_silver = policies_silver.drop("MODEL_YEAR_INT")
print(f"‚úÖ Dropped temporary column: MODEL_YEAR_INT")

# Add audit column
policies_silver = policies_silver.withColumn("processed_at", current_timestamp())
print(f"‚úÖ Added audit column: processed_at")

# Results
final_count = policies_silver.count()
removed_count = total_rows - final_count

print("=" * 80)
print(f"üìä TRANSFORMATION RESULTS:")
print(f"   Original Bronze rows:  {total_rows:>10,}")
print(f"   Removed invalid rows:  {removed_count:>10,}")
print(f"   Final Silver rows:     {final_count:>10,}")
print(f"   Data quality:          {(final_count/total_rows)*100:>9.2f}%")
print("=" * 80)


In [0]:
# Cell 6: Write cleaned policies to Silver Delta table
print("üíæ WRITING TO SILVER LAYER...")
print("=" * 80)

# Write to Delta table
policies_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("smart_claims_dev.silver.policies_clean")

print("‚úÖ Successfully written to: smart_claims_dev.silver.policies_clean")
print("=" * 80)

# Verify the write
silver_table = spark.table("smart_claims_dev.silver.policies_clean")
silver_count = silver_table.count()

print(f"üîç VERIFICATION:")
print(f"   Rows written:  {silver_count:>10,}")
print(f"   Expected:      {final_count:>10,}")
print(f"   Match:         {'‚úÖ YES' if silver_count == final_count else '‚ùå NO'}")
print("=" * 80)

# Show sample from Silver table
print("\nüìä SAMPLE SILVER DATA (First 5 rows):")
display(silver_table.limit(5))
