In [None]:
import os
import psutil
import platform

def spark_hardware_report(n_cores=None, total_ram_gb=None, available_ram_gb=None):
    """
    Detect CPU / RAM and derive conservative local-mode Spark settings.

    Rules:
    - Spark's driver heap is capped at 60% of total RAM (40% stays with the
      OS / Jupyter), and is never reported below 1 GB.
    - 4 cores are held back for the OS / Jupyter when the machine is large
      enough; Spark is never given more cores than were detected.
    - Shuffle partitions = 40 per Spark core, clamped to [200, 1500].

    Parameters
    ----------
    n_cores : int, optional
        Core count to plan for. Detected from the process affinity mask
        (falling back to os.cpu_count()) when omitted.
    total_ram_gb, available_ram_gb : float, optional
        RAM figures in GiB. Read from psutil when omitted. psutil reports the
        whole machine, so on a shared node pass the size of your own
        allocation here instead.

    Returns
    -------
    dict
        Keys: n_cores_total, spark_cores, total_ram_gb, available_ram_gb,
        driver_memory (e.g. "225g"), recommended_parts, system, machine.
        The same values are also printed as a human-readable report.
    """

    # ---------------------------------------------------------
    # CPU CORES
    # ---------------------------------------------------------
    if n_cores is None:
        try:
            # Counts only the CPUs this process is allowed to run on.
            n_cores = len(os.sched_getaffinity(0))
        except AttributeError:
            # sched_getaffinity is not available on every platform.
            n_cores = os.cpu_count() or 1
    n_cores = max(1, int(n_cores))

    # Reserve ~4 cores for OS/Jupyter, keep at least 4 for Spark when they
    # exist, and never claim more cores than the machine actually has.
    spark_cores = max(1, min(n_cores, max(4, n_cores - 4)))

    # ---------------------------------------------------------
    # MEMORY
    # ---------------------------------------------------------
    if total_ram_gb is None or available_ram_gb is None:
        vm = psutil.virtual_memory()
        if total_ram_gb is None:
            total_ram_gb = round(vm.total / (1024**3), 1)
        if available_ram_gb is None:
            available_ram_gb = round(vm.available / (1024**3), 1)

    # Hard rule: Spark may use 60% of total RAM. Floor at 1 so the setting
    # never becomes the invalid "0g" on very small machines.
    safe_spark_gb = max(1, int(total_ram_gb * 0.60))

    # Driver-only memory (local mode uses 1 JVM)
    driver_memory_gb = safe_spark_gb

    # ---------------------------------------------------------
    # Shuffle partitions
    # ---------------------------------------------------------
    # 40 partitions per Spark core, never fewer than 200 or more than 1500.
    recommended_parts = min(max(spark_cores * 40, 200), 1500)

    report = {
        "n_cores_total"      : n_cores,
        "spark_cores"        : spark_cores,
        "total_ram_gb"       : total_ram_gb,
        "available_ram_gb"   : available_ram_gb,
        "driver_memory"      : f"{driver_memory_gb}g",
        "recommended_parts"  : recommended_parts,
        "system"             : platform.system(),
        "machine"            : platform.machine(),
    }

    # ---------------------------------------------------------
    # PRETTY PRINT
    # ---------------------------------------------------------
    print("="*70)
    print(" SAFE SPARK HARDWARE REPORT — ZERO-CRASH EDITION ")
    print("="*70)
    print(f"Detected CPU cores         : {n_cores}")
    print(f"Cores reserved for Spark   : {spark_cores}")
    print(f"Total RAM                  : {total_ram_gb} GB")
    print(f"Available RAM right now    : {available_ram_gb} GB")
    print(f"Recommended driver memory  : {report['driver_memory']}")
    print(f"Recommended partitions     : {recommended_parts}")
    # Formatted to one decimal so float subtraction noise is not printed.
    print(f"Memory left for OS         : {total_ram_gb - driver_memory_gb:.1f} GB")
    print("="*70)
    print("This config avoids JVM death and Spark disconnects.")
    print("="*70)

    return report

# CALL IT


 SAFE SPARK HARDWARE REPORT — ZERO-CRASH EDITION 
Detected CPU cores         : 24
Cores reserved for Spark   : 20
Total RAM                  : 376.4 GB
Available RAM right now    : 358.4 GB
Recommended driver memory  : 225g
Recommended partitions     : 800
Memory left for OS         : 151.39999999999998 GB
This config avoids JVM death and Spark disconnects.


In [None]:
from pyspark.sql import SparkSession

# Hard-coded safe settings for your SLURM job: 128 GB, 24 cores
SPARK_CORES = 12
DRIVER_MEM  = "64g"
SHUFFLE_PARTS = 320

# Stop a session left over from a previous run of this cell (once, not twice).
# On a fresh kernel `spark` does not exist yet, which is the expected NameError.
# Any other failure while stopping is reported instead of silently swallowed,
# but still does not prevent a new session from being created.
try:
    spark.stop()
    print("Old Spark session stopped.")
except NameError:
    print("No active Spark session — starting fresh.")
except Exception as exc:
    print(f"Could not stop the previous Spark session cleanly ({exc!r}) — starting fresh.")

# NOTE(review): spark.driver.memory presumably only takes effect when the
# driver JVM is first launched; after a stop/start inside the same kernel the
# old heap size may persist. Restart the kernel if DRIVER_MEM changes.
spark = (
    SparkSession.builder
        .appName("amex-spark")
        .master(f"local[{SPARK_CORES}]")
        .config("spark.driver.memory", DRIVER_MEM)
        .config("spark.sql.shuffle.partitions", str(SHUFFLE_PARTS))
        .config("spark.driver.maxResultSize", "4g")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
)

spark



Old Spark session stopped.


25/11/25 22:12:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Loading Data

In [3]:
# Location of the cleaned parquet dataset.
ROOT = "/storage/group/cxs1024/default/mehdi/data/amex-raddar-parquet"
clean_path = f"{ROOT}/data_clean.parquet"

print("Reading cleaned parquet...")
df = spark.read.parquet(clean_path)

print("Data loaded. Schema:")
df.printSchema()

# Repartition once to the session's shuffle-partition count and mark the
# result for caching, so later cells reuse it instead of re-reading parquet.
print(f"Repartitioning to {SHUFFLE_PARTS} partitions and caching...")
df = df.repartition(SHUFFLE_PARTS).cache()

# A single count() is the action that materializes the cache.
print("Triggering cache ...")
row_count = df.count()

# Summary banner; the partition count is looked up once and reused.
num_partitions = df.rdd.getNumPartitions()
banner = "=" * 80
print("\n" + banner)
print("DATA FULLY LOADED & CACHED — LIGHTNING READY!")
for summary_line in (
    f"Rows                : {row_count:,}",
    f"Columns             : {len(df.columns)}",
    f"Partitions          : {num_partitions}",
    f"Rows per partition  : ~{row_count // num_partitions:,}",
):
    print(summary_line)
print(banner)


Reading cleaned parquet...


                                                                                

Data loaded. Schema:
root
 |-- customer_ID: string (nullable = true)
 |-- date: timestamp_ntz (nullable = true)
 |-- P_2: float (nullable = true)
 |-- D_39: float (nullable = true)
 |-- B_1: float (nullable = true)
 |-- B_2: float (nullable = true)
 |-- R_1: float (nullable = true)
 |-- S_3: float (nullable = true)
 |-- D_41: float (nullable = true)
 |-- B_3: float (nullable = true)
 |-- D_42: float (nullable = true)
 |-- D_43: float (nullable = true)
 |-- D_44: float (nullable = true)
 |-- B_4: float (nullable = true)
 |-- D_45: float (nullable = true)
 |-- B_5: float (nullable = true)
 |-- R_2: float (nullable = true)
 |-- D_46: float (nullable = true)
 |-- D_47: float (nullable = true)
 |-- D_48: float (nullable = true)
 |-- D_49: float (nullable = true)
 |-- B_6: float (nullable = true)
 |-- B_7: float (nullable = true)
 |-- B_8: float (nullable = true)
 |-- D_50: float (nullable = true)
 |-- D_51: float (nullable = true)
 |-- B_9: float (nullable = true)
 |-- R_3: float (nullable 

25/11/25 22:13:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Triggering cache ...





DATA FULLY LOADED & CACHED — LIGHTNING READY!
Rows                : 16,895,213
Columns             : 192
Partitions          : 320
Rows per partition  : ~52,797


                                                                                

# Null / NaN Profiling

In [4]:
from pyspark.sql import functions as F   

# --- Column groups ---
categorical_cols = [
    'B_30','B_38','D_114','D_116','D_117','D_120','D_126','D_63','D_64','D_66','D_68'
]

all_cols = df.columns
id_cols = ['customer_ID', 'date', 'test']   # excluded from feature/NaN checks
label_col = 'target'

# Everything that is not categorical, an ID, or the label is treated as numeric.
non_numeric_cols = set(categorical_cols) | set(id_cols) | {label_col}
numeric_cols = [name for name in all_cols if name not in non_numeric_cols]

print("Numeric columns:", len(numeric_cols))
print("Categorical columns:", len(categorical_cols))


# ---------------------------
# 1. Null + NaN count per column
# ---------------------------

def _missing_count_expr(name):
    """Aggregate counting missing values in one column.

    Categorical, ID and label columns count NULLs only; every other column
    counts NULLs plus NaNs (after a cast to double).
    """
    null_flag = F.col(name).isNull().cast("int")
    if name in non_numeric_cols:
        return F.sum(null_flag).alias(name)
    nan_flag = F.isnan(F.col(name).cast("double")).cast("int")
    return F.sum(null_flag + nan_flag).alias(name)


exprs = [_missing_count_expr(name) for name in df.columns]

null_counts = df.select(exprs)
null_counts.show(truncate=False)


# ---------------------------
# 2. Null percentage per column
# ---------------------------

row_count = df.count()

pct_exprs = [
    (F.col(name) / row_count * 100).alias(name)
    for name in null_counts.columns
]
null_pct = null_counts.select(pct_exprs)

print("Null/NaN percentages per column:")
null_pct.show(truncate=False)


Numeric columns: 177
Categorical columns: 11


                                                                                

+-----------+----+------+----+---+----+---+-------+----+----+--------+-------+------+---+----+---+---+-------+----+-------+--------+----+---+------+-------+----+---+---+-----+------+----+--------+---+----+---+----+---+-------+----+---+------+-------+------+---+----+-------+----+------+----+-------+-----+----+-------+----+------+----+----+-------+----+----+--------+----+------+----+---+----+----+------+----+------+----+-----+----+----+--------+---+-----+----+--------+----+---+-------+-----+----+------+------+---+--------+----+-----+----+----+----+-----+--------+-----+----+----+----+------+----+----+-----+----+--------+----+----+----+--------+----+----+--------+----+----+----+----+----+----+----+----+-----+----+----+------+----+----+----+----+----+----+-----+----+-----+-----+----+-----+------+------+-------+--------+------+----+----+--------+------+----+--------+-----+--------+--------+--------+-----+----+-------+------+------+------+------+------+------+------+------+------+------+-----



+-----------+----+------------------+----+---+------------------+---+------------------+------------------+------------------+-----------------+-----------------+-----------------+---+--------------------+---+---+------------------+----+------------------+-----------------+--------------------+---+------------------+-----------------+----+---+---+------------------+-----------------+----+-----------------+---+----+---+------------------+---+------------------+----+---+---------------+------------------+------------------+---+----+-----------------+----+------------------+----+------------------+-------------------+----+----------------+----+------------------+----+------------------+------------------+----+------------------+-----------------+------------------+------------------+--------------------+---+----+----+------------------+------------------+-----------------+----+------------------+----+----+-----------------+---+-------------------+----+-----------------+----+--------------

                                                                                

In [5]:
from functools import reduce

# Feature columns = every column that is neither an identifier nor the label.
non_feature_cols = set(id_cols) | {label_col}
feature_cols = [name for name in df.columns if name not in non_feature_cols]

# Numeric features are the features not listed as categorical
# (categorical_cols comes from the profiling cell above).
categorical_lookup = set(categorical_cols)
numeric_cols = [name for name in feature_cols if name not in categorical_lookup]

for caption, columns in (
    ("Total feature columns:", feature_cols),
    ("Numeric feature columns:", numeric_cols),
    ("Categorical feature columns:", categorical_cols),
):
    print(caption, len(columns))


Total feature columns: 188
Numeric feature columns: 177
Categorical feature columns: 11


# Data Cleaning

In [7]:
from functools import reduce
from pyspark.sql import functions as F
import time

print("Starting data cleaning...")

# --- Define column groups ---
id_cols      = ['customer_ID', 'date', 'test']
label_col    = 'target'
categorical_cols = ['B_30','B_38','D_114','D_116','D_117',
                    'D_120','D_126','D_63','D_64','D_66','D_68']

# Filter to existing categorical columns
categorical_cols = [c for c in categorical_cols if c in df.columns]

# Auto-detect numeric columns: anything with a numeric Spark type that is not
# an ID, the label, or a declared categorical.
numeric_cols = [
    c for c, t in df.dtypes
    if c not in id_cols + [label_col] + categorical_cols
    and t in ('int', 'bigint', 'float', 'double', 'smallint', 'tinyint')
]

feature_cols = numeric_cols + categorical_cols
print(f"Total feature columns: {len(feature_cols)} (numeric: {len(numeric_cols)}, cat: {len(categorical_cols)})")

_numeric_lookup = set(numeric_cols)


def _is_missing(c):
    """Boolean Column: NULL-or-NaN for numeric features, NULL only for categoricals.

    Shared by the row filter (step 1) and the per-column missing counts
    (step 2) so both steps use the same definition of "missing".
    """
    if c in _numeric_lookup:
        return F.col(c).isNull() | F.isnan(c)
    return F.col(c).isNull()


# --------------------------------------------------------------
# 1) Drop rows where ALL feature columns are null/NaN
# --------------------------------------------------------------

print("Building condition to drop rows with ALL features missing...")

conditions = [_is_missing(c) for c in feature_cols]

if conditions:
    # A row is dropped only when every single feature is missing.
    all_null_cond = reduce(lambda x, y: x & y, conditions)
    df_no_allnull = df.filter(~all_null_cond)
else:
    df_no_allnull = df  # no features → keep all

rows_before = df.count()
rows_after  = df_no_allnull.count()
print(f"Rows before: {rows_before:,}")
print(f"Rows after dropping all-null-feature rows: {rows_after:,}")

# --------------------------------------------------------------
# 2) Drop columns with >95% missing
# --------------------------------------------------------------

missing_threshold = 0.95
row_count = rows_after

print(f"Computing missing rates for {len(feature_cols)} feature columns...")

null_exprs = [F.sum(_is_missing(c).cast("int")).alias(c) for c in feature_cols]

if row_count > 0 and null_exprs:
    # One aggregation job returns the missing count for every feature.
    missing_counts = df_no_allnull.select(null_exprs).first().asDict()
    drop_cols = [
        c for c in feature_cols
        if (missing_counts.get(c) or 0) / row_count > missing_threshold
    ]
else:
    # With no rows (or no features) a missing *rate* is undefined; skip the
    # division instead of failing, and drop nothing.
    missing_counts = {}
    drop_cols = []

print(f"Columns to drop (> {missing_threshold*100:.0f}% missing): {len(drop_cols)}")
print(drop_cols)

df_clean = df_no_allnull.drop(*drop_cols)

print(f"Final number of columns: {len(df_clean.columns)}")
df_clean.printSchema()

# --------------------------------------------------------------
# 3) REPARTITION + CACHE df_clean
# --------------------------------------------------------------

print("Optimizing df_clean for maximum performance...")
df_clean = df_clean.repartition(SHUFFLE_PARTS)
df_clean = df_clean.cache()

# count() is the action that actually materializes the cache; it is timed so
# the cost of this step is visible in the output.
print("Triggering cache on df_clean ...")
start_cache = time.time()
cached_rows = df_clean.count()
print(f"Cache ready in {time.time() - start_cache:.1f}s")

print("\n" + "="*80)
print(f"Rows       : {cached_rows:,}")
print(f"Columns    : {len(df_clean.columns)}")
print(f"Partitions : {df_clean.rdd.getNumPartitions()}")
print("="*80)


Starting data cleaning...
Total feature columns: 188 (numeric: 177, cat: 11)
Building condition to drop rows with ALL features missing...


25/11/25 22:16:25 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
                                                                                

Rows before: 16,895,213
Rows after dropping all-null-feature rows: 16,895,213
Computing missing rates for 188 feature columns...


25/11/25 22:16:32 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
                                                                                

Columns to drop (> 95% missing): 13
['D_73', 'D_87', 'D_88', 'D_108', 'D_110', 'D_111', 'B_39', 'B_42', 'D_134', 'D_135', 'D_136', 'D_137', 'D_138']
Final number of columns: 179
root
 |-- customer_ID: string (nullable = true)
 |-- date: timestamp_ntz (nullable = true)
 |-- P_2: float (nullable = true)
 |-- D_39: float (nullable = true)
 |-- B_1: float (nullable = true)
 |-- B_2: float (nullable = true)
 |-- R_1: float (nullable = true)
 |-- S_3: float (nullable = true)
 |-- D_41: float (nullable = true)
 |-- B_3: float (nullable = true)
 |-- D_42: float (nullable = true)
 |-- D_43: float (nullable = true)
 |-- D_44: float (nullable = true)
 |-- B_4: float (nullable = true)
 |-- D_45: float (nullable = true)
 |-- B_5: float (nullable = true)
 |-- R_2: float (nullable = true)
 |-- D_46: float (nullable = true)
 |-- D_47: float (nullable = true)
 |-- D_48: float (nullable = true)
 |-- D_49: float (nullable = true)
 |-- B_6: float (nullable = true)
 |-- B_7: float (nullable = true)
 |-- B_

25/11/25 22:16:45 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.


Triggering cache on df_clean (30–60 sec first time)...




Cache ready in 28.5s

Rows       : 16,895,213
Columns    : 179
Partitions : 320


                                                                                

In [None]:
# from functools import reduce
# from pyspark.sql import functions as F
# import time

# # --------------------------------------------------------------
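# # CLEANING PIPELINE — TOP-1 AMEX STYLE (FAST + CORRECT)
# # NOTE: disabled earlier draft of the cleaning cell above, kept for reference only.
# # It repartitions with hw["recommended_parts"]; `hw` is not assigned in the visible cells.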
# # --------------------------------------------------------------

# print("Starting data cleaning...")

# # --- Define column groups (must be defined BEFORE using them!) ---
# id_cols      = ['customer_ID', 'date', 'test']
# label_col    = 'target'
# categorical_cols = ['B_30','B_38','D_114','D_116','D_117',
#                     'D_120','D_126','D_63','D_64','D_66','D_68']

# # Filter to existing categorical columns
# categorical_cols = [c for c in categorical_cols if c in df.columns]

# # Auto-detect numeric columns
# numeric_cols = [
#     c for c, t in df.dtypes
#     if c not in id_cols + [label_col] + categorical_cols
#     and t in ('int', 'bigint', 'float', 'double', 'smallint', 'tinyint')
# ]

# feature_cols = numeric_cols + categorical_cols
# print(f"Total feature columns: {len(feature_cols)} (numeric: {len(numeric_cols)}, cat: {len(categorical_cols)})")

# # --------------------------------------------------------------
# # 1) Drop rows where ALL feature columns are null/NaN
# # --------------------------------------------------------------

# print("Building condition to drop rows with ALL features missing...")

# # For numeric: null OR NaN
# # For categorical: only null
# conditions = (
#     [F.col(c).isNull() | F.isnan(c) for c in numeric_cols] +
#     [F.col(c).isNull() for c in categorical_cols]
# )

# # Combine with AND
# if conditions:
#     all_null_cond = reduce(lambda x, y: x & y, conditions)
#     df_no_allnull = df.filter(~all_null_cond)
# else:
#     df_no_allnull = df  # no features → keep all

# print(f"Rows before: {df.count():,}")
# print(f"Rows after dropping all-null-feature rows: {df_no_allnull.count():,}")

# # --------------------------------------------------------------
# # 2) Drop columns with >95% missing
# # --------------------------------------------------------------

# missing_threshold = 0.95
# row_count = df_no_allnull.count()

# print(f"Computing missing rates for {len(feature_cols)} feature columns...")

# null_exprs = []
# for c in feature_cols:
#     if c in numeric_cols:
#         null_exprs.append(
#             F.sum((F.col(c).isNull() | F.isnan(c)).cast("int")).alias(c)
#         )
#     else:
#         null_exprs.append(F.sum(F.col(c).isNull().cast("int")).alias(c))

# missing_counts = df_no_allnull.select(null_exprs).first().asDict()
# drop_cols = [
#     c for c in feature_cols
#     if missing_counts.get(c, 0) / row_count > missing_threshold
# ]

# print(f"Columns to drop (> {missing_threshold*100:.0f}% missing): {len(drop_cols)}")
# print(drop_cols)

# # Drop them
# df_clean = df_no_allnull.drop(*drop_cols)

# print(f"Final number of columns: {len(df_clean.columns)}")
# df_clean.printSchema()

# # --------------------------------------------------------------
# # REPARTITION + CACHE df_clean
# # --------------------------------------------------------------

# print("Optimizing df_clean for maximum performance...")
# df_clean = df_clean.repartition(hw["recommended_parts"])   # <<<<<< fixed key
# df_clean = df_clean.cache()

# print("Triggering cache on df_clean (30–60 sec first time)...")
# start_cache = time.time()
# cached_rows = df_clean.count()
# print(f"Cache ready in {time.time() - start_cache:.1f}s")

# print("\n" + "="*80)
# print(f"Rows       : {cached_rows:,}")
# print(f"Columns    : {len(df_clean.columns)}")
# print(f"Partitions : {df_clean.rdd.getNumPartitions()}")
# print("="*80)


Starting data cleaning...
Total feature columns: 188 (numeric: 177, cat: 11)
Building condition to drop rows with ALL features missing...
Rows before: 16,895,213


25/11/25 22:07:09 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
                                                                                

Rows after dropping all-null-feature rows: 16,895,213


25/11/25 22:07:14 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
                                                                                

Computing missing rates for 188 feature columns...


25/11/25 22:07:20 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
25/11/25 22:07:31 WARN CacheManager: Asked to cache already cached data.        


Columns to drop (> 95% missing): 13
['D_73', 'D_87', 'D_88', 'D_108', 'D_110', 'D_111', 'B_39', 'B_42', 'D_134', 'D_135', 'D_136', 'D_137', 'D_138']
Final number of columns: 179
root
 |-- customer_ID: string (nullable = true)
 |-- date: timestamp_ntz (nullable = true)
 |-- P_2: float (nullable = true)
 |-- D_39: float (nullable = true)
 |-- B_1: float (nullable = true)
 |-- B_2: float (nullable = true)
 |-- R_1: float (nullable = true)
 |-- S_3: float (nullable = true)
 |-- D_41: float (nullable = true)
 |-- B_3: float (nullable = true)
 |-- D_42: float (nullable = true)
 |-- D_43: float (nullable = true)
 |-- D_44: float (nullable = true)
 |-- B_4: float (nullable = true)
 |-- D_45: float (nullable = true)
 |-- B_5: float (nullable = true)
 |-- R_2: float (nullable = true)
 |-- D_46: float (nullable = true)
 |-- D_47: float (nullable = true)
 |-- D_48: float (nullable = true)
 |-- D_49: float (nullable = true)
 |-- B_6: float (nullable = true)
 |-- B_7: float (nullable = true)
 |-- B_

# Imputation (vectorized, Spark SQL)

In [8]:
from pyspark.sql import functions as F
import time

start = time.time()

# Rebuild numeric_cols from the CURRENT df_clean: the cleaning step dropped
# columns, so the earlier list may name columns that no longer exist.
numeric_cols = [
    c for c, t in df_clean.dtypes
    if c not in id_cols + [label_col] + categorical_cols
    and t in ('int', 'bigint', 'float', 'double', 'smallint', 'tinyint')
]

print(f"Using {len(numeric_cols)} numeric columns that actually exist")


def _nan_as_null(c):
    """Column `c` with NaN mapped to NULL.

    The cleaning step counts both NULL and NaN as missing. Mapping NaN to
    NULL here keeps NaN out of the mean (a single NaN would turn the mean
    into NaN) and lets coalesce() replace it.
    """
    return F.when(~F.isnan(F.col(c)), F.col(c))


# ONE JOB — compute the mean of the non-missing values of every numeric column
print("Computing means ...")
if numeric_cols:
    mean_exprs = [F.mean(_nan_as_null(c)).alias(f"{c}_mean") for c in numeric_cols]
    means_row = df_clean.select(*mean_exprs).first()
    # A column with no valid values at all has a NULL mean; fall back to 0.0
    # so the imputed column is guaranteed to contain no missing values.
    mean_dict = {
        c: (means_row[f"{c}_mean"] if means_row[f"{c}_mean"] is not None else 0.0)
        for c in numeric_cols
    }
else:
    mean_exprs = []
    means_row = None
    mean_dict = {}

# The means are embedded as literals in the query plan below; the broadcast
# handle is kept only so code that already refers to it keeps working.
broadcast_means = spark.sparkContext.broadcast(mean_dict)

print(f"Means computed in {time.time()-start:.1f}s")

# Imputation
print("Applying mean imputation...")
_numeric_lookup = set(numeric_cols)
# Only fill categoricals that survived the column drop.
_present_categoricals = [c for c in categorical_cols if c in df_clean.columns]

df_imputed = (
    df_clean.select(
        *[F.coalesce(_nan_as_null(c), F.lit(mean_dict[c])).alias(c)
          if c in _numeric_lookup else F.col(c)
          for c in df_clean.columns]
    )
    # NOTE(review): filling with a string presumably only affects string-typed
    # columns, so non-string categoricals keep their NULLs here — confirm.
    .fillna("missing", subset=_present_categoricals)
    .cache()
)

# count() forces the imputed frame to be computed and cached.
df_imputed.count()
print(f"\nIMPUTATION DONE IN {time.time()-start:.1f} SECONDS — PERFECT!")


Using 164 numeric columns that actually exist
Computing means ...


                                                                                

Means computed in 14.4s
Applying mean imputation...





IMPUTATION DONE IN 47.9 SECONDS — PERFECT!


                                                                                

# StringIndexer for Categorical Columns

In [9]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


# Each categorical column gets a numeric companion named "<column>_idx".
categorical_indexed_cols = [f"{name}_idx" for name in categorical_cols]

# One StringIndexer stage per (input, output) pair.
# handleInvalid="keep" assigns invalid values (unseen labels / NULL) their own
# index instead of raising an error.
indexers = [
    StringIndexer(inputCol=source_name, outputCol=indexed_name, handleInvalid="keep")
    for source_name, indexed_name in zip(categorical_cols, categorical_indexed_cols)
]

# Fit all indexers as a single pipeline, then apply the fitted model.
indexer_pipeline = Pipeline(stages=indexers)
indexer_model = indexer_pipeline.fit(df_imputed)
df_indexed = indexer_model.transform(df_imputed)

print("Indexed categorical columns added:")
print(categorical_indexed_cols)

preview_cols = categorical_cols + categorical_indexed_cols
df_indexed.select(preview_cols).show(5, truncate=False)


                                                                                

Indexed categorical columns added:
['B_30_idx', 'B_38_idx', 'D_114_idx', 'D_116_idx', 'D_117_idx', 'D_120_idx', 'D_126_idx', 'D_63_idx', 'D_64_idx', 'D_66_idx', 'D_68_idx']
+----+----+-----+-----+-----+-----+-----+----+----+----+----+--------+--------+---------+---------+---------+---------+---------+--------+--------+--------+--------+
|B_30|B_38|D_114|D_116|D_117|D_120|D_126|D_63|D_64|D_66|D_68|B_30_idx|B_38_idx|D_114_idx|D_116_idx|D_117_idx|D_120_idx|D_126_idx|D_63_idx|D_64_idx|D_66_idx|D_68_idx|
+----+----+-----+-----+-----+-----+-----+----+----+----+----+--------+--------+---------+---------+---------+---------+---------+--------+--------+--------+--------+
|0.0 |1.0 |0.0  |0.0  |-1.0 |0.0  |1.0  |CO  |O   |NULL|6.0 |0.0     |2.0     |1.0      |0.0      |0.0      |0.0      |0.0      |0.0     |0.0     |2.0     |0.0     |
|1.0 |5.0 |1.0  |0.0  |4.0  |0.0  |1.0  |CO  |O   |NULL|5.0 |1.0     |3.0     |0.0      |0.0      |1.0      |0.0      |0.0      |0.0     |0.0     |2.0     |1.0    

# VectorAssembler + StandardScaler

In [10]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# 1) Numeric feature columns: numeric-typed columns of df_indexed that are not
#    IDs, the label, a raw categorical, or an indexed categorical.
numeric_types = ('int', 'bigint', 'float', 'double', 'smallint', 'tinyint')

_not_numeric_features = (
    set(id_cols) | {label_col} | set(categorical_cols) | set(categorical_indexed_cols)
)
numeric_feature_cols = [
    name for name, dtype in df_indexed.dtypes
    if dtype in numeric_types and name not in _not_numeric_features
]

print("Numeric feature columns:", len(numeric_feature_cols))
print("Categorical indexed feature columns:", len(categorical_indexed_cols))

# 2) Model inputs: numeric features first, then the indexed categoricals.
feature_input_cols = numeric_feature_cols + categorical_indexed_cols
print("Total feature columns going into assembler:", len(feature_input_cols))

# 3) Pack the input columns into one vector column, "features_unnorm".
assembler = VectorAssembler(inputCols=feature_input_cols, outputCol="features_unnorm")
df_assembled = assembler.transform(df_indexed)

# 4) Standardize to zero mean / unit variance, producing "features".
# NOTE(review): the scaler is fitted on df_assembled as a whole, i.e. before
# the train/test split made in a later cell — confirm this is intended.
scaler = StandardScaler(
    inputCol="features_unnorm",
    outputCol="features",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(df_assembled)

# Keep only what modeling needs: IDs, label, the test flag and the features.
output_cols = ["customer_ID", "date", label_col, "test", "features"]
df_final = scaler_model.transform(df_assembled).select(*output_cols)

print("Final dataframe for modeling:")
df_final.printSchema()
df_final.show(5, truncate=False)


Numeric feature columns: 164
Categorical indexed feature columns: 11
Total feature columns going into assembler: 175


                                                                                

Final dataframe for modeling:
root
 |-- customer_ID: string (nullable = true)
 |-- date: timestamp_ntz (nullable = true)
 |-- target: byte (nullable = true)
 |-- test: byte (nullable = true)
 |-- features: vector (nullable = true)

+----------------------------------------------------------------+-------------------+------+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Set Label Column + Train/Test Split

In [11]:
from pyspark.sql import functions as F

# 1) Cast the label to double and discard rows that have no label.
df_final = (
    df_final
    .withColumn(label_col, F.col(label_col).cast("double"))
    .filter(F.col(label_col).isNotNull())
)

# 2) Random 80/20 row-level split with a fixed seed.
split_parts = df_final.randomSplit([0.8, 0.2], seed=42)

# 3) Cache both parts; the counts below are the actions that fill the caches.
train_df = split_parts[0].cache()
test_df = split_parts[1].cache()

for caption, part in (("Train rows:", train_df), ("Test rows:", test_df)):
    print(caption, part.count())

train_df.show(5, truncate=False)


                                                                                

Train rows: 4424717




Test rows: 1106734
+----------------------------------------------------------------+-------------------+------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

# Checking for Class Imbalance

In [12]:
from pyspark.sql import functions as F

label_col = "target"

print("Class distribution:")

# Total row count is computed once on the driver and used as the denominator.
total_rows = df_final.count()
per_class_counts = df_final.groupBy(label_col).count()

# Per-class share of all rows, in percent, ordered by label value.
class_dist = (
    per_class_counts
    .withColumn("percentage", F.col("count") / total_rows * 100)
    .orderBy(label_col)
)

class_dist.show(truncate=False)


Class distribution:
+------+-------+------------------+
|target|count  |percentage        |
+------+-------+------------------+
|0.0   |4153582|75.0902792052212  |
|1.0   |1377869|24.909720794778803|
+------+-------+------------------+



                                                                                

# Train a Gradient-Boosted Trees Classifier (GBTClassifier)

In [None]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import functions as F

# NOTE:
# GBTClassifier in Spark expects:
#   - features column named "features"
#   - label column of type DOUBLE
# Both were created in Cell 9.

# NOTE:
# The dataset is imbalanced (~75% class 0, ~25% class 1).
# We compute inverse-frequency weights dynamically, so the model
# learns to pay more attention to the minority class (label 1).


label_col = "target"
features_col = "features"

# Fixed seed for the stochastic parts of training (subsamplingRate < 1),
# so that re-running the notebook fits the same model.
GBT_SEED = 42

# -------------------------------------------------------------------
# Compute class weights: inverse frequency
# -------------------------------------------------------------------
# The per-class counts are a tiny result (one row per label), so they are
# collected once and the total is summed on the driver instead of launching
# a second Spark job over train_df.
class_counts = (
    train_df.groupBy(label_col).count()
             .withColumnRenamed("count", "n")
)
class_count_rows = class_counts.collect()

total = sum(row["n"] for row in class_count_rows)

# Dictionary: label -> weight (total rows / rows in that class)
weights = {
    row[label_col]: float(total) / float(row["n"])
    for row in class_count_rows
}

print("Class weights:", weights)

# Fail loudly if a class is absent from the training split: weights.get()
# would otherwise return None and produce a null weight column.
missing_labels = [lbl for lbl in (0.0, 1.0) if lbl not in weights]
if missing_labels:
    raise ValueError(
        f"Training data has no rows for label(s) {missing_labels}; "
        "cannot compute class weights."
    )

# Add weightCol to training dataframe
train_df_weighted = train_df.withColumn(
    "class_weight",
    F.when(F.col(label_col) == 1, F.lit(weights[1.0]))
     .otherwise(F.lit(weights[0.0]))
)


# -------------------------------------------------------------------
# Define the GBT model
# -------------------------------------------------------------------
# Key notes:
#  - maxDepth controls tree complexity
#  - maxIter controls number of boosting rounds
#  - stepSize is learning rate (smaller = more stable)
#  - weightCol handles class imbalance
#  - seed pins the row subsampling so results are reproducible
# -------------------------------------------------------------------

gbt = GBTClassifier(
    labelCol=label_col,
    featuresCol=features_col,
    weightCol="class_weight",   # using weighted training
    maxDepth=6,                 # reasonable starting depth
    maxIter=60,                 # number of boosting rounds
    stepSize=0.1,               # learning rate
    subsamplingRate=0.8,        # stochastic boosting for stability
    maxBins=32,                 # number of bins for continuous features
    seed=GBT_SEED               # reproducible subsampling
)

print("Training GBT model...")

model = gbt.fit(train_df_weighted)

print("Model training complete.")

Class weights: {1.0: 4.013155830150569, 0.0: 1.3318779566571668}
Training GBT model...


                                                                                

# Apply the model to the test set

In [None]:
# Score the test split with the fitted model.
predictions = model.transform(test_df)

# Columns shown in the preview: id, class probabilities, hard prediction, truth.
preview_columns = ["customer_ID", "probability", "prediction", label_col]

print("Sample predictions:")
predictions.select(*preview_columns).show(10, truncate=False)


# -------------------------------------------------------------------
# Evaluate model using AUC (BinaryClassificationEvaluator)
# -------------------------------------------------------------------
# The probability vector is passed as the raw-prediction column.

evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="probability",
    labelCol=label_col,
)

auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.5f}")


# Visualization

In [None]:
import matplotlib.pyplot as plt
from pyspark.ml.functions import vector_to_array
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql import functions as F

label_col = "target"

# Number of score bins used to trace the ROC curve (one point per bin).
ROC_BINS = 1000

# `probability` is an ML Vector column, which cannot be indexed with
# Column.getItem(); convert it to array<double> first and then take
# element 1 = P(target=1). Built once and reused by both plots below.
scored = predictions.select(
    vector_to_array(F.col("probability")).getItem(1).alias("p_default"),
    F.col(label_col).cast("double").alias(label_col),
)

# ---------------------------------------------------
# 1) Histogram of predicted probabilities by class
# ---------------------------------------------------
# We don't want to plot millions of points, so sample
pred_sample = (
    scored
        .sample(False, 0.01, seed=42)   # ~1% sample; adjust if too big/small
        .toPandas()
)

print("Sample size for plotting:", len(pred_sample))

# Separate by class
p0 = pred_sample.loc[pred_sample[label_col] == 0.0, "p_default"]
p1 = pred_sample.loc[pred_sample[label_col] == 1.0, "p_default"]

plt.figure(figsize=(8, 5))
plt.hist(p0, bins=40, alpha=0.6, label="Class 0 (no default)")
plt.hist(p1, bins=40, alpha=0.6, label="Class 1 (default)")
plt.xlabel("Predicted probability of default (P[target=1])")
plt.ylabel("Count")
plt.title("Predicted default probability distribution by class")
plt.legend()
plt.tight_layout()
plt.show()


# ---------------------------------------------------
# 2) ROC Curve + AUC
# ---------------------------------------------------

# AUC over the full prediction set via Spark's metrics.
# Prepare RDD of (score, label) pairs for Spark's metrics
score_and_labels = scored.rdd.map(tuple)   # (score, label)

metrics = BinaryClassificationMetrics(score_and_labels)

auc_roc = metrics.areaUnderROC
print(f"Area under ROC (Spark metrics): {auc_roc:.5f}")

# PySpark's BinaryClassificationMetrics exposes areaUnderROC / areaUnderPR
# but no roc() method, so the curve points are built here instead:
# count positives and negatives per score bin in Spark (small result),
# then accumulate from the highest-score bin downwards on the driver.
bin_counts = (
    scored
        .withColumn(
            "score_bin",
            # p == 1.0 would land in bin ROC_BINS; clamp it into the top bin
            F.least(F.floor(F.col("p_default") * ROC_BINS), F.lit(ROC_BINS - 1)),
        )
        .groupBy("score_bin")
        .agg(
            F.sum(F.col(label_col)).alias("n_pos"),
            F.sum(F.lit(1.0) - F.col(label_col)).alias("n_neg"),
        )
        .toPandas()
        .sort_values("score_bin", ascending=False)
)

# Cumulative rates: lowering the threshold one bin at a time.
roc_pd = bin_counts.assign(
    FPR=bin_counts["n_neg"].cumsum() / bin_counts["n_neg"].sum(),
    TPR=bin_counts["n_pos"].cumsum() / bin_counts["n_pos"].sum(),
)[["FPR", "TPR"]]

plt.figure(figsize=(6, 6))
# Prepend the (0, 0) origin; the last cumulative point is (1, 1) by construction.
plt.plot(
    [0.0, *roc_pd["FPR"]],
    [0.0, *roc_pd["TPR"]],
    label=f"ROC curve (AUC = {auc_roc:.3f})",
)
plt.plot([0, 1], [0, 1], "k--", label="Random baseline")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend(loc="lower right")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
