In [0]:
# ---------------------------------------------------------
# 03_Train_PD_Model (Final Robust Version)
# Goal: Train a Gradient Boosted Tree (Handling Dirty Data)
# ---------------------------------------------------------

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.functions import col, expr

# 1. Load Silver Data
print("⏳ Loading Silver Table...")
df = spark.table("silver_lending_club")

# Numeric columns contain dirty values (e.g. "Debt consolidation" in a numeric
# field). They are converted with SQL `try_cast`, which yields NULL for values
# that cannot be parsed instead of failing the job. The SQL function is used
# through `expr` because the `Column.try_cast` method only exists in
# PySpark 4.0+, while SQL `try_cast` is available from Spark 3.2 onward.
print("🔧 Fixing Data Types & Dropping Bad Rows...")

# Label, categorical, and already-cleaned columns, selected unchanged.
passthrough_cols = [
    "default_flag",
    "home_ownership",
    "purpose",
    "addr_state",
    "term_clean",
    "emp_length_clean",
]
# Columns coerced to FLOAT; values that fail the cast become NULL.
float_cols = ["annual_inc", "dti", "loan_amnt", "int_rate", "installment"]

df_clean = df.select(
    *[col(c) for c in passthrough_cols],
    *[expr(f"try_cast(`{c}` AS FLOAT)").alias(c) for c in float_cols],
).dropna()  # drops rows with a NULL in any selected column, incl. failed casts

# 2. Split Data (Train / Test)
print("✂️ Splitting Data (80% Train, 20% Test)...")
train_data, test_data = df_clean.randomSplit([0.8, 0.2], seed=42)

# Nothing is cached, so each count() runs as a separate Spark job over the
# whole read/clean/split lineage.
print(f"   Train Rows: {train_data.count():,}")
print(f"   Test Rows:  {test_data.count():,}")

# 3. Define Features
# Categorical inputs are label-encoded below; numeric inputs go in as-is.
cat_cols = ["home_ownership", "purpose", "addr_state"]
num_cols = [
    "loan_amnt",
    "int_rate",
    "installment",
    "annual_inc",
    "dti",
    "term_clean",
    "emp_length_clean",
]

# 4. Build the ML Pipeline
# Step A: one StringIndexer per categorical column, each writing "<col>_index".
# handleInvalid="keep" puts labels not seen during fit into an extra index
# rather than raising an error.
indexed_cols = [f"{c}_index" for c in cat_cols]
indexers = [
    StringIndexer(inputCol=src, outputCol=dst, handleInvalid="keep")
    for src, dst in zip(cat_cols, indexed_cols)
]

# Step B: Pack the indexed categoricals followed by the numeric columns into a
# single "features" vector. handleInvalid="skip" drops rows with invalid
# feature values.
# NOTE(review): "skip" also silently drops such rows when the fitted model
# scores new data — confirm that is intended.
assembler = VectorAssembler(
    inputCols=indexed_cols + num_cols,
    outputCol="features",
    handleInvalid="skip",
)

# Step C: The Model (Gradient Boosted Tree)
# maxBins must be at least the category count of every categorical feature;
# addr_state (~50 US states) exceeds the default of 32, hence 64.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="default_flag",
    maxBins=64,
    maxIter=20,
)

# Step D: Chain it all together — indexers first, then assembler, then model.
pipeline = Pipeline(stages=[*indexers, assembler, gbt])

# 5. Train the Model
# Fits every pipeline stage on the training split and returns the fitted model.
print("🚂 Training Gradient Boosted Tree (This takes 2-5 mins)...")
model = pipeline.fit(train_data)

# 6. Make Predictions
print("🔮 Generating Predictions...")
predictions = model.transform(test_data)

# 7. Evaluate Performance
# metricName is spelled out (areaUnderROC is also the evaluator's default) so
# the "AUC" printed below is unambiguous.
evaluator = BinaryClassificationEvaluator(
    labelCol="default_flag",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)

print("-" * 30)
print("🚀 MODEL TRAINED!")
print(f"🎯 Test AUC Score: {auc:.3f}")
print("-" * 30)

# 8. Save the Model
# Writes the whole fitted pipeline (indexers + assembler + GBT), overwriting
# anything already stored at model_path.
model_path = "/Volumes/workspace/default/raw_data/gbt_credit_model"
print(f"💾 Saving model to {model_path}...")
model.write().overwrite().save(model_path)