In [0]:
# Databricks notebook: gold_layer (Complete Version with Catalog Tables)
from pyspark.sql.functions import col, when, round, avg, max, min, count, sum as spark_sum, desc, first

print("="*70)
print("GOLD LAYER - ANALYTICS & AGGREGATIONS")
print("="*70)

# STEP 1: DEFINE PATHS (Using /mnt/cc mount)
silver_path = "/mnt/cc/silver/credit_default"
gold_path = "/mnt/cc/gold/credit_default"

print(f"\nSilver path: {silver_path}")
print(f"Gold path: {gold_path}")


# STEP 2: READ FROM SILVER LAYER
print("\n" + "-"*70)
print("STEP 1: READING SILVER DATA")
print("-"*70)

try:
    df = spark.read.format("delta").load(silver_path)
    row_count = df.count()
    col_count = len(df.columns)
    print(f"Successfully loaded silver data")
    print(f"  Rows: {row_count:,}")
    print(f"  Columns: {col_count}")
    
    # Check which default column exists
    if "default_flag" in df.columns:
        default_col = "default_flag"
    elif "default" in df.columns:
        default_col = "default"
        df = df.withColumnRenamed("default", "default_flag")
    else:
        print("Warning: No default column found!")
        default_col = None
except Exception as e:
    print(f"Error reading silver layer: {e}")
    raise


# STEP 3: CREATE GOLD LAYER AGGREGATIONS
print("\n" + "-"*70)
print("STEP 2: CREATING GOLD LAYER AGGREGATIONS")
print("-"*70)

# 3A: Customer-Level Aggregations
print("\n3A. Creating customer-level aggregations...")
customer_agg = df.groupBy("ID").agg(
    first("LIMIT_BAL").alias("LIMIT_BAL"),
    first("SEX").alias("SEX"),
    first("EDUCATION").alias("EDUCATION"),
    first("MARRIAGE").alias("MARRIAGE"),
    first("AGE").alias("AGE"),
    first("PAY_0").alias("latest_pay_status"),
    avg("PAY_0").alias("avg_pay_status"),
    avg("BILL_AMT1").alias("avg_monthly_bill"),
    max("BILL_AMT1").alias("max_monthly_bill"),
    min("BILL_AMT1").alias("min_monthly_bill"),
    first("avg_bill_amt").alias("avg_bill_6months"),
    avg("PAY_AMT1").alias("avg_monthly_payment"),
    max("PAY_AMT1").alias("max_monthly_payment"),
    min("PAY_AMT1").alias("min_monthly_payment"),
    round((avg("BILL_AMT1") / first("LIMIT_BAL")) * 100, 2).alias("utilization_ratio"),
    first("default_flag").alias("default_flag"),
    count("*").alias("record_count")
).filter(col("record_count") > 0)
print("  Customer-level aggregations created")

# 3B: Risk Segment Analysis
print("\n3B. Creating risk segment analysis...")
risk_segments = df.groupBy(
    when(col("avg_bill_amt") > 50000, "High Spender")
        .when(col("avg_bill_amt") > 20000, "Medium Spender")
        .otherwise("Low Spender").alias("spending_segment"),
    when(col("PAY_0").isin(2, 0, -1, -2), "Good Standing")
        .when(col("PAY_0") == 1, "Revolving")
        .otherwise("Delinquent").alias("payment_status_segment"),
    col("default_flag").alias("default_flag")
).agg(
    count("*").alias("customer_count"),
    round(avg("LIMIT_BAL"), 2).alias("avg_credit_limit"),
    round(avg("avg_bill_amt"), 2).alias("avg_bill_amount"),
    round(avg("AGE"), 1).alias("avg_age"),
    round((spark_sum(when(col("default_flag") == "Y", 1).otherwise(0)) / count("*")) * 100, 2).alias("default_rate_pct")
).orderBy(desc("customer_count"))
print("  Risk segment analysis created")

# 3C: Education Level Analysis
print("\n3C. Creating education level analysis...")
education_agg = df.groupBy("EDUCATION").agg(
    count("*").alias("customer_count"),
    round(avg("LIMIT_BAL"), 2).alias("avg_credit_limit"),
    round(avg("avg_bill_amt"), 2).alias("avg_bill_amount"),
    round(avg("AGE"), 1).alias("avg_age"),
    round((spark_sum(when(col("default_flag") == "Y", 1).otherwise(0)) / count("*")) * 100, 2).alias("default_rate_pct")
).orderBy("EDUCATION")
print("  Education level analysis created")

