In [0]:
from pyspark.sql import functions as F

train_base = spark.table("creditrisk_catalog.silver_creditrisk.app_train").select("SK_ID_CURR","TARGET")
test_base = spark.table("creditrisk_catalog.silver_creditrisk.app_test").select("SK_ID_CURR")

train_base.write.format("delta").mode("overwrite").saveAsTable("creditrisk_catalog.gold_creditrisk.app_train_base")
test_base.write.format("delta").mode("overwrite").saveAsTable("creditrisk_catalog.gold_creditrisk.app_test_base")

print("train_base:",train_base.count())
print("test_base:",test_base.count())

In [0]:
bureau = spark.table("creditrisk_catalog.silver_creditrisk.bureau")
bb = spark.table("creditrisk_catalog.silver_creditrisk.bureau_balance")

In [0]:

bb_agg = (
    bb.groupBy("SK_ID_BUREAU")
      .agg(
          F.count("*").alias("bb_months"),
          F.sum(F.when(F.col("STATUS").isin("1","2","3","4","5"), 1).otherwise(0)).alias("bb_bad_months"),
        #   F.sum(F.when(F.col("STATUS").isin("C"), 1).otherwise(0)).alias("bb_closed_months),
        #   F.sum(F.when(F.col("STATUS").isin("X"), 1).otherwise(0)).alias("bb_unknown_months"),
          F.max("MONTHS_BALANCE").alias("bb_last_month"),
          F.min("MONTHS_BALANCE").alias("bb_first_month")
      )
      .withColumn(
          "bb_bad_rate",
          F.when(F.col("bb_months") > 0, F.col("bb_bad_months") / F.col("bb_months")).otherwise(F.lit(0.0))
      )
)

bureau_enriched = bureau.join(bb_agg, "SK_ID_BUREAU", "left")

bureau_features = (
    bureau_enriched.groupBy("SK_ID_CURR")
      .agg(
          F.countDistinct("SK_ID_BUREAU").alias("bureau_loan_cnt"),
          F.sum("bb_months").alias("bureau_months_total"),
          F.sum("bb_bad_months").alias("bureau_bad_months_total"),
          F.max("bb_bad_rate").alias("bureau_bad_rate_max"),
          F.avg("bb_bad_rate").alias("bureau_bad_rate_avg"),
          F.sum("AMT_CREDIT_SUM").alias("bureau_amt_credit_sum_total"),
          F.avg("AMT_CREDIT_SUM").alias("bureau_amt_credit_sum_avg")
      )
)

(bureau_features.write.format("delta").mode("overwrite")
 .saveAsTable("creditrisk_catalog.gold_creditrisk.bureau_features"))

print("bureau_features rows:", bureau_features.count())
print("bureau_features distinct:", bureau_features.select("SK_ID_CURR").distinct().count())


In [0]:
prev = spark.table("creditrisk_catalog.silver_creditrisk.previous_application")

prev_agg = (
    prev.groupBy("SK_ID_CURR")
      .agg(
          F.countDistinct("SK_ID_PREV").alias("prev_loan_cnt"),
          F.sum(F.when(F.col("NAME_CONTRACT_STATUS").isin("Approved"), 1).otherwise(0)).alias("prev_loan_approved"),
          F.sum(F.when(F.col("NAME_CONTRACT_STATUS").isin("Refused"), 1).otherwise(0)).alias("prev_loan_refused"),
        #   F.sum(F.when(F.col("NAME_CONTRACT_STATUS").isin("Canceled"), 1).otherwise(0)).alias("prev_loan_canceled"),
        #   F.sum(F.when(F.col("NAME_CONTRACT_STATUS").isin("Unused offer"), 1).otherwise(0)).alias("prev_loan_unused"),
        #   F.sum(F.when(F.col("NAME_CONTRACT_STATUS").isin("Sent proposal"), 1).otherwise(0)).alias("prev_loan_sent"),
        #   F.sum(F.when(F.col("NAME_CONTRACT_STATUS").isin("Approved"), F.col("AMT_CREDIT")).otherwise(0)).alias("prev_loan_amt_credit_sum"),
        #   F.sum(F.when(F.col("NAME_CONTRACT_STATUS").isin("Approved"), F.col("AMT_ANNUITY")).otherwise(0)).alias("prev_loan_amt_annuity_sum"),
          # F.avg("AMT_CREDIT").alias("prev_loan_amt_credit_avg"),
          # F.avg("AMT_APPLICATION").alias("prev_loan_amt_application_avg"),
          F.min("DAYS_DECISION").alias("prev_loan_days_decision_min"),
          F.max("DAYS_DECISION").alias("prev_loan_days_decision_max")
      )
      .withColumn(
          "prev_loan_approved_rate",
          F.when(F.col("prev_loan_cnt") > 0, F.col("prev_loan_approved") / F.col("prev_loan_cnt")).otherwise(F.lit(0.0))
      )
)

