
Machine Learning with Apache Spark MLlib (Bank Dataset)

Goal: Predict if a client will subscribe to a term deposit (y = yes/no).

Step 1: Set Up PySpark Environment

In [2]:
# Step 1: Set up PySpark in Google Colab

# Install Java and Spark
!apt-get update
!apt-get install openjdk-11-jdk -y

# Remove any existing Spark installation and its archive to ensure a fresh setup
!rm -rf /opt/spark
!rm -f spark-3.5.1-bin-hadoop3.tgz

# Download Spark binaries and extract them
# Using a specific version (Spark 3.5.1 with Hadoop 3) that is known to work well in Colab environments
!wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!mv spark-3.5.1-bin-hadoop3 /opt/spark

# Set environment variables for JAVA_HOME and SPARK_HOME
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark"

# Install findspark and a pyspark release that matches the Spark 3.5.1 binaries above
!pip install -q findspark pyspark==3.5.1

# Initialize findspark to enable PySpark to work with regular Python
import findspark
findspark.init()

# Initialize Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("BankMarketingAnalysis").getOrCreate()

print("✅ Spark Session Created Successfully")


Step 2: Load Dataset and Basic EDA

In [3]:
file_path = "/content/bank.csv"  # Update path if needed

df = spark.read.csv(file_path, header=True, inferSchema=True)
print("✅ Data Loaded Successfully")

# Show first few rows
df.show(5)
df.printSchema()

# Check dataset shape
print(f"Total Records: {df.count()}, Total Columns: {len(df.columns)}")


✅ Data Loaded Successfully
+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|        job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30| unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|   services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35| management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30| management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|blue-collar|married|secondary|     no|      0|    yes|  

Step 3: Basic Data Exploration

In [4]:
# Summary of numeric features
df.describe(['age', 'balance', 'duration', 'campaign']).show()

# Target class distribution
df.groupBy("y").count().show()


+-------+------------------+------------------+------------------+------------------+
|summary|               age|           balance|          duration|          campaign|
+-------+------------------+------------------+------------------+------------------+
|  count|              4521|              4521|              4521|              4521|
|   mean| 41.17009511170095|1422.6578190665782|263.96129174961294| 2.793629727936297|
| stddev|10.576210958711263|3009.6381424673395|259.85663262468216|3.1098066601885823|
|    min|                19|             -3313|                 4|                 1|
|    max|                87|             71188|              3025|                50|
+-------+------------------+------------------+------------------+------------------+

+---+-----+
|  y|count|
+---+-----+
| no| 4000|
|yes|  521|
+---+-----+
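
The class counts above show a strong imbalance, which matters when reading accuracy later on. As a rough sketch in plain Python (not part of the Spark pipeline), the counts from the output can be turned into a majority-class baseline, i.e. the accuracy of a model that always predicts the most frequent class:

```python
# Class counts copied from the groupBy("y") output above.
class_counts = {"no": 4000, "yes": 521}

total = class_counts["no"] + class_counts["yes"]
majority_label = max(class_counts, key=class_counts.get)

# Accuracy of a trivial model that always predicts the majority class.
baseline_accuracy = class_counts[majority_label] / total
positive_rate = class_counts["yes"] / total

print(f"Majority class: {majority_label}")
print(f"Baseline accuracy: {baseline_accuracy:.4f}")
print(f"Positive rate: {positive_rate:.4f}")
```

Any model accuracy should be compared against this baseline rather than against 50%.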



Step 4: Data Preprocessing
Handle Missing Values

In [5]:
# Count missing/null values per column
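# Import Spark's sum under an alias so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, when, sum as spark_sum

df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()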

# Drop missing values for simplicity
df = df.dropna()
print(f"After dropping nulls: {df.count()} rows")


+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|  0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+

After dropping nulls: 4521 rows


Handle Outliers (Capping Extreme Balances)

In [6]:
from pyspark.sql.functions import col, when

# Cap balance at its 1st and 99th percentiles to reduce outlier impact
# (a relative error of 0.0 requests exact quantiles)
lower, upper = df.approxQuantile("balance", [0.01, 0.99], 0.0)
df = df.withColumn("balance", when(col("balance") < lower, lower)
                   .when(col("balance") > upper, upper)
                   .otherwise(col("balance")))
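
The capping step clamps every balance into a fixed interval. A minimal plain-Python sketch of the same idea follows; the bounds and the sample values are hypothetical and chosen only for illustration, whereas the notebook derives its bounds from the 1st and 99th percentiles:

```python
def cap_values(values, lower_bound, upper_bound):
    """Clamp each value into the closed interval [lower_bound, upper_bound]."""
    return [min(max(v, lower_bound), upper_bound) for v in values]

# Hypothetical balances and bounds (not the quantiles computed by Spark).
sample_balances = [-3313, 0, 250, 1787, 4789, 71188]
capped_balances = cap_values(sample_balances, lower_bound=-500, upper_bound=10000)

print(capped_balances)
```

Only the two extreme values change; everything already inside the interval passes through untouched.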


Step 5: Encode Categorical Columns

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Identify categorical & numerical columns
categorical_cols = [c for c, dtype in df.dtypes if dtype == 'string' and c != 'y']
numeric_cols = [c for c, dtype in df.dtypes if dtype != 'string']

# Index and OneHotEncode categorical features
# (the loop variable is named c so it is not confused with pyspark's col function)
indexers = [StringIndexer(inputCol=c, outputCol=c+"_index", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c+"_index", outputCol=c+"_vec") for c in categorical_cols]

# Encode target variable y
label_indexer = StringIndexer(inputCol="y", outputCol="label")
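
To make the indexing and encoding step more concrete, here is a small plain-Python sketch of what the two stages do together. It assumes frequency-descending label order with alphabetical tie-breaking, and it mimics the combination of `handleInvalid="keep"` (one extra index for unseen labels) with the encoder dropping the last category. The helper names `fit_labels` and `one_hot` are made up for this illustration and are not Spark APIs:

```python
from collections import Counter

def fit_labels(values):
    """Order labels by descending frequency, breaking ties alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))

def one_hot(value, labels):
    """Encode value into len(labels) slots; unseen values become all zeros."""
    # Unseen labels get the extra index len(labels), which is the dropped slot.
    index = labels.index(value) if value in labels else len(labels)
    return [1.0 if i == index else 0.0 for i in range(len(labels))]

# Hypothetical column values for illustration.
marital_values = ["married", "single", "married", "divorced", "married", "single"]
marital_labels = fit_labels(marital_values)

print(marital_labels)
print(one_hot("single", marital_labels))
print(one_hot("widowed", marital_labels))  # label never seen during fitting
```

Under these assumptions each categorical column contributes exactly as many features as it has distinct training labels.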


Step 6: Feature Engineering (Vector Assembler)

In [8]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Combine all numeric and encoded categorical features
feature_cols = [c+"_vec" for c in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Optional scaling
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
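
With its default settings, Spark's `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The following plain-Python sketch shows that behavior on a single column; the function name `scale_by_std` is hypothetical, and the four values are the `duration` entries visible in the `df.show(5)` output:

```python
import statistics

def scale_by_std(column):
    """Divide every value by the sample standard deviation, without centering."""
    std = statistics.stdev(column)  # corrected (n - 1) standard deviation
    return [x / std for x in column]

durations = [79, 220, 185, 199]
scaled_durations = scale_by_std(durations)

print([round(x, 3) for x in scaled_durations])
print(f"Std after scaling: {statistics.stdev(scaled_durations):.3f}")
```

Because nothing is subtracted, positive inputs stay positive and sparse one-hot vectors stay sparse.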


Step 7: Model Selection & Training

Let's start with Logistic Regression, then compare with Decision Tree.

Split the data

In [9]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
print(f"Training Data: {train_df.count()}, Test Data: {test_df.count()}")


Training Data: 3662, Test Data: 859


Logistic Regression Pipeline

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="scaled_features", labelCol="label", maxIter=20)

pipeline_lr = Pipeline(stages=indexers + encoders + [label_indexer, assembler, scaler, lr])
model_lr = pipeline_lr.fit(train_df)

predictions_lr = model_lr.transform(test_df)
predictions_lr.select("y", "prediction", "probability").show(5)


+---+----------+--------------------+
|  y|prediction|         probability|
+---+----------+--------------------+
| no|       0.0|[0.88758385924534...|
| no|       0.0|[0.65320281565978...|
| no|       0.0|[0.99132899667943...|
| no|       0.0|[0.98772774187202...|
| no|       0.0|[0.72548822557949...|
+---+----------+--------------------+
only showing top 5 rows



Step 8: Model Evaluation

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions_lr)
print(f"✅ Logistic Regression AUC: {roc_auc:.4f}")

# Calculate Accuracy
accuracy = predictions_lr.filter(col("label") == col("prediction")).count() / float(test_df.count())
print(f"✅ Logistic Regression Accuracy: {accuracy*100:.2f}%")


✅ Logistic Regression AUC: 0.8775
✅ Logistic Regression Accuracy: 89.64%
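
AUC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as half. The sketch below computes it that way in plain Python on made-up scores. It illustrates the metric only and is not how Spark's evaluator is implemented; `pairwise_auc` is a hypothetical helper name:

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))

# Hypothetical labels and predicted probabilities for the positive class.
toy_labels = [0, 0, 1, 0, 1]
toy_scores = [0.10, 0.40, 0.35, 0.80, 0.90]

print(f"Toy AUC: {pairwise_auc(toy_labels, toy_scores):.4f}")
```

A value of 0.5 corresponds to random ranking, so scores below 0.5 rank examples worse than chance.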


Step 9: Decision Tree Model (Alternative)

In [12]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol="scaled_features", labelCol="label")

pipeline_dt = Pipeline(stages=indexers + encoders + [label_indexer, assembler, scaler, dt])
model_dt = pipeline_dt.fit(train_df)
predictions_dt = model_dt.transform(test_df)

roc_auc_dt = evaluator.evaluate(predictions_dt)
accuracy_dt = predictions_dt.filter(col("label") == col("prediction")).count() / float(test_df.count())

print(f"🌳 Decision Tree AUC: {roc_auc_dt:.4f}")
print(f"🌳 Decision Tree Accuracy: {accuracy_dt*100:.2f}%")


🌳 Decision Tree AUC: 0.4696
🌳 Decision Tree Accuracy: 89.29%
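
Accuracy alone can hide how a model treats the minority class. As a hedged illustration with invented data (not the notebook's predictions), the sketch below derives precision and recall from confusion-matrix counts for a model that always predicts the negative class:

```python
def confusion_counts(labels, predictions):
    """Return (tp, fp, tn, fn) for binary 0/1 labels and predictions."""
    tp = fp = tn = fn = 0
    for y, p in zip(labels, predictions):
        if y == 1 and p == 1:
            tp += 1
        elif y == 0 and p == 1:
            fp += 1
        elif y == 0 and p == 0:
            tn += 1
        else:
            fn += 1
    return tp, fp, tn, fn

def safe_ratio(numerator, denominator):
    """Return 0.0 instead of raising when the denominator is zero."""
    return numerator / denominator if denominator else 0.0

# Hypothetical data: 8 negatives, 2 positives, and a model that always says 0.
toy_y = [0] * 8 + [1] * 2
toy_pred = [0] * 10

tp, fp, tn, fn = confusion_counts(toy_y, toy_pred)
toy_accuracy = safe_ratio(tp + tn, len(toy_y))
toy_precision = safe_ratio(tp, tp + fp)
toy_recall = safe_ratio(tp, tp + fn)

print(f"accuracy={toy_accuracy:.2f} precision={toy_precision:.2f} recall={toy_recall:.2f}")
```

The toy model reaches 80% accuracy while finding none of the positives, which is why the notebook also reports AUC.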


Step 10: Hyperparameter Tuning (Cross Validation)

We'll tune Logistic Regression.

In [13]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

crossval = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cv_model = crossval.fit(train_df)
cv_predictions = cv_model.transform(test_df)
cv_auc = evaluator.evaluate(cv_predictions)
print(f"🔧 Best Model AUC after Tuning: {cv_auc:.4f}")


🔧 Best Model AUC after Tuning: 0.9005
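
The cost of the search grows with the size of the grid and the number of folds. The following sketch enumerates the same parameter values in plain Python to count how many pipeline fits the cross-validation implies, assuming one fit per combination per fold plus one final refit of the best combination on the full training set:

```python
from itertools import product

# Values copied from the parameter grid above.
reg_params = [0.01, 0.1, 0.5]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 3

grid = [
    {"regParam": reg, "elasticNetParam": enet}
    for reg, enet in product(reg_params, elastic_net_params)
]

fits_during_search = len(grid) * num_folds
total_fits = fits_during_search + 1  # final refit of the best combination

print(f"Combinations: {len(grid)}")
print(f"Fits during search: {fits_during_search}")
print(f"Total fits: {total_fits}")
```

Adding a fourth value to either parameter would raise the search to 36 fits, so the grid is best widened gradually.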


Step 11: Feature Importance / Coefficients

For Logistic Regression:

In [15]:
best_lr_model = cv_model.bestModel.stages[-1]
coefficients = best_lr_model.coefficients
intercept = best_lr_model.intercept
print(f"Intercept: {intercept}")
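# Note: this prints the first 10 coefficients in feature-vector order,
# not the 10 largest; a ranked list is built in a later cell.
print("Top 10 Feature Coefficients:")
print(coefficients.toArray()[:10])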

Intercept: -3.278796423561272
Top 10 Feature Coefficients:
[ 0.         -0.06972094  0.          0.          0.          0.0958017
  0.          0.          0.          0.        ]


For Decision Tree:

In [16]:
best_dt_model = model_dt.stages[-1]
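# Note: this prints the full sparse importance vector as (size, indices, values),
# not a ranked top 10; a ranked list is built in a later cell.
print("Top 10 Feature Importances:")
print(best_dt_model.featureImportances)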


Top 10 Feature Importances:
(51,[2,5,14,16,18,25,26,28,36,38,43,44,45,46,47],[0.005522254535258133,0.014740868096134185,0.018527550137436857,0.026584657202310596,0.011860242357688238,0.007100041545331889,0.020668326012661706,0.020458511071773045,0.06411035996986823,0.007747119089252778,0.234694737304731,0.00631114804029501,0.02498162765950108,0.01327123114127889,0.5234213258364785])
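
The output above is a sparse vector printed as `(size, [indices], [values])`. As a small plain-Python sketch, it can be expanded into a dense list to find the most important position. The values below are the printed importances rounded to four decimals, and `sparse_to_dense` is a hypothetical helper rather than a Spark function:

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense

# Importances from the output above, rounded to four decimals.
size = 51
indices = [2, 5, 14, 16, 18, 25, 26, 28, 36, 38, 43, 44, 45, 46, 47]
values = [0.0055, 0.0147, 0.0185, 0.0266, 0.0119, 0.0071, 0.0207, 0.0205,
          0.0641, 0.0077, 0.2347, 0.0063, 0.0250, 0.0133, 0.5234]

dense_importances = sparse_to_dense(size, indices, values)
top_index = max(range(size), key=lambda i: dense_importances[i])
unused_features = dense_importances.count(0.0)

print(f"Most important position: {top_index} ({dense_importances[top_index]:.4f})")
print(f"Features never used by the tree: {unused_features}")
```

Positions missing from the index list have zero importance, meaning the tree never split on them.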


In [20]:
import numpy as np

feature_names = []

# Reconstruct feature names for categorical columns (from StringIndexer and OneHotEncoder)
# The first set of stages in the pipeline are the StringIndexers, followed by OneHotEncoders.
# We need the labels from the StringIndexers.

# Get the list of all stages from the best model of the cross-validator
all_pipeline_stages = cv_model.bestModel.stages

# Categorical columns were processed first by StringIndexers, then OneHotEncoders.
# The first len(categorical_cols) stages are StringIndexers.
for i, col_name in enumerate(categorical_cols):
    # Get the fitted StringIndexerModel for this column
    string_indexer_model = all_pipeline_stages[i]
    labels = string_indexer_model.labels # These are the distinct categories

    # OneHotEncoder drops the last category by default (dropLast=True). Because the
    # StringIndexers use handleInvalid="keep", that last category is the extra
    # "unseen label" bucket, so every real label keeps its own slot and each
    # encoded column contributes len(labels) features (e.g. 'job_blue-collar').
    for j in range(len(labels)):
        feature_names.append(f"{col_name}_{labels[j]}")

# Append numerical feature names
for col_name in numeric_cols:
    feature_names.append(col_name)

# Get the coefficients from the final Logistic Regression model
coefficients_array = cv_model.bestModel.stages[-1].coefficients.toArray()

# Ensure the number of generated feature names matches the number of coefficients
if len(feature_names) != len(coefficients_array):
    print(f"Warning: Mismatch between number of feature names ({len(feature_names)}) and coefficients ({len(coefficients_array)}).")
else:
    # Create a list of (feature_name, coefficient) tuples
    feature_coefficient_pairs = list(zip(feature_names, coefficients_array))

    # Sort by absolute coefficient value to identify the most influential features
    feature_coefficient_pairs.sort(key=lambda x: np.abs(x[1]), reverse=True)

    print("Top 10 most influential features (Logistic Regression):")
    for name, coef in feature_coefficient_pairs[:10]:
        print(f"{name}: {coef:.4f}")

Top 10 most influential features (Logistic Regression):
duration: 0.8848
poutcome_success: 0.3356
contact_unknown: -0.3272
month_oct: 0.1890
month_mar: 0.1278
education_tertiary: 0.0995
job_retired: 0.0958
job_student: 0.0779
job_blue-collar: -0.0697
poutcome_unknown: -0.0647
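
Logistic regression coefficients are on the log-odds scale, so exponentiating them gives odds ratios that are often easier to read. The sketch below does this for three coefficients copied from the output above. It assumes the coefficients refer to the scaled features, so one unit corresponds to one standard deviation of the original column; that reading is an interpretation, not something the notebook verifies:

```python
import math

# Coefficients copied from the ranked output above.
top_coefficients = {
    "duration": 0.8848,
    "poutcome_success": 0.3356,
    "contact_unknown": -0.3272,
}

# exp(coefficient) is the multiplicative change in odds per unit increase.
odds_ratios = {name: math.exp(coef) for name, coef in top_coefficients.items()}

for name, ratio in odds_ratios.items():
    direction = "raises" if ratio > 1 else "lowers"
    print(f"{name}: odds ratio {ratio:.3f} ({direction} the odds of subscribing)")
```

Ratios above 1 increase the odds of a subscription and ratios below 1 decrease them, holding the other features fixed.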


In [21]:
# Get feature importances from the fitted Decision Tree model (it was not tuned)
dt_feature_importances = model_dt.stages[-1].featureImportances.toArray()

# Ensure the number of generated feature names matches the number of importances
if len(feature_names) != len(dt_feature_importances):
    print(f"Warning: Mismatch between number of feature names ({len(feature_names)}) and Decision Tree importances ({len(dt_feature_importances)}).")
else:
    # Create a list of (feature_name, importance) tuples
    feature_importance_pairs = list(zip(feature_names, dt_feature_importances))

    # Sort by importance value to identify the most influential features
    feature_importance_pairs.sort(key=lambda x: x[1], reverse=True)

    print("\nTop 10 most influential features (Decision Tree):")
    for name, importance in feature_importance_pairs[:10]:
        print(f"{name}: {importance:.4f}")


Top 10 most influential features (Decision Tree):
duration: 0.5234
poutcome_success: 0.2347
month_oct: 0.0641
education_tertiary: 0.0266
balance: 0.0250
contact_unknown: 0.0207
month_may: 0.0205
marital_divorced: 0.0185
job_retired: 0.0147
day: 0.0133


### Model Performance Comparison

We trained and evaluated two models:

1.  **Logistic Regression**
    *   **AUC:** 0.8775
    *   **Accuracy:** 89.64%
    *   **Tuned Logistic Regression (with Cross-Validation)**
        *   **AUC:** 0.9005

2.  **Decision Tree**
    *   **AUC:** 0.4696
    *   **Accuracy:** 89.29%

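From these results, the **Tuned Logistic Regression model** ranks clients best, with a test AUC of **0.9005** versus 0.8775 for the untuned model and 0.4696 for the Decision Tree. An AUC below 0.5 means the Decision Tree's scores order test clients slightly worse than random, even though its accuracy (89.29%) is close to Logistic Regression's (89.64%). Both accuracies sit only about one percentage point above the 88.5% that always predicting "no" would reach on the full dataset (4,000 of 4,521 clients did not subscribe), so accuracy says little here. AUC is the more informative metric for this imbalanced target, and it indicates that Logistic Regression is more effective at distinguishing between the positive and negative classes.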