# 3D: Age Group Analysis
print("\n3D. Creating age group analysis...")
age_group_agg = df.groupBy(
    when(col("AGE") < 25, "18-24")
        .when(col("AGE") < 35, "25-34")
        .when(col("AGE") < 45, "35-44")
        .when(col("AGE") < 55, "45-54")
        .otherwise("55+").alias("age_group"),
    col("default_flag").alias("default_flag")
).agg(
    count("*").alias("customer_count"),
    round(avg("LIMIT_BAL"), 2).alias("avg_credit_limit"),
    round(avg("avg_bill_amt"), 2).alias("avg_bill_amount"),
    round(avg("AGE"), 1).alias("avg_age")
).orderBy("age_group")
print("  Age group analysis created")

# 3E: Gender Analysis
print("\n3E. Creating gender analysis...")
gender_agg = df.groupBy(
    col("SEX").alias("gender"),
    col("default_flag").alias("default_flag")
).agg(
    count("*").alias("customer_count"),
    round(avg("LIMIT_BAL"), 2).alias("avg_credit_limit"),
    round(avg("avg_bill_amt"), 2).alias("avg_bill_amount"),
    round(avg("AGE"), 1).alias("avg_age")
).orderBy("gender")
print("  Gender analysis created")

# 3F: Default Prediction Scorecard
print("\n3F. Creating default prediction scorecard...")
scorecard = df.groupBy(
    when((col("avg_bill_amt") / col("LIMIT_BAL")) > 0.8, "High Utilization")
        .when((col("avg_bill_amt") / col("LIMIT_BAL")) > 0.5, "Medium Utilization")
        .otherwise("Low Utilization").alias("credit_utilization"),
    when(col("PAY_0") == 0, "On-Time")
        .when(col("PAY_0") == 1, "1 Month Late")
        .when(col("PAY_0") == 2, "2+ Months Late")
        .otherwise("Revolving").alias("latest_payment_status"),
    col("default_flag").alias("default_flag")
).agg(
    count("*").alias("count"),
    round(avg("AGE"), 1).alias("avg_age"),
    round(avg("LIMIT_BAL"), 0).alias("avg_limit"),
    round(avg("avg_bill_amt"), 0).alias("avg_bill")
).orderBy(desc("count"))
print("  Default prediction scorecard created")

print("\nAll gold layer aggregations created successfully")

# STEP 4: WRITE TO GOLD LAYER & CREATE TABLES
print("\n" + "-"*70)
print("STEP 3: CREATING & REGISTERING GOLD LAYER TABLES IN CATALOG")
print("-"*70)

# Create database first
print("\nCreating database...")
spark.sql("CREATE DATABASE IF NOT EXISTS credit_risk")
print("Database created: credit_risk")

# Define all tables to create
tables = [
    {
        "name": "gold_customer_agg",
        "df": customer_agg,
        "description": "Customer-level aggregations with all metrics"
    },
    {
        "name": "gold_risk_segments",
        "df": risk_segments,
        "description": "Risk segment analysis"
    },
    {
        "name": "gold_education_agg",
        "df": education_agg,
        "description": "Education demographics"
    },
    {
        "name": "gold_age_group_agg",
        "df": age_group_agg,
        "description": "Age group analysis"
    },
    {
        "name": "gold_gender_agg",
        "df": gender_agg,
        "description": "Gender analysis"
    },
    {
        "name": "gold_scorecard",
        "df": scorecard,
        "description": "Default prediction scorecard"
    }
]

print("\nCreating managed tables in Catalog...")
for table_info in tables:
    table_name = table_info["name"]
    df_table = table_info["df"]
    description = table_info["description"]
    
    try:
        print(f"\nCreating table: credit_risk.{table_name}")
        
        # Write and register as managed table
        df_table.write \
            .format("delta") \
            .mode("overwrite") \
            .saveAsTable(f"credit_risk.{table_name}")
        
        # Add table comment for documentation
        spark.sql(f"""
            ALTER TABLE credit_risk.{table_name}
            SET TBLPROPERTIES (
                'description' = '{description}'
            )
        """)
        
        record_count = df_table.count()
        print(f"  Success: {record_count:,} records")
        
    except Exception as e:
        print(f"  Error: {e}")
        raise

