In [0]:
 from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# ---------------- Initialize Spark ----------------
# Configure the application name first, then obtain the session from the builder.
session_builder = SparkSession.builder.appName("CreditRisk_GoldLayer")
spark = session_builder.getOrCreate()

# ---------------- Source (Silver) and Target (Gold) Table Details ----------------
# Each triple is (catalog, schema, table); the parts are joined with "." wherever
# a fully qualified table name is needed later in this notebook.
silver_catalog, silver_schema, silver_table = "silver1", "credit_data", "credit_silver"
gold_catalog, gold_schema, gold_table = "gold1", "analytics", "risk_features"

# ---------------- Step 1: Load Silver Data ----------------
# Build the fully qualified Silver table name, then read that table.
silver_table_path = f"{silver_catalog}.{silver_schema}.{silver_table}"
df_silver = spark.table(silver_table_path)

# Print the schema of the loaded DataFrame under a heading.
print("Schema of Silver Data:")
df_silver.printSchema()

# ---------------- Step 2: Feature Engineering ----------------
# risk_score = loan_amnt / (person_income + 1) * default_weight
#   - the denominator is person_income + 1, not the raw income
#     (presumably to avoid dividing by a zero income -- confirm intent)
#   - default_weight is 2 where loan_status == 1 (defaulted) and 1 for every other row
loan_to_income = F.col("loan_amnt") / (F.col("person_income") + F.lit(1))
default_weight = F.when(F.col("loan_status") == 1, 2).otherwise(1)

df_gold = df_silver.withColumn("risk_score", loan_to_income * default_weight)

# ---------------- Step 3: Aggregated Risk Metrics ----------------
# One output row per distinct combination of these columns.
group_cols = ["person_age", "loan_intent", "loan_grade", "person_home_ownership"]

# Shared building blocks: default_rate is their ratio within each group.
defaults_in_group = F.sum("loan_status")
rows_in_group = F.count("*")

risk_metrics = [
    F.avg("risk_score").alias("avg_risk_score"),
    F.avg("person_income").alias("avg_income"),
    F.avg("loan_amnt").alias("avg_loan_amount"),
    defaults_in_group.alias("total_defaults"),
    rows_in_group.alias("customer_count"),
    (defaults_in_group / rows_in_group).alias("default_rate"),
]

# NOTE(review): this sort runs before the table is written; row order is not
# something a later read of the table is guaranteed to preserve -- confirm the
# sort is still wanted here.
grouped_risk = df_gold.groupBy(*group_cols)
df_aggregated = grouped_risk.agg(*risk_metrics).orderBy(
    "person_age", "loan_intent", "loan_grade"
)

# ---------------- Step 4: Create Gold Schema ----------------
# IF NOT EXISTS makes this statement safe to re-run when the schema is already there.
create_schema_sql = f"CREATE SCHEMA IF NOT EXISTS {gold_catalog}.{gold_schema}"
spark.sql(create_schema_sql)

# ---------------- Step 5: Save Gold Table ----------------
# Full refresh: each run replaces the table contents with the current aggregate.
# overwriteSchema=true lets the table schema be replaced together with the data,
# so re-running after changing group_cols or the metric list does not fail with a
# Delta schema-mismatch error against the previously saved table.
gold_table_path = f"{gold_catalog}.{gold_schema}.{gold_table}"
(
    df_aggregated.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(gold_table_path)
)

# Reuse the same name for the message so it always matches the table just written.
print(f"✅ Gold Layer Table Created: {gold_table_path}")

# ---------------- Step 6: Verify ----------------
# Query 10 rows back from the saved table and print them without truncating values.
sample_query = f"SELECT * FROM {gold_catalog}.{gold_schema}.{gold_table} LIMIT 10"
df_gold_sample = spark.sql(sample_query)
df_gold_sample.show(truncate=False)

# Load the full Gold table and hand it to display(), which the notebook
# environment provides (Databricks renders it as an interactive table).
df_gold_final = spark.table(f"{gold_catalog}.{gold_schema}.{gold_table}")
display(df_gold_final)
