In [0]:
from pyspark.sql import functions as F

# Load the gold-layer table that every later cell works from.
gold_reader = spark.read.format("delta")
df = gold_reader.table("bank_gold")

# Shape check: row count (a Spark job) and column count (schema only).
row_count = df.count()
column_count = len(df.columns)
print("Rows:", row_count)
print("Columns:", column_count)

In [0]:
# Model inputs, grouped by how they are treated downstream.
# `duration` is left out (data leakage); `y` / `y_label` are the target.

# Numeric inputs
numeric_features = [
    "age", "balance_log", "campaign", "pdays",
    "previous", "was_previously_contacted",
    "contact_efficiency", "month_num",
]

# Categorical inputs (will be encoded)
categorical_features = [
    "job", "marital", "education", "contact",
    "poutcome", "housing", "loan", "default",
    "age_group", "balance_tier", "campaign_intensity",
]

feature_cols = numeric_features + categorical_features

target_col = "y_label"

# Report every expected input that the loaded table does not provide
available_cols = set(df.columns)
missing = [name for name in feature_cols if name not in available_cols]
if missing:
    print("Missing columns:", missing)
else:
    print("Missing columns:", "None — all good")
print("Total features:", len(feature_cols))

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Text-valued inputs: each is indexed and then one-hot encoded
cat_cols = [
    "job", "marital", "education", "contact",
    "poutcome", "housing", "loan", "default",
    "age_group", "balance_tier", "campaign_intensity",
]

# Inputs that are already numeric and go straight into the assembler
num_cols = [
    "age", "balance_log", "campaign", "pdays",
    "previous", "was_previously_contacted",
    "contact_efficiency", "month_num",
]

# Intermediate column names produced by the two encoding steps
index_cols = [f"{c}_idx" for c in cat_cols]
ohe_cols = [f"{c}_ohe" for c in cat_cols]

# Step 1 — one StringIndexer per categorical column (text -> index number)
indexers = [
    StringIndexer(inputCol=src, outputCol=dst, handleInvalid="keep")
    for src, dst in zip(cat_cols, index_cols)
]

# Step 2 — one OneHotEncoder per indexed column (index number -> binary vector)
encoders = [
    OneHotEncoder(inputCol=src, outputCol=dst)
    for src, dst in zip(index_cols, ohe_cols)
]

# Step 3 — a single VectorAssembler: numeric columns first, then the one-hot vectors
assembler = VectorAssembler(
    inputCols=num_cols + ohe_cols,
    outputCol="features",
    handleInvalid="skip",
)

# Fit the whole pipeline on the loaded table, then apply it to the same table
pipeline_stages = indexers + encoders + [assembler]
encoding_pipeline = Pipeline(stages=pipeline_stages)
encoding_model = encoding_pipeline.fit(df)
df_encoded = encoding_model.transform(df)

print("Encoding complete")
print("Sample feature vector:")
df_encoded.select("features", "y_label").show(3, truncate=True)

In [0]:
# 80% training, 20% testing.
# NOTE: randomSplit assigns each row to a split at random; it is NOT a
# stratified split. The class ratio is therefore only approximately preserved
# in the two sets, which is why the per-split distribution is printed below.
train_df, test_df = df_encoded.randomSplit([0.8, 0.2], seed=42)

# Count each split once and reuse the numbers; every .count() is a Spark job.
train_rows = train_df.count()
test_rows = test_df.count()

# Verify split sizes and class balance
print("Training rows:", train_rows)
print("Testing rows :", test_rows)

print("\nTraining class distribution:")
train_df.groupBy("y_label").count().withColumn(
    "pct", F.round(F.col("count") / train_rows * 100, 2)
).orderBy("y_label").show()

print("Testing class distribution:")
test_df.groupBy("y_label").count().withColumn(
    "pct", F.round(F.col("count") / test_rows * 100, 2)
).orderBy("y_label").show()

In [0]:
# Calculate class weights
# The minority class (yes=1) gets the larger weight so the model pays more
# attention to it during training.