print("\n" + "="*70)
print("ALL TABLES CREATED SUCCESSFULLY AND VISIBLE IN CATALOG")
print("="*70)


# STEP 5: VERIFY TABLES IN CATALOG
print("\n" + "-"*70)
print("STEP 4: VERIFYING TABLES IN CATALOG")
print("-"*70)

print("\nTables created in credit_risk database:")
catalog_tables = spark.sql("SHOW TABLES IN credit_risk")
display(catalog_tables)


# STEP 6: DISPLAY GOLD LAYER INSIGHTS
print("\n" + "="*70)
print("GOLD LAYER ANALYTICS - DATA PREVIEW")
print("="*70)

print("\n" + "-"*70)
print("TABLE 1: CUSTOMER AGGREGATIONS (Sample)")
print("-"*70)
print(f"Total customers: {customer_agg.count():,}")
display(customer_agg.limit(10))

print("\n" + "-"*70)
print("TABLE 2: RISK SEGMENTS")
print("-"*70)
display(risk_segments)

print("\n" + "-"*70)
print("TABLE 3: EDUCATION LEVEL ANALYSIS")
print("-"*70)
display(education_agg)

print("\n" + "-"*70)
print("TABLE 4: AGE GROUP ANALYSIS")
print("-"*70)
display(age_group_agg)

print("\n" + "-"*70)
print("TABLE 5: GENDER ANALYSIS")
print("-"*70)
display(gender_agg)

print("\n" + "-"*70)
print("TABLE 6: DEFAULT SCORECARD")
print("-"*70)
display(scorecard)

# STEP 7: KEY INSIGHTS
print("\n" + "-"*70)
print("KEY INSIGHTS")
print("-"*70)

try:
    bronze_path_alt = "/mnt/cc/bronze/credit_default"
    df_bronze = spark.read.format("delta").load(bronze_path_alt)
    
    default_rate = df_bronze.select(
        round(100.0 * spark_sum(when(col("default_flag") == "Y", 1).otherwise(0)) / count("*"), 2).alias("default_rate_pct")
    ).collect()[0][0]
    print(f"\nOverall Default Rate: {default_rate}%")
except Exception as e:
    print(f"\nCould not calculate default rate: {e}")

try:
    print("\nHighest Risk Segments:")
    display(risk_segments.filter(col("default_rate_pct") > 0).orderBy(desc("default_rate_pct")).limit(5))
except Exception as e:
    print(f"Could not display risk analysis: {e}")


# FINAL SUMMARY
print("\n" + "="*70)
print("GOLD LAYER PIPELINE COMPLETE!")
print("="*70)

print("\n✓ TABLES NOW IN CATALOG:")
print("  1. credit_risk.gold_customer_agg")
print("  2. credit_risk.gold_risk_segments")
print("  3. credit_risk.gold_education_agg")
print("  4. credit_risk.gold_age_group_agg")
print("  5. credit_risk.gold_gender_agg")
print("  6. credit_risk.gold_scorecard")

print("\n✓ HOW TO ACCESS:")
print("  1. Click 'Catalog' in left sidebar")
print("  2. Click 'credit_risk' database")
print("  3. All 6 tables are now visible")
print("  4. Click any table to view schema, stats, and data")

print("\n✓ TABLES ARE PERMANENT MANAGED TABLES")
print("  - Visible in Databricks Catalog")
print("  - Persisted in workspace storage")
print("  - Can be queried via SQL or Python")
print("  - Full metadata and lineage tracking")

print("\n" + "="*70)

GOLD LAYER - ANALYTICS & AGGREGATIONS

Silver path: /mnt/cc/silver/credit_default
Gold path: /mnt/cc/gold/credit_default

----------------------------------------------------------------------
STEP 1: READING SILVER DATA
----------------------------------------------------------------------
Successfully loaded silver data
  Rows: 90,000
  Columns: 27

----------------------------------------------------------------------
STEP 2: CREATING GOLD LAYER AGGREGATIONS
----------------------------------------------------------------------

3A. Creating customer-level aggregations...
  Customer-level aggregations created

3B. Creating risk segment analysis...
  Risk segment analysis created

3C. Creating education level analysis...
  Education level analysis created

3D. Creating age group analysis...
  Age group analysis created

3E. Creating gender analysis...
  Gender analysis created

3F. Creating default prediction scorecard...
  Default prediction scorecard created

All gold layer aggrega