(prev_agg.write.format("delta").mode("overwrite")
 .saveAsTable("creditrisk_catalog.gold_creditrisk.prev_agg"))

print("prev_agg rows:", prev_agg.count())
print("prev_agg distinct:", prev_agg.select("SK_ID_CURR").distinct().count()
)
   

In [0]:
inst = spark.table("creditrisk_catalog.silver_creditrisk.installments_payments")

inst_features = (
    inst.withColumn("days_late", F.col("DAYS_ENTRY_PAYMENT") - F.col("DAYS_INSTALMENT"))
    .withColumn("amt_diff", F.col("AMT_PAYMENT") - F.col("AMT_INSTALMENT"))
    .groupBy("SK_ID_CURR")
    .agg(
        F.count("*").alias("inst_payment_cnt"),
        F.avg("days_late").alias("inst_payment_days_late_avg"),
        F.max("days_late").alias("inst_payment_days_late_max"),
        F.avg("amt_diff").alias("inst_payment_amt_diff_avg"),
        F.sum(F.when(F.col("days_late") > 0, 1).otherwise(0)).alias("inst_payment_late_cnt")
    )
    .withColumn(
        "inst_payment_late_rate",
        F.when(F.col("inst_payment_cnt") > 0, F.col("inst_payment_late_cnt") / F.col("inst_payment_cnt")).otherwise(F.lit(0.0))
    )
)

inst_features.write.mode("overwrite").saveAsTable("creditrisk_catalog.gold_creditrisk.installments_payment_features")
print("installments_payment_features rows:", inst_features.count())
print("installments_payment_features distinct:", inst_features.select("SK_ID_CURR").distinct().count())

In [0]:
pos = spark.table("creditrisk_catalog.silver_creditrisk.pos_cash_balance")

pos_features = (
    pos.groupBy("SK_ID_CURR")
    .agg(
        F.count("*").alias("pos_cash_balance_cnt"),
        F.countDistinct("SK_ID_PREV").alias("pos_cash_balance_prev_cnt"),
        F.avg("SK_DPD").alias("pos_dpd_avg"),
        F.max("SK_DPD").alias("pos_dpd_max")
    )
)

(pos_features.write.format("delta").mode("overwrite")
 .saveAsTable("creditrisk_catalog.gold_creditrisk.pos_cash_balance_features"))

print("pos_cash_balance_features rows:", pos_features.count())
print("pos_cash_balance_features distinct:", pos_features.select("SK_ID_CURR").distinct().count())

In [0]:
cc = spark.table("creditrisk_catalog.silver_creditrisk.credit_card_balance")

cc_features = (
    cc.groupBy("SK_ID_CURR")
    .agg(
        F.count("*").alias("cc_months"),
        F.countDistinct("SK_ID_PREV").alias("cc_prev_cnt"),
        F.avg("AMT_BALANCE").alias("cc_balance_avg"),
        F.max("AMT_BALANCE").alias("cc_balance_max"),
        F.max("SK_DPD").alias("cc_dpd_max")        
    )
)

(cc_features.write.format("delta").mode("overwrite")
 .saveAsTable("creditrisk_catalog.gold_creditrisk.credit_card_balance_features"))

print("credit_card_balance_features rows:", cc_features.count())
print("credit_card_balance_features distinct:", cc_features.select("SK_ID_CURR").distinct().count())


In [0]:
train = spark.table("creditrisk_catalog.gold_creditrisk.app_train_base")
test = spark.table("creditrisk_catalog.gold_creditrisk.app_test_base")

feature_tables = [
    "bureau_features",
    "prev_agg",
    "installments_payment_features",
    "pos_cash_balance_features",
    "credit_card_balance_features"
]

def join_all(base_df):
    out = base_df
    for t in feature_tables:
        ft = spark.table(f"creditrisk_catalog.gold_creditrisk.{t}")
        out = out.join(ft, on="SK_ID_CURR", how="left")
    return out

gold_train = join_all(train)
gold_test = join_all(test)

(gold_train.write.format("delta").option("mergeSchema", "true").mode("overwrite")
.saveAsTable("creditrisk_catalog.gold_creditrisk.train_dataset"))