# One aggregation gives every per-class count; the dict is keyed by y_label value.
label_counts = {
    row["y_label"]: row["count"]
    for row in train_df.groupBy("y_label").count().collect()
}
total = sum(label_counts.values())
yes_count = label_counts.get(1, 0)
no_count = label_counts.get(0, 0)

# Fail with a clear message instead of a ZeroDivisionError below.
if yes_count == 0 or no_count == 0:
    raise ValueError(
        f"Cannot compute class weights: yes={yes_count}, no={no_count} "
        "rows in the training split"
    )

# Weight = total / (2 * class_count)
weight_yes = round(total / (2 * yes_count), 4)
weight_no  = round(total / (2 * no_count), 4)

print(f"Weight for yes (1): {weight_yes}")
print(f"Weight for no  (0): {weight_no}")

# Add weight column to training data (anything that is not label 1 gets weight_no)
train_weighted = train_df.withColumn("class_weight",
    F.when(F.col("y_label") == 1, weight_yes)
     .otherwise(weight_no)
)

train_weighted.groupBy("y_label", "class_weight").count().show()

In [0]:
from pyspark.ml.classification import LogisticRegression

# Settings for the class-weighted logistic regression
lr_params = dict(
    featuresCol="features",
    labelCol="y_label",
    weightCol="class_weight",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.0,
    probabilityCol="lr_probability",
    predictionCol="lr_prediction",
)
lr = LogisticRegression(**lr_params)

print("Training Logistic Regression...")
lr_model = lr.fit(train_weighted)
print("Done!")

# Score the held-out test split
lr_predictions = lr_model.transform(test_df)

# Peek at a few scored rows without truncating the probability vector
lr_sample_cols = ["y_label", "lr_prediction", "lr_probability"]
lr_predictions.select(*lr_sample_cols).show(5, truncate=False)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Evaluators for the logistic regression predictions.
#
# Precision / recall / F1 are reported for the positive class (y_label == 1).
# The weighted variants used previously average over both classes by support,
# so on this imbalanced target they mostly reflect the majority class, and
# weightedRecall is simply accuracy.
POSITIVE_LABEL = 1.0

# AUC-ROC (the probability vector is passed as the raw-prediction column)
auc_evaluator = BinaryClassificationEvaluator(
    labelCol="y_label",
    rawPredictionCol="lr_probability",
    metricName="areaUnderROC"
)
lr_auc = auc_evaluator.evaluate(lr_predictions)

# Precision of the positive class
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="y_label",
    predictionCol="lr_prediction",
    metricName="precisionByLabel",
    metricLabel=POSITIVE_LABEL
)
lr_precision = precision_evaluator.evaluate(lr_predictions)

# Recall of the positive class
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="y_label",
    predictionCol="lr_prediction",
    metricName="recallByLabel",
    metricLabel=POSITIVE_LABEL
)
lr_recall = recall_evaluator.evaluate(lr_predictions)

# F1 of the positive class (F-measure with beta = 1)
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="y_label",
    predictionCol="lr_prediction",
    metricName="fMeasureByLabel",
    metricLabel=POSITIVE_LABEL,
    beta=1.0
)
lr_f1 = f1_evaluator.evaluate(lr_predictions)

print("Logistic Regression Results:")
print("-" * 35)
print(f"  AUC-ROC   : {round(lr_auc, 4)}")
print(f"  Precision : {round(lr_precision, 4)}")
print(f"  Recall    : {round(lr_recall, 4)}")
print(f"  F1 Score  : {round(lr_f1, 4)}")
print(f"  Target AUC: 0.75")
print(f"  Met target: {'Yes' if lr_auc >= 0.75 else 'No'}")

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Settings for the class-weighted random forest
rf_params = dict(
    featuresCol="features",
    labelCol="y_label",
    weightCol="class_weight",
    numTrees=100,
    maxDepth=8,
    featureSubsetStrategy="auto",
    probabilityCol="rf_probability",
    predictionCol="rf_prediction",
    seed=42,
)
rf = RandomForestClassifier(**rf_params)

