In [0]:
dbutils.widgets.removeAll()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
dbutils.widgets.text("catalogo", "credit_catalog")
dbutils.widgets.text("esquema_source", "silver")
dbutils.widgets.text("esquema_sink", "golden")

In [0]:
catalogo = dbutils.widgets.get("catalogo")
esquema_source = dbutils.widgets.get("esquema_source")
esquema_sink = dbutils.widgets.get("esquema_sink")

In [0]:
df_application_record_transformed = spark.table(f"{catalogo}.{esquema_source}.application_record_transformed")
df_credit_record_transformed = spark.table(f"{catalogo}.{esquema_source}.credit_record_transformed")

Tabla de comportamiento crediticio, para obtener total de meses de crédito, total de meses con mora, mora de cliente

In [0]:
df_behavior = df_credit_record_transformed.groupBy("ID").agg(
  count("*").alias("total_months"),
  sum("is_overdue").alias("total_overdue_months")
)

df_behavior = df_behavior.withColumn(
  "delinquency_ratio",round(col("total_overdue_months")/col("total_months"),3)
)

Join entre data_frame credit_record_transform (customer information) y dataframe df_behavior (comportamiento crediticio)

In [0]:
df_gold = df_application_record_transformed.join(df_behavior,"ID","left")

Segmentación por edad

In [0]:
df_gold = df_gold.withColumn(
  "age_segment",
  when(col("age") < 30, "Young").when(col("age") < 55,"Adult").otherwise("Senior")
)

Indicador patrimoial, par análisis de riesgo por segmento (convertir variables categoricas a numericas)

In [0]:
df_gold = df_gold.withColumn(
    "owns_property_flag",when(col("FLAG_OWN_REALTY")=="Y",1).otherwise(0)
)

Cálculo de Credit Risck Score

In [0]:
df_gold = df_gold.withColumn(
    "credit_risk_score",
    round(
        (1-col("delinquency_ratio"))*60 +
        when(col("employment_years")>=5,20).otherwise(10) +
        when(col("owns_property_flag")==1,20).otherwise(10)
        ,0)
)

Segmentación de riesgo

In [0]:
df_gold = df_gold.withColumn(
    "risk_segment",
    when(col("credit_risk_score")>=80,"Low Risk")
    .when(col("credit_risk_score")>=60,"Medium Risk").otherwise("High Risk")
)

Seleccion de columnas finales

In [0]:
df_gold_final = df_gold.select(
    col("ID").alias("customer_id"),
    col("age"),
    col("age_segment"),
    col("CODE_GENDER").alias("gender"),
    col("AMT_INCOME_TOTAL").alias("income"),
    col("employment_years"),
    col("owns_property_flag"),
    col("total_months"),
    col("total_overdue_months"),
    col("delinquency_ratio"),
    col("credit_risk_score"),
    col("risk_segment")
)

In [0]:
df_gold_final.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_sink}.golden_customer_credit_profile")

In [0]:
display(df_gold_final.limit(15))