(gold_test.write.format("delta").option("mergeSchema", "true").mode("overwrite")
.saveAsTable("creditrisk_catalog.gold_creditrisk.test_dataset"))

print("gold_train:", gold_train.count(),gold_train.select("SK_ID_CURR").distinct().count())
print("gold_test:", gold_test.count(), gold_test.select("SK_ID_CURR").distinct().count())

In [0]:
spark.table("creditrisk_catalog.gold_creditrisk.train_dataset").select("TARGET").groupBy("TARGET").count().show()
spark.table("creditrisk_catalog.gold_creditrisk.train_dataset").printSchema()



In [0]:
%sql

OPTIMIZE creditrisk_catalog.gold_creditrisk.bureau_features
ZORDER BY (SK_ID_CURR);

OPTIMIZE creditrisk_catalog.gold_creditrisk.prev_agg
ZORDER BY (SK_ID_CURR);

OPTIMIZE creditrisk_catalog.gold_creditrisk.installments_payment_features
ZORDER BY (SK_ID_CURR);

OPTIMIZE creditrisk_catalog.gold_creditrisk.pos_cash_balance_features
ZORDER BY (SK_ID_CURR);

OPTIMIZE creditrisk_catalog.gold_creditrisk.credit_card_balance_features
ZORDER BY (SK_ID_CURR);

OPTIMIZE creditrisk_catalog.gold_creditrisk.train_dataset
ZORDER BY (SK_ID_CURR);

OPTIMIZE creditrisk_catalog.gold_creditrisk.test_dataset
ZORDER BY (SK_ID_CURR);


In [0]:
from pyspark.sql import functions as F

def apply_business_rules(df):
    return (
        df
        # 1. Bureau delinquency severity
        .withColumn(
            "high_bureau_risk_flag",
            F.when(F.col("bureau_bad_rate_max") >= 0.3, 1).otherwise(0)
        )

        # 2. Payment discipline
        .withColumn(
            "frequent_late_payer_flag",
            F.when(F.col("inst_payment_late_rate") > 0.2, 1).otherwise(0)
        )

        # 3. Severe delinquency across revolving / POS
        .withColumn(
            "severe_dpd_flag",
            F.when(
                F.greatest(
                    F.col("pos_dpd_max"),
                    F.col("cc_dpd_max")
                ) >= 30,
                1
            ).otherwise(0)
        )

        # 4. Previous application pressure
        .withColumn(
            "prev_refusal_ratio",
            F.when(
                F.col("prev_loan_cnt") > 0,
                F.col("prev_loan_refused") / F.col("prev_loan_cnt")
            ).otherwise(0)
        )

        # 5. Bureau exposure concentration
        .withColumn(
            "avg_bureau_credit_per_loan",
            F.when(
                F.col("bureau_loan_cnt") > 0,
                F.col("bureau_amt_credit_sum_total") / F.col("bureau_loan_cnt")
            ).otherwise(0)
        )

        # 6. Credit history coverage
        .withColumn(
            "has_credit_activity_flag",
            F.when(
                (F.col("bureau_loan_cnt") > 0) |
                (F.col("pos_cash_balance_cnt") > 0) |
                (F.col("cc_months") > 0),
                1
            ).otherwise(0)
        )
    )


In [0]:
gold_train_b = apply_business_rules(gold_train)
gold_test_b  = apply_business_rules(gold_test)

(gold_train_b.write.format("delta").mode("overwrite")
.saveAsTable("creditrisk_catalog.gold_creditrisk.train_dataset_b"))

(gold_test_b.write.format("delta").mode("overwrite")
.saveAsTable("creditrisk_catalog.gold_creditrisk.test_dataset_b"))

In [0]:
gold_train_b.printSchema()

In [0]:
%sql

VACUUM creditrisk_catalog.gold_creditrisk.bureau_features RETAIN 168 HOURS;
VACUUM creditrisk_catalog.gold_creditrisk.prev_agg RETAIN 168 HOURS;
VACUUM creditrisk_catalog.gold_creditrisk.installments_payment_features RETAIN 168 HOURS;
VACUUM creditrisk_catalog.gold_creditrisk.pos_cash_balance_features RETAIN 168 HOURS;
VACUUM creditrisk_catalog.gold_creditrisk.credit_card_balance_features RETAIN 168 HOURS;
VACUUM creditrisk_catalog.gold_creditrisk.train_dataset RETAIN 168 HOURS;
VACUUM creditrisk_catalog.gold_creditrisk.test_dataset RETAIN 168 HOURS;