print("Training Random Forest...")
rf_model = rf.fit(train_weighted)
print("Done!")

# Score the held-out test split
rf_predictions = rf_model.transform(test_df)

# Re-point the evaluators created for logistic regression at the random
# forest output columns (this mutates those shared evaluator objects).
auc_evaluator.setRawPredictionCol("rf_probability")
rf_auc = auc_evaluator.evaluate(rf_predictions)

precision_evaluator.setPredictionCol("rf_prediction")
rf_precision = precision_evaluator.evaluate(rf_predictions)

recall_evaluator.setPredictionCol("rf_prediction")
rf_recall = recall_evaluator.evaluate(rf_predictions)

f1_evaluator.setPredictionCol("rf_prediction")
rf_f1 = f1_evaluator.evaluate(rf_predictions)

print("\nRandom Forest Results:")
print("-" * 35)
print(f"  AUC-ROC   : {round(rf_auc, 4)}")
print(f"  Precision : {round(rf_precision, 4)}")
print(f"  Recall    : {round(rf_recall, 4)}")
print(f"  F1 Score  : {round(rf_f1, 4)}")
print(f"  Target AUC: 0.75")
print(f"  Met target: {'Yes' if rf_auc >= 0.75 else 'No'}")

In [0]:
import pandas as pd

# Get feature importance scores, aggregated per assembler input column.
#
# rf_model.featureImportances has one entry per slot of the "features" vector.
# Each one-hot column occupies several slots, so indexing the vector with the
# list of input column names would attribute the wrong slots to every
# categorical feature. Slot names are read from the ML metadata of the
# "features" column and the importances of all slots that belong to the same
# input column are summed.
feature_names = num_cols + [f"{c}_ohe" for c in cat_cols]
importances = rf_model.featureImportances

# Slot index -> slot name.
# NOTE(review): relies on VectorAssembler attaching "ml_attr" metadata and
# naming one-hot slots "<input column>_<category>" — confirm on this runtime.
slot_groups = df_encoded.schema["features"].metadata["ml_attr"]["attrs"]
slot_names = {
    int(attr["idx"]): attr["name"]
    for group in slot_groups.values()
    for attr in group
}
if len(slot_names) != importances.size:
    raise ValueError(
        f"Feature metadata names {len(slot_names)} slots but the model "
        f"reports {importances.size} importances"
    )

# Match on the full "<col>_ohe_" prefix so that e.g. numeric "campaign" is not
# confused with the one-hot slots of "campaign_intensity".
ohe_prefixes = {f"{c}_ohe_": f"{c}_ohe" for c in cat_cols}


def _source_feature(slot_name):
    """Return the assembler input column that a vector slot came from."""
    for prefix, source in ohe_prefixes.items():
        if slot_name.startswith(prefix):
            return source
    return slot_name


# Sum slot importances per input column (inputs with no slots stay at 0.0)
importance_by_feature = dict.fromkeys(feature_names, 0.0)
for slot_idx, slot_score in enumerate(importances.toArray()):
    source = _source_feature(slot_names[slot_idx])
    importance_by_feature[source] = (
        importance_by_feature.get(source, 0.0) + float(slot_score)
    )

# Convert to readable format
importance_list = [(name, round(score, 4))
                   for name, score in importance_by_feature.items()]

# Sort by importance
importance_df = spark.createDataFrame(importance_list, ["feature", "importance"]) \
                     .orderBy(F.desc("importance"))

print("Top 10 Most Important Features:")
print("-" * 40)
importance_df.show(10)

In [0]:
# Run Random Forest on entire dataset.
# NOTE: this includes the rows the model was trained on, so any accuracy or
# lift computed from these predictions is optimistic compared with the test set.
all_predictions = rf_model.transform(df_encoded)