database,tableName,isTemporary
credit_risk,gold_age_group_agg,False
credit_risk,gold_customer_agg,False
credit_risk,gold_education_agg,False
credit_risk,gold_gender_agg,False
credit_risk,gold_risk_segments,False
credit_risk,gold_scorecard,False



GOLD LAYER ANALYTICS - DATA PREVIEW

----------------------------------------------------------------------
TABLE 1: CUSTOMER AGGREGATIONS (Sample)
----------------------------------------------------------------------
Total customers: 30,000


ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,latest_pay_status,avg_pay_status,avg_monthly_bill,max_monthly_bill,min_monthly_bill,avg_bill_6months,avg_monthly_payment,max_monthly_payment,min_monthly_payment,utilization_ratio,default_flag,record_count
299.0,280000.0,F,4,Married,36.0,0.0,0.0,58983.0,58983.0,58983.0,45606.66666666666,2755.0,2755.0,2755.0,21.07,N,3
305.0,200000.0,F,4,Single,27.0,-1.0,-1.0,871.0,871.0,871.0,3039.333333333333,665.0,665.0,665.0,0.44,N,3
496.0,50000.0,M,4,Married,41.0,-1.0,-1.0,508.0,508.0,508.0,389.3333333333333,1000.0,1000.0,1000.0,1.02,N,3
596.0,200000.0,M,4,Married,43.0,0.0,0.0,144678.0,144678.0,144678.0,137685.0,5090.0,5090.0,5090.0,72.34,N,3
692.0,170000.0,M,4,Single,27.0,-1.0,-1.0,1170.0,1170.0,1170.0,325.0,0.0,0.0,0.0,0.69,N,3
769.0,220000.0,M,4,Married,40.0,-1.0,-1.0,158.0,158.0,158.0,128.33333333333334,0.0,0.0,0.0,0.07,N,3
934.0,30000.0,M,4,Single,32.0,2.0,2.0,7851.0,7851.0,7851.0,10852.0,1500.0,1500.0,1500.0,26.17,Y,3
1051.0,110000.0,M,4,Single,31.0,2.0,2.0,279.0,279.0,279.0,237.33333333333331,0.0,0.0,0.0,0.25,N,3
1761.0,290000.0,F,4,Married,33.0,1.0,1.0,7064.0,7064.0,7064.0,9723.5,0.0,0.0,0.0,2.44,N,3
2734.0,170000.0,F,4,Single,31.0,2.0,2.0,520.0,520.0,520.0,3081.6666666666665,1339.0,1339.0,1339.0,0.31,N,3



----------------------------------------------------------------------
TABLE 2: RISK SEGMENTS
----------------------------------------------------------------------


spending_segment,payment_status_segment,default_flag,customer_count,avg_credit_limit,avg_bill_amount,avg_age,default_rate_pct
Low Spender,Good Standing,N,28809,176419.45,6994.56,35.4,0.0
High Spender,Good Standing,N,18726,223373.92,122196.27,35.9,0.0
Medium Spender,Good Standing,N,14859,136157.08,33452.31,34.9,0.0
Low Spender,Good Standing,Y,7134,134108.49,6718.0,35.8,100.0
Low Spender,Revolving,N,5097,169140.67,4405.53,35.2,0.0
High Spender,Good Standing,Y,4233,181444.14,126148.53,36.5,100.0
Medium Spender,Good Standing,Y,3786,78851.03,33403.78,34.5,100.0
Low Spender,Revolving,Y,2388,136444.72,5359.15,36.4,100.0
Medium Spender,Revolving,N,1215,96148.15,32891.06,35.0,0.0
High Spender,Revolving,N,996,189006.02,117546.58,34.7,0.0



----------------------------------------------------------------------
TABLE 3: EDUCATION LEVEL ANALYSIS
----------------------------------------------------------------------


EDUCATION,customer_count,avg_credit_limit,avg_bill_amount,avg_age,default_rate_pct
0,42,217142.86,10898.55,38.9,0.0
4,89958,167461.14,44992.86,35.5,22.13



----------------------------------------------------------------------
TABLE 4: AGE GROUP ANALYSIS
----------------------------------------------------------------------


