In [1]:
import pyspark
from pyspark.sql import functions as F
spark = (
    pyspark.sql.SparkSession.builder
    .appName("MLE-A2-EDA")
    .master("local[*]")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/08 06:51:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pathlib import Path

GOLD_MD_DIR = "datamart/gold/model_dataset"

paths = sorted([str(p) for p in Path(GOLD_MD_DIR).glob("gold_model_dataset_*.parquet")])
paths[:5], len(paths)


(['datamart/gold/model_dataset/gold_model_dataset_2023_01_01.parquet',
  'datamart/gold/model_dataset/gold_model_dataset_2023_02_01.parquet',
  'datamart/gold/model_dataset/gold_model_dataset_2023_03_01.parquet',
  'datamart/gold/model_dataset/gold_model_dataset_2023_04_01.parquet',
  'datamart/gold/model_dataset/gold_model_dataset_2023_05_01.parquet'],
 24)

In [3]:
df = spark.read.parquet(*paths)
print("rows =", df.count(), "cols =", len(df.columns))
df.groupBy("snapshot_date").count().orderBy("snapshot_date").show(30, False)


                                                                                

rows = 8974 cols = 60


                                                                                

+-------------+-----+
|snapshot_date|count|
+-------------+-----+
|2023-07-01   |530  |
|2023-08-01   |501  |
|2023-09-01   |506  |
|2023-10-01   |510  |
|2023-11-01   |521  |
|2023-12-01   |517  |
|2024-01-01   |471  |
|2024-02-01   |481  |
|2024-03-01   |454  |
|2024-04-01   |487  |
|2024-05-01   |491  |
|2024-06-01   |489  |
|2024-07-01   |485  |
|2024-08-01   |518  |
|2024-09-01   |511  |
|2024-10-01   |513  |
|2024-11-01   |491  |
|2024-12-01   |498  |
+-------------+-----+



In [4]:
from pathlib import Path

LABEL_DIR = "datamart/gold/label_store"
label_paths = sorted([str(p) for p in Path(LABEL_DIR).glob("gold_label_store_*.parquet")])
print("Label files:", len(label_paths))

lbl = spark.read.parquet(*label_paths)
print("rows =", lbl.count(), "cols =", len(lbl.columns))
lbl.groupBy("snapshot_date").count().orderBy("snapshot_date").show(40, False)


Label files: 24


                                                                                

rows = 8974 cols = 5


[Stage 11:>                                                       (0 + 24) / 24]

+-------------+-----+
|snapshot_date|count|
+-------------+-----+
|2023-07-01   |530  |
|2023-08-01   |501  |
|2023-09-01   |506  |
|2023-10-01   |510  |
|2023-11-01   |521  |
|2023-12-01   |517  |
|2024-01-01   |471  |
|2024-02-01   |481  |
|2024-03-01   |454  |
|2024-04-01   |487  |
|2024-05-01   |491  |
|2024-06-01   |489  |
|2024-07-01   |485  |
|2024-08-01   |518  |
|2024-09-01   |511  |
|2024-10-01   |513  |
|2024-11-01   |491  |
|2024-12-01   |498  |
+-------------+-----+



                                                                                

In [5]:
# Label distribution
df.groupBy("label").count().orderBy("label").show()

# Percentage breakdown
total = df.count()
df.groupBy("label").agg(
    (F.count("*") / F.lit(total) * 100).alias("percent")
).orderBy("label").show()

# Sample a few columns
df.select("customer_id", "snapshot_date", "label", "dti", "emi_to_income", "pay_behav_coarse", "util_bucket").show(10)


                                                                                

+-----+-----+
|label|count|
+-----+-----+
|    0| 6383|
|    1| 2591|
+-----+-----+



                                                                                

+-----+------------------+
|label|           percent|
+-----+------------------+
|    0| 71.12770225094718|
|    1|28.872297749052816|
+-----+------------------+

+-----------+-------------+-----+----+-------------+----------------+-----------+
|customer_id|snapshot_date|label| dti|emi_to_income|pay_behav_coarse|util_bucket|
+-----------+-------------+-----+----+-------------+----------------+-----------+
| CUS_0x1037|   2023-07-01|    0|NULL|         NULL|            NULL|       NULL|
| CUS_0x1069|   2023-07-01|    0|NULL|         NULL|            NULL|       NULL|
| CUS_0x114a|   2023-07-01|    0|NULL|         NULL|            NULL|       NULL|
| CUS_0x1184|   2023-07-01|    0|NULL|         NULL|            NULL|       NULL|
| CUS_0x1297|   2023-07-01|    1|NULL|         NULL|            NULL|       NULL|
| CUS_0x12fb|   2023-07-01|    0|NULL|         NULL|            NULL|       NULL|
| CUS_0x1325|   2023-07-01|    0|NULL|         NULL|            NULL|       NULL|
| CUS_0x1341|   2

In [6]:
total = df.count()
null_rates = (
    df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
      .toPandas()
      .T.reset_index()
      .rename(columns={"index": "column", 0: "nulls"})
)
null_rates["null_rate"] = (null_rates["nulls"] / total) * 100
null_rates.sort_values("null_rate", ascending=False).head(20)


                                                                                

Unnamed: 0,column,nulls,null_rate
8,credit_mix,8974,100.0
7,num_credit_inquiries,8974,100.0
6,interest_rate,8974,100.0
5,snap_ym,8974,100.0
10,credit_history_age,8974,100.0
9,num_of_loan,8974,100.0
11,credit_utilization_ratio,8974,100.0
16,util_capped,8974,100.0
15,invest_to_income,8974,100.0
14,emi_to_balance,8974,100.0


In [7]:
from pathlib import Path

fin_dir = "datamart/gold/features/financials"
fin_paths = sorted([str(p) for p in Path(fin_dir).glob("gold_financials_features_*.parquet")])
print("Financial feature files:", len(fin_paths))

fin_df = spark.read.parquet(*fin_paths)
print("rows =", fin_df.count(), "cols =", len(fin_df.columns))
fin_df.select("snapshot_date").distinct().orderBy("snapshot_date").show(10)


Financial feature files: 24


                                                                                

rows = 11974 cols = 41


[Stage 34:>                                                       (0 + 24) / 24]

+-------------+
|snapshot_date|
+-------------+
|   2023-01-01|
|   2023-02-01|
|   2023-03-01|
|   2023-04-01|
|   2023-05-01|
|   2023-06-01|
|   2023-07-01|
|   2023-08-01|
|   2023-09-01|
|   2023-10-01|
+-------------+
only showing top 10 rows



                                                                                

In [8]:
# Label store keys
lbl.select("customer_id", "snapshot_date").printSchema()
lbl.select("customer_id").show(5)
lbl.select("snapshot_date").distinct().orderBy("snapshot_date").show(5)

# Financial features keys
fin_df.select("customer_id", "snapshot_date").printSchema()
fin_df.select("customer_id").show(5)
fin_df.select("snapshot_date").distinct().orderBy("snapshot_date").show(5)


root
 |-- customer_id: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

+-----------+
|customer_id|
+-----------+
| CUS_0x1037|
| CUS_0x1069|
| CUS_0x114a|
| CUS_0x1184|
| CUS_0x1297|
+-----------+
only showing top 5 rows



                                                                                

+-------------+
|snapshot_date|
+-------------+
|   2023-07-01|
|   2023-08-01|
|   2023-09-01|
|   2023-10-01|
|   2023-11-01|
+-------------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

+-----------+
|customer_id|
+-----------+
| CUS_0x10ac|
| CUS_0x10c5|
| CUS_0x1145|
| CUS_0x11ac|
| CUS_0x122c|
+-----------+
only showing top 5 rows



[Stage 42:>                                                       (0 + 24) / 24]

+-------------+
|snapshot_date|
+-------------+
|   2023-01-01|
|   2023-02-01|
|   2023-03-01|
|   2023-04-01|
|   2023-05-01|
+-------------+
only showing top 5 rows



                                                                                

In [9]:
from pyspark.sql import functions as F

overlap_now = lbl.join(fin_df, ["customer_id","snapshot_date"], "inner").count()
print("Exact-key overlaps (raw):", overlap_now)

[Stage 46:>                                                       (0 + 24) / 24]

Exact-key overlaps (raw): 0


                                                                                

In [10]:
lbl_n  = (lbl
          .withColumn("cid_norm",  F.upper(F.trim(F.col("customer_id"))))
          .withColumn("sdate_norm", F.to_date("snapshot_date")))

fin_n  = (fin_df
          .withColumn("cid_norm",  F.upper(F.trim(F.col("customer_id"))))
          .withColumn("sdate_norm", F.to_date("snapshot_date")))

overlap_norm = (lbl_n
                .join(fin_n, ["cid_norm","sdate_norm"], "inner")
                .count())
print("Overlaps after TRIM+UPPER:", overlap_norm)


[Stage 50:>                                                       (0 + 24) / 24]

Overlaps after TRIM+UPPER: 0


                                                                                

In [13]:
from pyspark.sql import functions as F

# --- Read raw CSVs (correct file names & safe quote syntax) ---
lms_raw = spark.read.option("header", True).csv("data/lms_loan_daily.csv")

fin_raw = (
    spark.read.option("header", True)
    .option("multiLine", True)
    .option("quote", '"')
    .option("escape", '"')
    .csv("data/features_financials.csv")
)

att_raw = (
    spark.read.option("header", True)
    .option("multiLine", True)
    .option("quote", '"')
    .option("escape", '"')
    .csv("data/features_attributes.csv")
)

print("RAW columns:")
print("LMS :", lms_raw.columns[:20])
print("FIN :", fin_raw.columns[:20])
print("ATTR:", att_raw.columns[:20])


RAW columns:
LMS : ['loan_id', 'Customer_ID', 'loan_start_date', 'tenure', 'installment_num', 'loan_amt', 'due_amt', 'paid_amt', 'overdue_amt', 'balance', 'snapshot_date']
FIN : ['Customer_ID', 'Annual_Income', 'Monthly_Inhand_Salary', 'Num_Bank_Accounts', 'Num_Credit_Card', 'Interest_Rate', 'Num_of_Loan', 'Type_of_Loan', 'Delay_from_due_date', 'Num_of_Delayed_Payment', 'Changed_Credit_Limit', 'Num_Credit_Inquiries', 'Credit_Mix', 'Outstanding_Debt', 'Credit_Utilization_Ratio', 'Credit_History_Age', 'Payment_of_Min_Amount', 'Total_EMI_per_month', 'Amount_invested_monthly', 'Payment_Behaviour']
ATTR: ['Customer_ID', 'Name', 'Age', 'SSN', 'Occupation', 'snapshot_date']


In [14]:
def pick_id(df):
    for c in ["Customer_ID","customer_id","CUSTOMER_ID","cust_id","id","ID"]:
        if c in df.columns:
            return c
    raise ValueError("No obvious ID column found.")

lms_id = pick_id(lms_raw)
fin_id = pick_id(fin_raw)
att_id = pick_id(att_raw)

def with_norm_id(df, col):
    return (df
            .withColumn("cid_norm", F.upper(F.trim(F.col(col))))
            .withColumn("snapshot_norm",
                        F.coalesce(
                            F.to_date("snapshot_date","d/M/yy"),
                            F.to_date("snapshot_date","dd/MM/yy"),
                            F.to_date("snapshot_date","d/M/yyyy"),
                            F.to_date("snapshot_date","dd/MM/yyyy"),
                            F.to_date("snapshot_date","M/d/yyyy"),
                            F.to_date("snapshot_date","yyyy-MM-dd"),
                            F.to_date("snapshot_date")
                        )))

lmsN = with_norm_id(lms_raw, lms_id)
finN = with_norm_id(fin_raw, fin_id)
attN = with_norm_id(att_raw, att_id)

print("Detected IDs:", {"lms": lms_id, "fin": fin_id, "att": att_id})


Detected IDs: {'lms': 'Customer_ID', 'fin': 'Customer_ID', 'att': 'Customer_ID'}


In [15]:
# Overall distinct ID counts
print("Distinct IDs → LMS/FIN/ATTR:",
      lmsN.select("cid_norm").distinct().count(),
      finN.select("cid_norm").distinct().count(),
      attN.select("cid_norm").distinct().count())

# Overall intersections
print("LMS ∩ FIN :", lmsN.select("cid_norm").distinct()
                      .join(finN.select("cid_norm").distinct(), "cid_norm", "inner").count())
print("LMS ∩ ATTR:", lmsN.select("cid_norm").distinct()
                      .join(attN.select("cid_norm").distinct(), "cid_norm", "inner").count())
print("FIN ∩ ATTR:", finN.select("cid_norm").distinct()
                      .join(attN.select("cid_norm").distinct(), "cid_norm", "inner").count())

# Restrict to July 2023 (matches how the pipeline filters)
lms_jul = lmsN.where(F.date_format("snapshot_norm","yyyy-MM")=="2023-07").select("cid_norm").distinct()
fin_jul = finN.where(F.date_format("snapshot_norm","yyyy-MM")=="2023-07").select("cid_norm").distinct()
att_jul = attN.where(F.date_format("snapshot_norm","yyyy-MM")=="2023-07").select("cid_norm").distinct()

print("July-2023 → counts LMS/FIN/ATTR:", lms_jul.count(), fin_jul.count(), att_jul.count())
print("July-2023 LMS ∩ FIN :", lms_jul.join(fin_jul,"cid_norm","inner").count())
print("July-2023 LMS ∩ ATTR:", lms_jul.join(att_jul,"cid_norm","inner").count())
print("July-2023 FIN ∩ ATTR:", fin_jul.join(att_jul,"cid_norm","inner").count())


Distinct IDs → LMS/FIN/ATTR: 12500 12500 12500
LMS ∩ FIN : 12500
LMS ∩ ATTR: 12500
FIN ∩ ATTR: 12500


                                                                                

July-2023 → counts LMS/FIN/ATTR: 3556 471 471
July-2023 LMS ∩ FIN : 471
July-2023 LMS ∩ ATTR: 471
July-2023 FIN ∩ ATTR: 471


In [16]:
# Distinct IDs in Gold label store vs Gold financial features
lbl_ids  = lbl.select(F.upper(F.trim("customer_id")).alias("cid")).distinct()
fin_ids  = fin_df.select(F.upper(F.trim("customer_id")).alias("cid")).distinct()

print("Gold-only ID overlap:", lbl_ids.join(fin_ids, "cid", "inner").count())


                                                                                

Gold-only ID overlap: 8974


In [17]:
from pyspark.sql import functions as F
from datetime import datetime
from pathlib import Path

LABEL_DIR = "datamart/gold/label_store"
FIN_DIR   = "datamart/gold/features/financials"
ATT_DIR   = "datamart/gold/features/attributes"
OUT_DIR   = "datamart/gold/model_dataset"   # overwrite in-place

def token(ds: str) -> str:
    return datetime.strptime(ds, "%Y-%m-%d").strftime("%Y_%m_%d")

def rebuild_one_month(ds: str):
    tok = token(ds)
    p_lbl = f"{LABEL_DIR}/gold_label_store_{tok}.parquet"
    p_fin = f"{FIN_DIR}/gold_financials_features_{tok}.parquet"
    p_att = f"{ATT_DIR}/gold_attributes_features_{tok}.parquet"

    # Read
    lbl = (spark.read.parquet(p_lbl)
           .withColumnRenamed("Customer_ID","customer_id")
           .select("customer_id","snapshot_date","loan_id","label","label_def"))
    fin = (spark.read.parquet(p_fin)
           .withColumnRenamed("Customer_ID","customer_id"))
    att = (spark.read.parquet(p_att)
           .withColumnRenamed("Customer_ID","customer_id")
           .drop("snap_ym"))

    # Normalize join keys: TRIM+UPPER for ID and MONTH key for dates
    def norm(d):
        return (d
                .withColumn("cid_norm", F.upper(F.trim("customer_id")))
                .withColumn("ym_norm", F.date_format(F.to_date("snapshot_date"), "yyyy-MM")))

    lbln, finn, attn = norm(lbl), norm(fin), norm(att)

    # Join by (customer_id, month) via the normalized columns
    ds_joined = (lbln
                 .join(finn.drop("customer_id","snapshot_date"), ["cid_norm","ym_norm"], "left")
                 .join(attn.drop("customer_id","snapshot_date"), ["cid_norm","ym_norm"], "left"))

    # Restore clean columns: keep label’s first-of-month snapshot_date
    out_cols_front = ["customer_id","snapshot_date","loan_id","label","label_def"]
    ds_out = (ds_joined
              .withColumn("customer_id", F.col("cid_norm"))   # already UPPER+TRIM
              .drop("cid_norm","ym_norm"))

    # Reorder: label keys first, then the rest
    rest = [c for c in ds_out.columns if c not in out_cols_front]
    ds_out = ds_out.select(*out_cols_front, *rest)

    out_path = f"{OUT_DIR}/gold_model_dataset_{tok}.parquet"
    ds_out.write.mode("overwrite").parquet(out_path)
    print("[REBUILT]", out_path)

# Rebuild for all months present in label store
months = [r["snapshot_date"].strftime("%Y-%m-%d")
          for r in spark.read.parquet(f"{LABEL_DIR}/gold_label_store_*.parquet")
                            .select("snapshot_date").distinct()
                            .orderBy("snapshot_date").collect()]

for m in months:
    rebuild_one_month(m)


                                                                                

[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_07_01.parquet


                                                                                

[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_08_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_09_01.parquet


                                                                                

[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_10_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_11_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_12_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_01_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_02_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_03_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_04_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_05_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_06_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_07_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_08_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_09_01.parquet
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_10_01.parquet
[REBUILT] da

In [18]:
md = spark.read.parquet("datamart/gold/model_dataset/gold_model_dataset_*.parquet")

# Check a few key features’ non-null rates
for c in ["dti","emi_to_income","credit_utilization_ratio","pay_behav_coarse","util_bucket"]:
    if c in md.columns:
        md.agg((1 - F.avg(F.col(c).isNull().cast("double"))).alias(f"{c}_non_null_rate")).show()

# Month counts (should match your label counts)
md.groupBy("snapshot_date").count().orderBy("snapshot_date").show(30, False)


                                                                                

+-----------------+
|dti_non_null_rate|
+-----------------+
|              0.0|
+-----------------+



                                                                                

+---------------------------+
|emi_to_income_non_null_rate|
+---------------------------+
|                        0.0|
+---------------------------+



                                                                                

+--------------------------------------+
|credit_utilization_ratio_non_null_rate|
+--------------------------------------+
|                                   0.0|
+--------------------------------------+



                                                                                

+------------------------------+
|pay_behav_coarse_non_null_rate|
+------------------------------+
|                           0.0|
+------------------------------+



                                                                                

+-------------------------+
|util_bucket_non_null_rate|
+-------------------------+
|                      0.0|
+-------------------------+



[Stage 289:>                                                      (0 + 24) / 24]

+-------------+-----+
|snapshot_date|count|
+-------------+-----+
|2023-07-01   |530  |
|2023-08-01   |501  |
|2023-09-01   |506  |
|2023-10-01   |510  |
|2023-11-01   |521  |
|2023-12-01   |517  |
|2024-01-01   |471  |
|2024-02-01   |481  |
|2024-03-01   |454  |
|2024-04-01   |487  |
|2024-05-01   |491  |
|2024-06-01   |489  |
|2024-07-01   |485  |
|2024-08-01   |518  |
|2024-09-01   |511  |
|2024-10-01   |513  |
|2024-11-01   |491  |
|2024-12-01   |498  |
+-------------+-----+



                                                                                

In [19]:
from pyspark.sql import functions as F

# A) Are the financial feature columns actually populated in GOLD (before any join)?
for c in ["dti","emi_to_income","credit_utilization_ratio","pay_behav_coarse","util_bucket"]:
    if c in fin_df.columns:
        fin_df.agg((1 - F.avg(F.col(c).isNull().cast("double"))).alias(f"{c}_non_null_in_fin")).show()

# B) Is ym_norm becoming NULL inside GOLD features? (if yes, no join can match)
fin_ym = (fin_df
          .withColumn("ym_try",
                      F.date_format(F.to_date("snapshot_date"), "yyyy-MM"))
          .withColumn("ym_from_snap",
                      F.when(F.length("snap_ym")==7, F.col("snap_ym"))
                       .otherwise(F.date_format(F.to_timestamp("snap_ym"), "yyyy-MM")))
          .withColumn("ym_norm_probe", F.coalesce("ym_try","ym_from_snap")))
fin_ym.agg(
    F.sum(F.col("ym_try").isNull().cast("int")).alias("ym_try_nulls"),
    F.sum(F.col("ym_from_snap").isNull().cast("int")).alias("ym_from_snap_nulls"),
    F.sum(F.col("ym_norm_probe").isNull().cast("int")).alias("ym_norm_probe_nulls")
).show()


                                                                                

+-------------------+
|dti_non_null_in_fin|
+-------------------+
|                1.0|
+-------------------+



                                                                                

+-----------------------------+
|emi_to_income_non_null_in_fin|
+-----------------------------+
|                          1.0|
+-----------------------------+



                                                                                

+----------------------------------------+
|credit_utilization_ratio_non_null_in_fin|
+----------------------------------------+
|                                     1.0|
+----------------------------------------+



                                                                                

+--------------------------------+
|pay_behav_coarse_non_null_in_fin|
+--------------------------------+
|                             1.0|
+--------------------------------+



                                                                                

+---------------------------+
|util_bucket_non_null_in_fin|
+---------------------------+
|                        1.0|
+---------------------------+



[Stage 307:>                                                      (0 + 24) / 24]

+------------+------------------+-------------------+
|ym_try_nulls|ym_from_snap_nulls|ym_norm_probe_nulls|
+------------+------------------+-------------------+
|           0|                 0|                  0|
+------------+------------------+-------------------+



                                                                                

In [21]:
from pyspark.sql import functions as F
from datetime import datetime

LABEL_DIR = "datamart/gold/label_store"
FIN_DIR   = "datamart/gold/features/financials"
ATT_DIR   = "datamart/gold/features/attributes"
OUT_DIR   = "datamart/gold/model_dataset"

def token(ds: str) -> str:
    return datetime.strptime(ds, "%Y-%m-%d").strftime("%Y_%m_%d")

def normalize_id(df):
    if "Customer_ID" in df.columns:
        df = df.withColumnRenamed("Customer_ID", "customer_id")
    return df.withColumn("customer_id", F.upper(F.trim(F.col("customer_id"))))

def add_ym(df):
    # Always try from snapshot_date → 'yyyy-MM'
    ym_try = F.date_format(F.to_date("snapshot_date"), "yyyy-MM")
    # Only fall back to snap_ym if the column exists in this DF
    if "snap_ym" in df.columns:
        ym_from_snap = F.when(F.length("snap_ym")==7, F.col("snap_ym")) \
                        .otherwise(F.date_format(F.to_timestamp("snap_ym"), "yyyy-MM"))
        return df.withColumn("ym_norm", F.coalesce(ym_try, ym_from_snap))
    else:
        return df.withColumn("ym_norm", ym_try)

def rebuild_one_month(ds: str):
    tok = token(ds)
    p_lbl = f"{LABEL_DIR}/gold_label_store_{tok}.parquet"
    p_fin = f"{FIN_DIR}/gold_financials_features_{tok}.parquet"
    p_att = f"{ATT_DIR}/gold_attributes_features_{tok}.parquet"
    p_out = f"{OUT_DIR}/gold_model_dataset_{tok}.parquet"

    lbl_raw = spark.read.parquet(p_lbl)
    fin_raw = spark.read.parquet(p_fin)
    att_raw = spark.read.parquet(p_att)

    lbl = add_ym(normalize_id(lbl_raw)) \
            .select("customer_id","snapshot_date","ym_norm","loan_id","label","label_def")
    fin = add_ym(normalize_id(fin_raw))
    att = add_ym(normalize_id(att_raw)).drop("snap_ym")

    # diagnostics: check monthly key overlap before join
    lbl_keys = lbl.select("customer_id","ym_norm").distinct()
    fin_keys = fin.select("customer_id","ym_norm").distinct()
    att_keys = att.select("customer_id","ym_norm").distinct()
    o_lbl_fin = lbl_keys.join(fin_keys, ["customer_id","ym_norm"], "inner").count()
    o_lbl_att = lbl_keys.join(att_keys, ["customer_id","ym_norm"], "inner").count()
    print(f"[{ds}] overlap keys  label↔fin = {o_lbl_fin:,}   label↔att = {o_lbl_att:,}")

    # left-join on normalized keys
    ds_join = (lbl
               .join(fin.drop("snapshot_date"), ["customer_id","ym_norm"], "left")
               .join(att.drop("snapshot_date"), ["customer_id","ym_norm"], "left"))

    # keep label's first-of-month snapshot_date
    front = ["customer_id","snapshot_date","loan_id","label","label_def"]
    rest  = [c for c in ds_join.columns if c not in front + ["ym_norm"]]
    out   = ds_join.select(*front, *rest)

    out.write.mode("overwrite").parquet(p_out)
    print("[REBUILT]", p_out)

# Rebuild all months
months = [r["snapshot_date"].strftime("%Y-%m-%d")
          for r in spark.read.parquet(f"{LABEL_DIR}/gold_label_store_*.parquet")
                             .select("snapshot_date").distinct()
                             .orderBy("snapshot_date").collect()]
for m in months:
    rebuild_one_month(m)


[2023-07-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_07_01.parquet
[2023-08-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_08_01.parquet
[2023-09-01] overlap keys  label↔fin = 0   label↔att = 0


                                                                                

[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_09_01.parquet
[2023-10-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_10_01.parquet
[2023-11-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_11_01.parquet
[2023-12-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2023_12_01.parquet
[2024-01-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_01_01.parquet
[2024-02-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_02_01.parquet
[2024-03-01] overlap keys  label↔fin = 0   label↔att = 0


                                                                                

[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_03_01.parquet
[2024-04-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_04_01.parquet
[2024-05-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_05_01.parquet
[2024-06-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_06_01.parquet
[2024-07-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_07_01.parquet
[2024-08-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_08_01.parquet
[2024-09-01] overlap keys  label↔fin = 0   label↔att = 0


                                                                                

[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_09_01.parquet
[2024-10-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_10_01.parquet
[2024-11-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_11_01.parquet
[2024-12-01] overlap keys  label↔fin = 0   label↔att = 0
[REBUILT] datamart/gold/model_dataset/gold_model_dataset_2024_12_01.parquet


In [22]:
from pyspark.sql import functions as F
from datetime import datetime

def token(ds: str) -> str:
    return datetime.strptime(ds, "%Y-%m-%d").strftime("%Y_%m_%d")

ds = "2023-07-01"
tok = token(ds)

p_lbl = f"datamart/gold/label_store/gold_label_store_{tok}.parquet"
p_fin = f"datamart/gold/features/financials/gold_financials_features_{tok}.parquet"
p_att = f"datamart/gold/features/attributes/gold_attributes_features_{tok}.parquet"

lbl = spark.read.parquet(p_lbl).withColumnRenamed("Customer_ID","customer_id")
fin = spark.read.parquet(p_fin).withColumnRenamed("Customer_ID","customer_id")
att = spark.read.parquet(p_att).withColumnRenamed("Customer_ID","customer_id")

# Normalize
lbl = lbl.withColumn("customer_id", F.upper(F.trim("customer_id")))\
         .withColumn("ym_norm", F.date_format(F.to_date("snapshot_date"), "yyyy-MM"))
fin = fin.withColumn("customer_id", F.upper(F.trim("customer_id")))\
         .withColumn("ym_try", F.date_format(F.to_date("snapshot_date"), "yyyy-MM"))\
         .withColumn("ym_snap", F.when(F.length("snap_ym")==7, F.col("snap_ym"))
                                  .otherwise(F.date_format(F.to_timestamp("snap_ym"), "yyyy-MM")))
att = att.withColumn("customer_id", F.upper(F.trim("customer_id")))\
         .withColumn("ym_try", F.date_format(F.to_date("snapshot_date"), "yyyy-MM"))\
         .withColumn("ym_snap", F.when(F.length("snap_ym")==7, F.col("snap_ym"))
                                  .otherwise(F.date_format(F.to_timestamp("snap_ym"), "yyyy-MM")))

print("Label ym_norm distinct:", lbl.select("ym_norm").distinct().orderBy("ym_norm").show())
print("Fin ym_try, ym_snap distinct:", fin.select("ym_try","ym_snap").distinct().orderBy("ym_try","ym_snap").show(10, False))
print("Att ym_try, ym_snap distinct:", att.select("ym_try","ym_snap").distinct().orderBy("ym_try","ym_snap").show(10, False))

# Show a few non-matching examples (anti-join)
lbl_keys = lbl.select("customer_id","ym_norm").distinct()
fin_keys = fin.select(F.col("customer_id"), F.coalesce("ym_try","ym_snap").alias("ym_norm")).distinct()

not_in_fin = lbl_keys.join(fin_keys, ["customer_id","ym_norm"], "left_anti")
print("Examples missing in FIN:")
not_in_fin.show(5, False)


+-------+
|ym_norm|
+-------+
|2023-07|
+-------+

Label ym_norm distinct: None
+-------+-------+
|ym_try |ym_snap|
+-------+-------+
|2023-07|2023-07|
+-------+-------+

Fin ym_try, ym_snap distinct: None


                                                                                

+------+-------+
|ym_try|ym_snap|
+------+-------+
|NULL  |2023-07|
+------+-------+

Att ym_try, ym_snap distinct: None
Examples missing in FIN:
+-----------+-------+
|customer_id|ym_norm|
+-----------+-------+
|CUS_0X53F4 |2023-07|
|CUS_0X69BE |2023-07|
|CUS_0X1037 |2023-07|
|CUS_0X42C6 |2023-07|
|CUS_0X9BEF |2023-07|
+-----------+-------+
only showing top 5 rows



In [23]:
from pyspark.sql import functions as F

def mk_keys(df, id_col="customer_id", date_col="snapshot_date", literal_ym=None):
    # unify ID column name
    if "Customer_ID" in df.columns and id_col != "Customer_ID":
        df = df.withColumnRenamed("Customer_ID", id_col)
    # normalize ID: trim, upper, remove all whitespace, then strip any non [A-Z0-9_]
    cid = F.upper(F.trim(F.col(id_col)))
    cid = F.regexp_replace(cid, r"\s+", "")                 # remove spaces/tabs/nbspaces
    cid = F.regexp_replace(cid, r"[^A-Z0-9_]", "")          # remove hidden punctuation, zero-widths, etc.

    # month key from literal (file month) OR from columns
    if literal_ym is not None:
        ym = F.lit(literal_ym)
    else:
        ym_try  = F.date_format(F.to_date(date_col), "yyyy-MM")
        ym_snap = F.when(F.col("snap_ym").isNotNull(),
                         F.when(F.length("snap_ym")==7, F.col("snap_ym"))
                          .otherwise(F.date_format(F.to_timestamp("snap_ym"), "yyyy-MM")))
        ym = F.coalesce(ym_try, ym_snap)

    return (df
            .withColumn("cid_key", cid)
            .withColumn("ym_key", ym))

# --- July 2023 only (Gold paths) ---
ds  = "2023-07-01"
tok = "2023_07_01"

lbl = spark.read.parquet(f"datamart/gold/label_store/gold_label_store_{tok}.parquet")
fin = spark.read.parquet(f"datamart/gold/features/financials/gold_financials_features_{tok}.parquet")
att = spark.read.parquet(f"datamart/gold/features/attributes/gold_attributes_features_{tok}.parquet")

lblK = mk_keys(lbl, id_col="customer_id", date_col="snapshot_date")
finK = mk_keys(fin, id_col="customer_id", date_col="snapshot_date")
attK = mk_keys(att, id_col="customer_id", date_col="snapshot_date")

# Inspect a few keys
print("Sample label IDs:", [r[0] for r in lblK.select("cid_key").distinct().limit(5).collect()])
print("Sample fin   IDs:", [r[0] for r in finK.select("cid_key").distinct().limit(5).collect()])

# Overlaps on aggressive keys (should be > 0)
ov_fin = (lblK.select("cid_key","ym_key").distinct()
               .join(finK.select("cid_key","ym_key").distinct(), ["cid_key","ym_key"], "inner").count())
ov_att = (lblK.select("cid_key","ym_key").distinct()
               .join(attK.select("cid_key","ym_key").distinct(), ["cid_key","ym_key"], "inner").count())
print("Aggressive overlap July → label↔fin:", ov_fin, " label↔att:", ov_att)

# If still 0, show a few anti-join samples side-by-side (first 10)
miss = (lblK.select("cid_key","ym_key").distinct()
             .join(finK.select("cid_key","ym_key").distinct(), ["cid_key","ym_key"], "left_anti"))
print("Examples missing in FIN after aggressive norm:")
miss.show(10, False)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `snap_ym` cannot be resolved. Did you mean one of the following? [`label`, `cid_key`, `loan_id`, `label_def`, `snapshot_date`].;
'Project [loan_id#28498, customer_id#28628, label#28500, label_def#28501, snapshot_date#28502, cid_key#28635, coalesce(date_format(cast(to_date(snapshot_date#28502, None, Some(Etc/UTC), false) as timestamp), yyyy-MM, Some(Etc/UTC)), CASE WHEN isnotnull('snap_ym) THEN CASE WHEN (length('snap_ym) = 7) THEN 'snap_ym ELSE date_format(to_timestamp('snap_ym, None, TimestampType, Some(Etc/UTC), false), yyyy-MM, Some(Etc/UTC)) END END) AS ym_key#28642]
+- Project [loan_id#28498, customer_id#28628, label#28500, label_def#28501, snapshot_date#28502, regexp_replace(regexp_replace(upper(trim(customer_id#28628, None)), \s+, , 1), [^A-Z0-9_], , 1) AS cid_key#28635]
   +- Project [loan_id#28498, Customer_ID#28499 AS customer_id#28628, label#28500, label_def#28501, snapshot_date#28502]
      +- Relation [loan_id#28498,Customer_ID#28499,label#28500,label_def#28501,snapshot_date#28502] parquet


In [24]:
from pyspark.sql import functions as F

def mk_keys(df, id_col="customer_id", date_col="snapshot_date", literal_ym=None):
    # unify ID column name
    if "Customer_ID" in df.columns and id_col != "Customer_ID":
        df = df.withColumnRenamed("Customer_ID", id_col)

    # aggressive ID normalization: trim, upper, remove whitespace & non [A-Z0-9_]
    cid = F.upper(F.trim(F.col(id_col)))
    cid = F.regexp_replace(cid, r"\s+", "")
    cid = F.regexp_replace(cid, r"[^A-Z0-9_]", "")

    # month key: prefer explicit literal (file month), else snapshot_date, else snap_ym (if present)
    if literal_ym is not None:
        ym = F.lit(literal_ym)  # e.g. '2023-07'
    else:
        ym_try = F.date_format(F.to_date(F.col(date_col)), "yyyy-MM")
        if "snap_ym" in df.columns:
            ym_snap = F.when(F.length("snap_ym")==7, F.col("snap_ym")) \
                       .otherwise(F.date_format(F.to_timestamp("snap_ym"), "yyyy-MM"))
            ym = F.coalesce(ym_try, ym_snap)
        else:
            ym = ym_try

    return df.withColumn("cid_key", cid).withColumn("ym_key", ym)


In [25]:
ds  = "2023-07-01"
tok = "2023_07_01"
ym  = "2023-07"  # file month token

lbl = spark.read.parquet(f"datamart/gold/label_store/gold_label_store_{tok}.parquet")
fin = spark.read.parquet(f"datamart/gold/features/financials/gold_financials_features_{tok}.parquet")
att = spark.read.parquet(f"datamart/gold/features/attributes/gold_attributes_features_{tok}.parquet")

lblK = mk_keys(lbl, id_col="customer_id", date_col="snapshot_date", literal_ym=ym)
finK = mk_keys(fin, id_col="customer_id", date_col="snapshot_date", literal_ym=ym)
attK = mk_keys(att, id_col="customer_id", date_col="snapshot_date", literal_ym=ym)

# Overlaps on robust keys (should be > 0 if customers align)
ov_fin = (lblK.select("cid_key","ym_key").distinct()
               .join(finK.select("cid_key","ym_key").distinct(), ["cid_key","ym_key"], "inner").count())
ov_att = (lblK.select("cid_key","ym_key").distinct()
               .join(attK.select("cid_key","ym_key").distinct(), ["cid_key","ym_key"], "inner").count())
print("July robust overlap → label↔fin:", ov_fin, " label↔att:", ov_att)

# If still 0, show a few anti-join samples
miss = (lblK.select("cid_key","ym_key").distinct()
             .join(finK.select("cid_key","ym_key").distinct(), ["cid_key","ym_key"], "left_anti"))
miss.show(10, False)


July robust overlap → label↔fin: 0  label↔att: 0
+----------+-------+
|cid_key   |ym_key |
+----------+-------+
|CUS_0X53F4|2023-07|
|CUS_0X69BE|2023-07|
|CUS_0X1037|2023-07|
|CUS_0X42C6|2023-07|
|CUS_0X9BEF|2023-07|
|CUS_0X71A0|2023-07|
|CUS_0XA146|2023-07|
|CUS_0XA2C0|2023-07|
|CUS_0XC2E2|2023-07|
|CUS_0X3BD0|2023-07|
+----------+-------+
only showing top 10 rows



In [26]:
from pyspark.sql import functions as F

# Load all months at once
lbl_all = spark.read.parquet("datamart/gold/label_store/gold_label_store_*.parquet") \
    .withColumnRenamed("Customer_ID","customer_id")
fin_all = spark.read.parquet("datamart/gold/features/financials/gold_financials_features_*.parquet") \
    .withColumnRenamed("Customer_ID","customer_id")
att_all = spark.read.parquet("datamart/gold/features/attributes/gold_attributes_features_*.parquet") \
    .withColumnRenamed("Customer_ID","customer_id")

def cid_key(col):
    c = F.upper(F.trim(F.col(col)))
    c = F.regexp_replace(c, r"\s+", "")
    c = F.regexp_replace(c, r"[^A-Z0-9_]", "")
    return c

lbl_ids = lbl_all.select(cid_key("customer_id").alias("cid")).distinct()
fin_ids = fin_all.select(cid_key("customer_id").alias("cid")).distinct()
att_ids = att_all.select(cid_key("customer_id").alias("cid")).distinct()

print("Label↔Fin ANY-month overlap:", lbl_ids.join(fin_ids, "cid", "inner").count())
print("Label↔Att ANY-month overlap:", lbl_ids.join(att_ids, "cid", "inner").count())


                                                                                

Label↔Fin ANY-month overlap: 8974




Label↔Att ANY-month overlap: 8974


                                                                                

In [29]:
from pyspark.sql import functions as F, Window

# ---------- paths ----------
LBL = "datamart/gold/label_store/gold_label_store_*.parquet"
FIN = "datamart/gold/features/financials/gold_financials_features_*.parquet"
ATT = "datamart/gold/features/attributes/gold_attributes_features_*.parquet"
OUT = "datamart/gold/model_dataset_asof"

# ---------- load & normalize ----------
def cid_norm(c):
    c = F.upper(F.trim(F.col(c)))
    c = F.regexp_replace(c, r"\s+", "")
    c = F.regexp_replace(c, r"[^A-Z0-9_]", "")
    return c

lbl = (spark.read.parquet(LBL)
       .withColumnRenamed("Customer_ID","customer_id")
       .withColumn("customer_id", cid_norm("customer_id"))
       .withColumn("label_date", F.to_date("snapshot_date"))
       .select("customer_id","label_date","loan_id","label","label_def"))

fin = (spark.read.parquet(FIN)
       .withColumnRenamed("Customer_ID","customer_id")
       .withColumn("customer_id", cid_norm("customer_id"))
       .withColumn("feat_date",
                   F.coalesce(
                       F.to_date("snapshot_date"),
                       F.to_date(F.to_timestamp("snap_ym"))  # guard if only snap_ym exists
                   )))

att = (spark.read.parquet(ATT)
       .withColumnRenamed("Customer_ID","customer_id")
       .withColumn("customer_id", cid_norm("customer_id"))
       .withColumn("feat_date",
                   F.coalesce(
                       F.to_date("snapshot_date"),
                       F.to_date(F.to_timestamp("snap_ym"))
                   )))

# Keep only columns you actually want from fin/att to avoid duplicate names
fin_keep = [c for c in fin.columns if c not in {"snapshot_date","snap_ym","customer_id","feat_date"}]
att_keep = [c for c in att.columns if c not in {"snapshot_date","snap_ym","customer_id","feat_date"}]

# ---------- AS-OF join: latest feature <= label_date ----------
# 1) Join label with FIN on customer_id then filter to feat_date <= label_date
fin_join = (lbl.join(F.broadcast(fin.select("customer_id","feat_date", *fin_keep)), "customer_id", "left")
              .where(F.col("feat_date").isNotNull() & (F.col("feat_date") <= F.col("label_date"))))

w_fin = Window.partitionBy("customer_id","label_date").orderBy(F.col("feat_date").desc())
fin_ranked = fin_join.withColumn("rn", F.row_number().over(w_fin)).where(F.col("rn")==1).drop("rn")

# 2) Same for ATTR
att_join = (lbl.join(F.broadcast(att.select("customer_id","feat_date", *att_keep)), "customer_id", "left")
              .where(F.col("feat_date").isNotNull() & (F.col("feat_date") <= F.col("label_date"))))

w_att = Window.partitionBy("customer_id","label_date").orderBy(F.col("feat_date").desc())
att_ranked = att_join.withColumn("rn", F.row_number().over(w_att)).where(F.col("rn")==1).drop("rn")

# 3) Merge the two as-of results back onto *label* keys
# safer drop: exclude only columns that clash, but KEEP join keys
base_keys = ["customer_id", "label_date", "loan_id", "label", "label_def"]

fin_clean = fin_ranked.drop(*[c for c in base_keys if c not in ["customer_id", "label_date"]])
att_clean = att_ranked.drop(*[c for c in base_keys if c not in ["customer_id", "label_date"]])

md_asof = (
    lbl
    .join(fin_clean, ["customer_id", "label_date"], "left")
    .join(att_clean, ["customer_id", "label_date"], "left")
    .drop("feat_date")        # <--- remove duplicate helper columns
    .withColumnRenamed("label_date", "snapshot_date")
)

md_asof.write.mode("overwrite").parquet(OUT)
print("[ASOF] wrote:", OUT)

# ---------- quick sanity ----------
md = spark.read.parquet(OUT)
print("rows =", md.count(), "cols =", len(md.columns))
md.groupBy("snapshot_date").count().orderBy("snapshot_date").show(30, False)

for c in ["dti","emi_to_income","credit_utilization_ratio","pay_behav_coarse","util_bucket"]:
    if c in md.columns:
        md.agg((1 - F.avg(F.col(c).isNull().cast("double"))).alias(f"{c}_non_null_rate")).show()

                                                                                

[ASOF] wrote: datamart/gold/model_dataset_asof
rows = 8974 cols = 59
+-------------+-----+
|snapshot_date|count|
+-------------+-----+
|2023-07-01   |530  |
|2023-08-01   |501  |
|2023-09-01   |506  |
|2023-10-01   |510  |
|2023-11-01   |521  |
|2023-12-01   |517  |
|2024-01-01   |471  |
|2024-02-01   |481  |
|2024-03-01   |454  |
|2024-04-01   |487  |
|2024-05-01   |491  |
|2024-06-01   |489  |
|2024-07-01   |485  |
|2024-08-01   |518  |
|2024-09-01   |511  |
|2024-10-01   |513  |
|2024-11-01   |491  |
|2024-12-01   |498  |
+-------------+-----+

+-----------------+
|dti_non_null_rate|
+-----------------+
|              1.0|
+-----------------+

+---------------------------+
|emi_to_income_non_null_rate|
+---------------------------+
|                        1.0|
+---------------------------+

+--------------------------------------+
|credit_utilization_ratio_non_null_rate|
+--------------------------------------+
|                                   1.0|
+-----------------------------

In [30]:
from pyspark.sql import functions as F

# Read the model datasets built by main.py
md = spark.read.parquet("datamart/gold/model_dataset/gold_model_dataset_*.parquet")

print("rows =", md.count(), "cols =", len(md.columns))
md.groupBy("snapshot_date").count().orderBy("snapshot_date").show(30, False)

# Check non-null ratio for a few key engineered features
for c in ["dti","emi_to_income","credit_utilization_ratio","pay_behav_coarse","util_bucket"]:
    if c in md.columns:
        md.agg(
            (1 - F.avg(F.col(c).isNull().cast("double"))).alias(f"{c}_non_null_rate")
        ).show()

# Optional — find columns that still have any missing values
total = md.count()
nulls = (
    md.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in md.columns])
      .toPandas()
      .T.reset_index()
      .rename(columns={"index":"column", 0:"nulls"})
)
nulls["null_rate_%"] = (nulls["nulls"]/total*100).round(3)
nulls.sort_values("null_rate_%", ascending=False).head(15)


                                                                                

rows = 8974 cols = 59


                                                                                

+-------------+-----+
|snapshot_date|count|
+-------------+-----+
|2023-07-01   |530  |
|2023-08-01   |501  |
|2023-09-01   |506  |
|2023-10-01   |510  |
|2023-11-01   |521  |
|2023-12-01   |517  |
|2024-01-01   |471  |
|2024-02-01   |481  |
|2024-03-01   |454  |
|2024-04-01   |487  |
|2024-05-01   |491  |
|2024-06-01   |489  |
|2024-07-01   |485  |
|2024-08-01   |518  |
|2024-09-01   |511  |
|2024-10-01   |513  |
|2024-11-01   |491  |
|2024-12-01   |498  |
+-------------+-----+



                                                                                

+-----------------+
|dti_non_null_rate|
+-----------------+
|              1.0|
+-----------------+



                                                                                

+---------------------------+
|emi_to_income_non_null_rate|
+---------------------------+
|                        1.0|
+---------------------------+



                                                                                

+--------------------------------------+
|credit_utilization_ratio_non_null_rate|
+--------------------------------------+
|                                   1.0|
+--------------------------------------+



                                                                                

+------------------------------+
|pay_behav_coarse_non_null_rate|
+------------------------------+
|                           1.0|
+------------------------------+



                                                                                

+-------------------------+
|util_bucket_non_null_rate|
+-------------------------+
|                      1.0|
+-------------------------+



                                                                                

Unnamed: 0,column,nulls,null_rate_%
7,credit_mix,1896,21.128
26,minpay_flag,1069,11.912
14,invest_to_income,396,4.413
13,emi_to_balance,1,0.011
2,loan_id,0,0.0
5,interest_rate,0,0.0
4,label_def,0,0.0
6,num_credit_inquiries,0,0.0
3,label,0,0.0
1,snapshot_date,0,0.0