# Extract probability of yes (second element in probability vector).
# The return type must be declared: without it the UDF returns strings, and
# ordering by predicted_prob then sorts text instead of numbers.
# NOTE(review): if bank_fact_marketing was previously written with
# predicted_prob as a string, a later Delta overwrite may reject the type
# change — confirm and migrate the table schema if needed.
extract_prob = F.udf(lambda v: float(v[1]), "double")

all_predictions = all_predictions.withColumn(
    "predicted_prob", extract_prob(F.col("rf_probability"))
).withColumn(
    "predicted_label", F.col("rf_prediction").cast("int")
)

# Verify
print("Predictions generated:", all_predictions.count())
all_predictions.select(
    "y_label", "predicted_prob", "predicted_label"
).orderBy(F.desc("predicted_prob")).show(10)

In [0]:
# Attach predictions to fact_marketing by joining on descriptive columns.
#
# These columns are not a unique key: if several rows share the same values
# the left join multiplies rows. The row count is therefore checked before
# anything is written, and the table is left untouched when the join fans out.
# NOTE(review): every join key must exist in both all_predictions and
# bank_fact_marketing — confirm against the table schema.
join_keys = ["age", "job", "balance", "campaign", "month"]

# Select only what we need to update fact_marketing
predictions_slim = all_predictions.select(
    *join_keys, "predicted_prob", "predicted_label"
)

# Load existing fact_marketing
fact_marketing = spark.read.format("delta").table("bank_fact_marketing")
rows_before = fact_marketing.count()

# Join predictions to fact_marketing. Prediction columns left by an earlier
# run are dropped first so the result never has duplicate column names.
fact_updated = fact_marketing.drop("predicted_prob", "predicted_label").join(
    predictions_slim,
    on=join_keys,
    how="left"
)
rows_after = fact_updated.count()

if rows_after != rows_before:
    print(
        f"Skipping write: join changed the row count from {rows_before} "
        f"to {rows_after}, so {join_keys} is not a unique key"
    )
else:
    # Save back to fact_marketing
    fact_updated.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable("bank_fact_marketing")

    print("fact_marketing updated with predictions")
    print("Rows:", spark.read.format("delta").table("bank_fact_marketing").count())

    # Verify new columns exist
    spark.read.format("delta").table("bank_fact_marketing") \
        .select("predicted_prob", "predicted_label") \
        .show(5)

In [0]:
# Attach predictions to fact_marketing by joining on descriptive columns.
#
# These columns are not a unique key: if several rows share the same values
# the left join multiplies rows. The row count is therefore checked before
# anything is written, and the table is left untouched when the join fans out.
# NOTE(review): every join key must exist in both all_predictions and
# bank_fact_marketing — confirm against the table schema.
join_keys = ["job", "marital", "education", "campaign",
             "month", "pdays", "previous", "y_label"]

# Select only what we need
predictions_slim = all_predictions.select(
    *join_keys, "predicted_prob", "predicted_label"
)

# Load existing fact_marketing
fact_marketing = spark.read.format("delta").table("bank_fact_marketing")
rows_before = fact_marketing.count()

# Join on columns that exist in both tables. Prediction columns left by an
# earlier run are dropped first so the result never has duplicate column names.
fact_updated = fact_marketing.drop("predicted_prob", "predicted_label").join(
    predictions_slim,
    on=join_keys,
    how="left"
)
rows_after = fact_updated.count()

if rows_after != rows_before:
    print(
        f"Skipping write: join changed the row count from {rows_before} "
        f"to {rows_after}, so {join_keys} is not a unique key"
    )
else:
    # Save back
    fact_updated.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable("bank_fact_marketing")

    print("fact_marketing updated")
    print("Rows:", spark.read.format("delta").table("bank_fact_marketing").count())

    spark.read.format("delta").table("bank_fact_marketing") \
        .select("predicted_prob", "predicted_label") \
        .show(5)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Rebuild fact_marketing from the gold data and attach the predictions.
