In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, when, lag, lead, count, countDistinct, avg, sum, min , max

In [2]:
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.appName("GCPDataPipeleine").getOrCreate()
spark

25/07/06 15:47:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## 1. 📊 Customer Credit Risk Score by Bureau History

In [6]:
bureau_df = spark.read.parquet("gs://cred_silver/bureau")
bureau_agg = bureau_df.groupBy("SK_ID_CURR").agg(
    F.sum("AMT_CREDIT_SUM").alias("total_credit"),
    F.sum("AMT_CREDIT_SUM_DEBT").alias("total_debt")
).withColumn("bureau_debt_ratio", col("total_debt") / col("total_credit")).sort('bureau_debt_ratio', ascending =False)
bureau_agg.show(5)

                                                                                

+----------+-----------------+-----------+-----------------+
|SK_ID_CURR|     total_credit| total_debt|bureau_debt_ratio|
+----------+-----------------+-----------+-----------------+
|    205348|          45000.0|   350509.5|           7.7891|
|    449785|67634.45999999999| 473011.425|6.993645325178911|
|    209349|          45000.0|   264780.0|            5.884|
|    292622|        2055982.5|1.2012291E7|5.842603718660056|
|    257060|        588004.56|  3177229.5|5.403409626619221|
+----------+-----------------+-----------+-----------------+
only showing top 5 rows



In [8]:
bureau_agg.write.mode("overwrite").parquet("gs://cred_gold/bureau_agg")

                                                                                

## 2. 🧾 Installment Payment Behavior Index

In [9]:
inst_df = spark.read.parquet("gs://cred_silver/installments_payments")
inst_pay = inst_df.withColumn("days_late", col("DAYS_ENTRY_PAYMENT") - col("DAYS_INSTALMENT"))
inst_late_avg = inst_pay.groupBy("SK_ID_CURR").agg(F.avg("days_late").alias("avg_days_late")).sort('avg_days_late',ascending = False)
inst_late_avg.show(5)




+----------+------------------+
|SK_ID_CURR|     avg_days_late|
+----------+------------------+
|    184984|1884.2045454545455|
|    230218|            1406.0|
|    225340|            1378.5|
|    210216|             950.0|
|    164168| 945.3333333333334|
+----------+------------------+
only showing top 5 rows



                                                                                

In [10]:
inst_late_avg.write.mode("overwrite").parquet("gs://cred_gold/inst_late_avg")

                                                                                

## 3. 💼 Previous Application Behavior Profile

In [11]:
prev_df = spark.read.parquet("gs://cred_silver/previous_application")
prev_app = prev_df.withColumn("app_to_credit_diff", col("AMT_APPLICATION") - col("AMT_CREDIT"))
prev_stats = prev_app.groupBy("SK_ID_CURR").agg(F.avg("app_to_credit_diff").alias("avg_diff_requested_approved")).sort('avg_diff_requested_approved',ascending = False)
prev_stats.show(5)



+----------+---------------------------+
|SK_ID_CURR|avg_diff_requested_approved|
+----------+---------------------------+
|    394154|                  1980000.0|
|    197751|                 1077428.25|
|    280732|         1008895.3987499999|
|    256573|                   905580.0|
|    439129|                   887661.0|
+----------+---------------------------+
only showing top 5 rows



                                                                                

In [12]:
prev_stats.write.mode("overwrite").parquet("gs://cred_gold/prev_app_stats")

                                                                                

## 4. 🧮 Overdue Severity Score from Bureau Balance

In [None]:
bureau_bal = spark.read.parquet("gs://cred_silver/bureau_balance")
severe_bureau = bureau_bal.filter(col("STATUS") == "5")
severe_counts = severe_bureau.join(bureau_df, "SK_ID_BUREAU").groupBy("SK_ID_CURR").count().alias("severe_dpd_count")
severe_counts.show(5)



+----------+-----+
|SK_ID_CURR|count|
+----------+-----+
|    216627|   35|
|    293947|    3|
|    156156|    2|
|    373721|   14|
|    274529|   16|
+----------+-----+
only showing top 5 rows



                                                                                

In [14]:
severe_counts.write.mode("overwrite").parquet("gs://cred_gold/severe_bureau_counts")

                                                                                

## 5. 🏪 POS Loan Repayment Health