age_group,default_flag,customer_count,avg_credit_limit,avg_bill_amount,avg_age
18-24,N,5865,64143.22,25966.52,23.2
18-24,Y,2190,52465.75,25385.31,23.1
25-34,Y,7923,131188.94,44362.85,29.2
25-34,N,31110,180718.42,46418.3,29.4
35-44,Y,5913,153910.54,50286.82,39.1
35-44,N,21141,204660.14,47785.89,39.0
45-54,N,9660,182665.22,47222.6,48.7
45-54,Y,3039,132152.02,42163.87,48.8
55+,Y,843,147366.55,38965.53,58.8
55+,N,2316,170012.95,51699.37,58.8



----------------------------------------------------------------------
TABLE 5: GENDER ANALYSIS
----------------------------------------------------------------------


gender,default_flag,customer_count,avg_credit_limit,avg_bill_amount,avg_age
F,N,43047,179726.53,43999.21,34.8
F,Y,11289,133327.13,42311.45,34.9
M,Y,8619,125895.47,44988.59,36.8
M,N,27045,175510.37,47642.09,36.4



----------------------------------------------------------------------
TABLE 6: DEFAULT SCORECARD
----------------------------------------------------------------------


credit_utilization,latest_payment_status,default_flag,count,avg_age,avg_limit,avg_bill
Low Utilization,Revolving,N,20829,36.5,229409.0,10000.0
Low Utilization,On-Time,N,19266,34.6,197183.0,41561.0
Medium Utilization,On-Time,N,11304,35.4,130581.0,85183.0
High Utilization,On-Time,N,7977,35.1,112951.0,103545.0
Low Utilization,1 Month Late,N,5370,35.4,188123.0,9866.0
Low Utilization,Revolving,Y,4167,36.7,189834.0,5612.0
Low Utilization,1 Month Late,Y,2235,36.7,163812.0,9138.0
Low Utilization,On-Time,Y,2106,34.7,131068.0,26994.0
Low Utilization,2+ Months Late,Y,1962,36.1,133578.0,28272.0
High Utilization,2+ Months Late,Y,1929,35.1,98040.0,90098.0



----------------------------------------------------------------------
KEY INSIGHTS
----------------------------------------------------------------------

Could not calculate default rate: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `default_flag` cannot be resolved. Did you mean one of the following? [`default`, `ingest_ts`, `BILL_AMT1`, `BILL_AMT2`, `BILL_AMT3`].;
'Aggregate [round(((sum(CASE WHEN ('default_flag = Y) THEN 1 ELSE 0 END) * 100.0) / count(1)), 2) AS default_rate_pct#58876]
+- Relation [ID#58822,LIMIT_BAL#58823,SEX#58824,EDUCATION#58825,MARRIAGE#58826,AGE#58827,PAY_0#58828,PAY_2#58829,PAY_3#58830,PAY_4#58831,PAY_5#58832,PAY_6#58833,BILL_AMT1#58834,BILL_AMT2#58835,BILL_AMT3#58836,BILL_AMT4#58837,BILL_AMT5#58838,BILL_AMT6#58839,PAY_AMT1#58840,PAY_AMT2#58841,PAY_AMT3#58842,PAY_AMT4#58843,PAY_AMT5#58844,PAY_AMT6#58845,... 2 more fields] parquet


Highest Risk Segments:


spending_segment,payment_status_segment,default_flag,customer_count,avg_credit_limit,avg_bill_amount,avg_age,default_rate_pct
Medium Spender,Delinquent,Y,279,52150.54,33327.69,34.8,100.0
Medium Spender,Good Standing,Y,3786,78851.03,33403.78,34.5,100.0
Low Spender,Revolving,Y,2388,136444.72,5359.15,36.4,100.0
High Spender,Delinquent,Y,234,173717.95,150703.62,35.8,100.0
Low Spender,Good Standing,Y,7134,134108.49,6718.0,35.8,100.0



GOLD LAYER PIPELINE COMPLETE!

✓ TABLES NOW IN CATALOG:
  1. credit_risk.gold_customer_agg
  2. credit_risk.gold_risk_segments
  3. credit_risk.gold_education_agg
  4. credit_risk.gold_age_group_agg
  5. credit_risk.gold_gender_agg
  6. credit_risk.gold_scorecard

✓ HOW TO ACCESS:
  1. Click 'Catalog' in left sidebar
  2. Click 'credit_risk' database
  3. All 6 tables are now visible
  4. Click any table to view schema, stats, and data

✓ TABLES ARE PERMANENT MANAGED TABLES
  - Visible in Databricks Catalog
  - Persisted in workspace storage
  - Can be queried via SQL or Python
  - Full metadata and lineage tracking