#
# The fact columns and the predictions are taken from the SAME scored
# DataFrame (all_predictions still carries every bank_gold column), so each
# prediction stays on the row it was computed for. Generating
# monotonically_increasing_id() separately on two DataFrames and joining on it
# is not safe: the ids depend on partitioning and row position, and they shift
# as soon as one side drops a row (the assembler uses handleInvalid="skip").
fact_source_cols = [
    "y", "y_label", "campaign", "duration", "pdays", "previous",
    "contact_efficiency", "campaign_intensity", "was_previously_contacted",
    "poutcome", "age_group", "balance_tier", "job", "marital",
    "education", "high_value_segment", "day", "month", "month_num",
    "quarter", "contact"
]

# Step 1 — report gold rows that could not be scored; they are not written
gold_rows = spark.read.format("delta").table("bank_gold").count()
scored_rows = all_predictions.count()
if scored_rows != gold_rows:
    print(
        f"Warning: {gold_rows - scored_rows} bank_gold rows have no "
        "prediction and are left out of fact_marketing"
    )

# Step 2 — fact columns, row id, cost/revenue measures and predictions in one pass
fact_scored = all_predictions \
    .select(*fact_source_cols, "predicted_prob", "predicted_label") \
    .withColumn("fact_id", monotonically_increasing_id()) \
    .withColumn("cost_per_call", F.lit(26.54)) \
    .withColumn("total_cost", F.round(F.lit(26.54) * F.col("campaign"), 2)) \
    .withColumn("revenue_if_converted", F.lit(2000)) \
    .withColumn("actual_revenue", F.when(F.col("y_label") == 1, 2000).otherwise(0))

# Same names as before, now views of the single scored DataFrame
fact_marketing = fact_scored.drop("predicted_prob", "predicted_label")
predictions_with_id = fact_scored.select("fact_id", "predicted_prob", "predicted_label")

# Step 3 — final column order: id, fact columns, measures, predictions
fact_updated = fact_scored.select(
    "fact_id", *fact_source_cols,
    "cost_per_call", "total_cost", "revenue_if_converted", "actual_revenue",
    "predicted_prob", "predicted_label"
)

# Step 4 — save
fact_updated.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("bank_fact_marketing")

print("Rows:", spark.read.format("delta").table("bank_fact_marketing").count())
spark.read.format("delta").table("bank_fact_marketing") \
    .select("y_label", "predicted_prob", "predicted_label").show(5)
    

In [0]:
# Show the ten highest-probability rows.
# predicted_prob may be stored as a string (a UDF without a declared return
# type produces strings), so it is cast to double to sort numerically rather
# than alphabetically. The cast is a no-op when the column is already a double.
spark.read.format("delta").table("bank_fact_marketing") \
    .select("y_label", "predicted_prob", "predicted_label") \
    .orderBy(F.col("predicted_prob").cast("double").desc()) \
    .show(10)

In [0]:
# Divide customers into 10 deciles by predicted probability
# Check actual conversion rate in each decile

from pyspark.sql.window import Window

fact = spark.read.format("delta").table("bank_fact_marketing")
total = fact.count()

# Rank from highest to lowest probability.
# predicted_prob may be stored as a string, so it is cast to double to rank
# numerically. A window without partitionBy moves all rows to one partition.
window = Window.orderBy(F.col("predicted_prob").cast("double").desc())

# Create deciles with ntile(10), which always yields buckets 1..10.
# ceil(rank / (total / 10)) can round the last rows into an 11th bucket
# because total / 10 is a floating-point value.
fact = fact \
    .withColumn("rank", F.row_number().over(window)) \
    .withColumn("decile", F.ntile(10).over(window))

# Conversion rate per decile
fact.groupBy("decile") \
    .agg(
        F.count("y_label").alias("customers"),
        F.sum("y_label").alias("converted"),
    ) \
    .withColumn("conversion_rate_%",
        F.round(F.col("converted") / F.col("customers") * 100, 2)
    ) \
    .orderBy("decile") \
    .show()