In [16]:
pos_df = spark.read.parquet("gs://cred_silver/POS_CASH_balance")
pos_health = pos_df.groupBy("SK_ID_CURR").agg(F.sum("CNT_INSTALMENT_FUTURE").alias("future_pos_installments"))
pos_health.show(5)



+----------+-----------------------+
|SK_ID_CURR|future_pos_installments|
+----------+-----------------------+
|    428330|                  626.0|
|    146581|                 1054.0|
|    411841|                  545.0|
|    250235|                  485.0|
|    126373|                  665.0|
+----------+-----------------------+
only showing top 5 rows



                                                                                

In [17]:
pos_health.write.mode("overwrite").parquet("gs://cred_gold/pos_health")

                                                                                

## 6. 💳 Credit Card Utilization Behavior

In [18]:
cc_df = spark.read.parquet("gs://cred_silver/credit_card_balance")
cc_util = cc_df.withColumn("utilization", col("AMT_DRAWINGS_CURRENT") / col("AMT_CREDIT_LIMIT_ACTUAL"))
cc_util_avg = cc_util.groupBy("SK_ID_CURR").agg(F.avg("utilization").alias("avg_cc_utilization")).sort('avg_cc_utilization',ascending = False)
cc_util_avg.show(5)



+----------+------------------+
|SK_ID_CURR|avg_cc_utilization|
+----------+------------------+
|    245823|           3.10505|
|    416768|               3.0|
|    436756| 2.738913971428571|
|    373596| 2.565406857142857|
|    110288|1.9155284933333332|
+----------+------------------+
only showing top 5 rows



                                                                                

In [19]:
cc_util_avg.write.mode("overwrite").parquet("gs://cred_gold/cc_utilization_avg")

                                                                                

## 7. 📆 Loan Purpose Default Trend

In [20]:
purpose_risk = prev_df.join(inst_late_avg, "SK_ID_CURR").groupBy("NAME_CASH_LOAN_PURPOSE").agg(F.avg("avg_days_late"))
purpose_risk.show(5)



+----------------------+-------------------+
|NAME_CASH_LOAN_PURPOSE| avg(avg_days_late)|
+----------------------+-------------------+
|                   XAP|-11.050306987596407|
|          Urgent needs| -9.158056750937279|
|             Education| -8.080853350226063|
|  Refusal to name t...|  -7.34484847065544|
|  Business development| -7.381461960176898|
+----------------------+-------------------+
only showing top 5 rows



                                                                                

In [21]:
purpose_risk.write.mode("overwrite").parquet("gs://cred_gold/purpose_risk")

                                                                                

## 8. 🔗 Temporal Application Patterns

In [22]:
temporal_pattern = prev_df.join(inst_late_avg, "SK_ID_CURR").groupBy("WEEKDAY_APPR_PROCESS_START").agg(F.avg("avg_days_late"))
temporal_pattern.show(5)

[Stage 83:>                                                         (0 + 6) / 6]

+--------------------------+-------------------+
|WEEKDAY_APPR_PROCESS_START| avg(avg_days_late)|
+--------------------------+-------------------+
|                  SATURDAY|-10.662722240579367|
|                  THURSDAY| -10.40142056783275|
|                    FRIDAY|-10.428568351583213|
|                   TUESDAY| -10.31336519152617|
|                 WEDNESDAY|  -10.3418187157568|
+--------------------------+-------------------+
only showing top 5 rows



                                                                                

In [23]:
temporal_pattern.write.mode("overwrite").parquet("gs://cred_gold/temporal_pattern")

                                                                                

## 9. 🔍 Client Contract Portfolio Summary

In [24]:
contract_mix = prev_df.groupBy("SK_ID_CURR").agg(
    F.count("SK_ID_PREV").alias("prev_contract_count"),
    F.countDistinct("NAME_CONTRACT_TYPE").alias("contract_type_diversity")
)
contract_mix.show(5)



+----------+-------------------+-----------------------+
|SK_ID_CURR|prev_contract_count|contract_type_diversity|
+----------+-------------------+-----------------------+
|    251742|                  9|                      3|
|    406654|                  3|                      1|
|    176948|                  3|                      2|
|    398675|                  1|                      1|
|    170542|                  8|                      2|
+----------+-------------------+-----------------------+
only showing top 5 rows



                                                                                

In [25]:
contract_mix.write.mode("overwrite").parquet("gs://cred_gold/contract_mix")

                                                                                

In [26]:
spark.stop()