
In [None]:
# ques 1 Load the bank.csv dataset into a Spark DataFrame.


# Import SparkSession
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("ML_Spark_Bank").getOrCreate()

# Load the dataset
df = spark.read.csv("/content/bank (1).csv", header=True, inferSchema=True)

# Show the schema
df.printSchema()

# Show the first 5 rows
df.show(5)


root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|        job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+

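df.printSchema() Output:
Shows the structure of the DataFrame and confirms that inferSchema=True assigned a sensible data type to each column.

For example:

age, balance, day, duration, campaign, pdays, and previous are IntegerType

job, marital, education, and the other categorical columns (including the target y) are StringType

The integer columns can be used as features directly; the string columns must be encoded as numbers before Spark ML can use them.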

👀 df.show(5) Output:
Displays the first 5 rows of the dataset.

You can visually verify that:

The values are loaded correctly.

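No column looks empty in the sampled rows (five rows cannot rule out nulls elsewhere; question 3 counts them for the full dataset).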

Target column y (yes/no) is present and valid.

In [None]:
#ques 2 Perform basic exploratory data analysis (EDA) to understand the dataset. Display the schema,
#the first few rows, the number of rows and columns, and descriptive statistics for numeric columns.


# Already done:
df.printSchema()
df.show(5)

# New EDA: Row and column count
print(f"Number of Rows: {df.count()}")
print(f"Number of Columns: {len(df.columns)}")

# Summary statistics for numeric columns
df.describe(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']).show()



root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|        job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+

df.count() and len(df.columns)
Shows the total number of records (rows) and number of columns in the dataset.

Example:

Number of Rows: 4521

Number of Columns: 17

This confirms the dataset is not empty. The 17 columns are 16 candidate features plus the target y.

📊 df.describe([...]).show()
Displays summary statistics (count, mean, stddev, min, max) for numeric columns:

age, balance, day, duration, campaign, pdays, previous

This gives insights such as:

Mean balance: Helps understand typical customer account balances.

Max duration: Might indicate potential outliers (very high call duration).

High standard deviation: Indicates large variation, which could require handling outliers.



In [None]:
# ques 3 Handle Missing Values in the Dataset


# Import functions
from pyspark.sql.functions import col, when, count

# Count missing (null) values in each column
missing_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
missing_counts.show()


+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|  0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+



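This means:

No column contains null values, so nothing needs to be dropped or imputed at this stage.

Note that this check counts only true nulls. A missing categorical value stored as a placeholder string (for example "unknown") would not be detected here and would later be indexed as an ordinary category.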

In [None]:
#ques 4 Handle Outliers in the Dataset

# Function to detect outliers using IQR
def detect_outliers_iqr(df, column):
    # relativeError=0.05 gives approximate quartiles; use 0.0 for exact (slower) values
    quantiles = df.approxQuantile(column, [0.25, 0.75], 0.05)
    Q1 = quantiles[0]
    Q3 = quantiles[1]
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    print(f"\nOutlier range for '{column}': [{lower_bound}, {upper_bound}]")
    outlier_count = df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).count()
    print(f"Number of outliers in '{column}': {outlier_count}")

# Check for outliers in key numeric columns
for col_name in ['age', 'balance', 'duration']:
    detect_outliers_iqr(df, col_name)



Outlier range for 'age': [12.0, 68.0]
Number of outliers in 'age': 67

Outlier range for 'balance': [-1630.0, 2914.0]
Number of outliers in 'balance': 647

Outlier range for 'duration': [-176.0, 576.0]
Number of outliers in 'duration': 457


Insights:
Age: 67 entries fall outside the range [12, 68]. Since a bank client younger than 12 is implausible, these are effectively all clients older than 68.

Balance: 647 accounts have balances below -1630 or above 2914. Extreme balances are likely legitimate in a financial dataset, so these rows are usually kept.

Duration: The lower bound is negative and a call cannot have negative length, so all 457 outliers are calls longer than 576 seconds. Longer calls might indicate interest, but duration is only known once the call has ended, so it leaks information about the outcome and may be excluded from modeling later.
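
The printed ranges follow directly from the quartiles via Q1 - 1.5 * IQR and Q3 + 1.5 * IQR. The sketch below reproduces them in plain Python (no Spark needed). The quartile values are an assumption: they are back-calculated from the printed ranges, because the notebook does not print Q1 and Q3 themselves.

```python
def iqr_bounds(q1, q3, k=1.5):
    """Return the (lower, upper) outlier fences for the given quartiles."""
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Assumed quartiles, back-calculated from the ranges printed by the notebook
quartiles = {
    "age": (33.0, 47.0),
    "balance": (74.0, 1210.0),
    "duration": (106.0, 294.0),
}

for name, (q1, q3) in quartiles.items():
    lower, upper = iqr_bounds(q1, q3)
    print(f"Outlier range for '{name}': [{lower}, {upper}]")
```

Because approxQuantile was called with a relative error of 0.05, the quartiles and therefore the outlier counts are approximate.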

In [None]:
# ques5 Convert categorical variables to numerical format using StringIndexer or OneHotEncoder.

from pyspark.ml.feature import StringIndexer

# List of categorical columns
categorical_cols = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'poutcome', 'y']

# Create indexers
# Loop variable is named c so it does not shadow the imported pyspark col() function
indexers = [StringIndexer(inputCol=c, outputCol=c + "_indexed") for c in categorical_cols]

# Apply transformations
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)

# Show result of indexed columns
df_indexed.select([c + "_indexed" for c in categorical_cols] + ["age", "balance", "duration"]).show(5)


+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+----------------+---------+---+-------+--------+
|job_indexed|marital_indexed|education_indexed|default_indexed|housing_indexed|loan_indexed|contact_indexed|month_indexed|poutcome_indexed|y_indexed|age|balance|duration|
+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+----------------+---------+---+-------+--------+
|        8.0|            0.0|              2.0|            0.0|            1.0|         0.0|            0.0|          8.0|             0.0|      0.0| 30|   1787|      79|
|        4.0|            0.0|              0.0|            0.0|            0.0|         1.0|            0.0|          0.0|             1.0|      0.0| 33|   4789|     220|
|        0.0|            1.0|              1.0|            0.0|            0.0|         0.0|            0.0|          5.0|             1.0|      

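I used StringIndexer to convert all string-based categorical columns into numeric indices.

I created a Pipeline to apply all the indexers in a single fit and transform step.

Each new column (e.g., job_indexed) contains a numeric code for each category of the original column. By default StringIndexer assigns codes by descending frequency, so 0.0 is the most common category.

This transformation is needed because Spark ML estimators require all input features to be numeric. The codes themselves are arbitrary, however: a linear model such as logistic regression treats job_indexed = 8.0 as a larger quantity than job_indexed = 1.0, so one-hot encoding the indexed columns with OneHotEncoder is usually the safer choice for unordered categories.

⚠️ Note: I also indexed the target column y. Because "no" is the more frequent value, it becomes 0.0 and "yes" becomes 1.0. The y_indexed column will be used as the label for model training.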
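
To make the index assignment concrete, here is a minimal plain-Python sketch that mimics StringIndexer's default ordering (most frequent value first, ties broken alphabetically). The helper frequency_index and the toy column are hypothetical and only illustrate the rule; they are not Spark's implementation.

```python
from collections import Counter


def frequency_index(values):
    """Map each distinct string to a float index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending count, then alphabetically to break ties
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical toy target column: "no" is more frequent than "yes"
y_values = ["no", "no", "yes", "no", "yes", "no"]
mapping = frequency_index(y_values)
print(mapping)
```

In Spark itself, the fitted StringIndexerModel exposes the same information through its labels attribute, which is the reliable way to confirm which category each index refers to.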

In [None]:
#ques 6 Create a feature vector using VectorAssembler by combining all feature columns.

from pyspark.ml.feature import VectorAssembler

# Define input features: numeric columns + indexed categorical columns (exclude target)
input_features = [
    'age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous',
    'job_indexed', 'marital_indexed', 'education_indexed', 'default_indexed',
    'housing_indexed', 'loan_indexed', 'contact_indexed',
    'month_indexed', 'poutcome_indexed'
]

# Assemble all input features into a single 'features' column
assembler = VectorAssembler(inputCols=input_features, outputCol="features")
final_df = assembler.transform(df_indexed)

# Select only the final feature vector and label
final_df.select("features", "y_indexed").show(5, truncate=False)


+--------------------------------------------------------------------------+---------+
|features                                                                  |y_indexed|
+--------------------------------------------------------------------------+---------+
|[30.0,1787.0,19.0,79.0,1.0,-1.0,0.0,8.0,0.0,2.0,0.0,1.0,0.0,0.0,8.0,0.0]  |0.0      |
|[33.0,4789.0,11.0,220.0,1.0,339.0,4.0,4.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0]|0.0      |
|[35.0,1350.0,16.0,185.0,1.0,330.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,5.0,1.0]|0.0      |
|[30.0,1476.0,3.0,199.0,4.0,-1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,1.0,3.0,0.0]  |0.0      |
|(16,[0,2,3,4,5,7,13],[59.0,5.0,226.0,1.0,-1.0,1.0,1.0])                   |0.0      |
+--------------------------------------------------------------------------+---------+
only showing top 5 rows



I used VectorAssembler to combine all relevant input features into a single column called features.

This is required by Spark ML models, which expect a feature vector (not separate columns) for training.

The output now includes:

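A features column: A vector of all 16 input variables. Spark stores each row as a dense or a sparse vector, whichever is more compact (the fifth row above is sparse).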

A y_indexed column: The target variable (label), already converted to numeric.

This completes the transformation of the dataset into a form ready for model training.
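
The fifth row of the output is printed in sparse form: (size, [indices], [values]), with every unlisted position equal to 0.0. As a rough illustration, the sketch below expands that row and pairs each value with its feature name. The helper to_dense is hypothetical and written only for this example.

```python
def to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a plain list."""
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense


# Same order as the VectorAssembler inputCols
input_features = [
    "age", "balance", "day", "duration", "campaign", "pdays", "previous",
    "job_indexed", "marital_indexed", "education_indexed", "default_indexed",
    "housing_indexed", "loan_indexed", "contact_indexed",
    "month_indexed", "poutcome_indexed",
]

# Fifth row as printed by final_df.show()
row5 = to_dense(16, [0, 2, 3, 4, 5, 7, 13], [59.0, 5.0, 226.0, 1.0, -1.0, 1.0, 1.0])

for name, value in zip(input_features, row5):
    print(f"{name:>18}: {value}")
```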

In [None]:
#ques 7 Choose a classification model (e.g., Logistic Regression, Decision Tree Classifier) for predicting the subscription to a term deposit.
#Split the data into training and test sets.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Rename target column to 'label'
model_df = final_df.select("features", col("y_indexed").alias("label"))

# Split into training and test sets (80/20)
train_data, test_data = model_df.randomSplit([0.8, 0.2], seed=42)

# Initialize Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Train the model
lr_model = lr.fit(train_data)

# Make predictions on test data
predictions = lr_model.transform(test_data)

# Show predictions
predictions.select("features", "label", "prediction", "probability").show(5, truncate=False)



+-----------------------------------------------------------------------+-----+----------+-----------------------------------------+
|features                                                               |label|prediction|probability                              |
+-----------------------------------------------------------------------+-----+----------+-----------------------------------------+
|(16,[0,1,2,3,4,5,6,7,15],[24.0,299.0,6.0,209.0,1.0,321.0,1.0,3.0,1.0]) |0.0  |0.0       |[0.9593690832670528,0.040630916732947164]|
|(16,[0,1,2,3,4,5,6,7,15],[29.0,228.0,11.0,12.0,8.0,342.0,9.0,1.0,1.0]) |0.0  |0.0       |[0.9870533167274181,0.01294668327258186] |
|(16,[0,1,2,3,4,5,6,7,15],[30.0,-28.0,18.0,284.0,2.0,371.0,1.0,1.0,1.0])|0.0  |0.0       |[0.9511480590312332,0.04885194096876677] |
|(16,[0,1,2,3,4,5,6,7,15],[33.0,43.0,14.0,332.0,2.0,358.0,2.0,1.0,1.0]) |0.0  |0.0       |[0.9404157451105136,0.05958425488948638] |
|(16,[0,1,2,3,4,5,6,7,15],[36.0,23.0,8.0,152.0,2.0,347.0,1.0,2.0,1.0]

I chose Logistic Regression because it's effective for binary classification (predicting yes/no).

I split the dataset into:

80% training data

20% testing data

I renamed y_indexed to label, which is the default labelCol name in Spark ML (passing labelCol="y_indexed" to the estimator would work as well).

The model was trained on the training data and used to predict the labels for test data.

The output shows:

prediction: 0.0 = No, 1.0 = Yes

probability: Estimated class probabilities in the order [P(No), P(Yes)]
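
The prediction column is derived from the probability column: with Spark's default threshold of 0.5, a row is labelled 1.0 only when P(Yes) exceeds 0.5. The sketch below applies that rule to the first two probability vectors printed above. The function predict_label is a hypothetical stand-in for illustration, not a Spark API.

```python
def predict_label(probability, threshold=0.5):
    """Return 1.0 (Yes) when P(Yes) exceeds the threshold, otherwise 0.0 (No)."""
    return 1.0 if probability[1] > threshold else 0.0


# First two probability vectors from the notebook output: [P(No), P(Yes)]
rows = [
    [0.9593690832670528, 0.040630916732947164],
    [0.9870533167274181, 0.01294668327258186],
]

for probability in rows:
    print(probability, "->", predict_label(probability))
```

Lowering the threshold trades precision for recall on the "yes" class, which can matter when subscribers are rare.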



In [None]:
# ques 8 Evaluate the model on the test dataset using appropriate metrics (Accuracy, Precision, Recall, F1 Score).
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluator for Accuracy
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(predictions)

# Evaluator for Precision
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(predictions)

# Evaluator for Recall
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator_recall.evaluate(predictions)

# Evaluator for F1 Score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = evaluator_f1.evaluate(predictions)

# Display all metrics
print(f"🔍 Accuracy: {accuracy:.4f}")
print(f"🔍 Precision: {precision:.4f}")
print(f"🔍 Recall: {recall:.4f}")
print(f"🔍 F1 Score: {f1:.4f}")



🔍 Accuracy: 0.8917
🔍 Precision: 0.8707
🔍 Recall: 0.8917
🔍 F1 Score: 0.8707


Accuracy: 0.8917

This means 89.17% of the test predictions were correct.

If most clients did not subscribe (an imbalanced dataset), this figure should be compared with the share of "no" labels in the test set, because a model that always predicts "no" already scores that share.

Precision: 0.8707

This is weightedPrecision: the precision of each class ("no" and "yes") averaged with weights equal to each class's share of the test labels. It is not the precision of the "yes" class alone, so it does not say how often a "yes" prediction is correct.

Recall: 0.8917

This is weightedRecall, which by construction equals accuracy (both are 0.8917). It does not measure the share of actual "yes" cases that the model finds.

F1 Score: 0.8707

The f1 metric is likewise a class-weighted average of the per-class F1 scores, so it is dominated by the majority class.

Conclusion:
The weighted metrics are all high, but they mostly reflect performance on the majority class.

To judge how well the model identifies clients who subscribe, report precision and recall for label 1.0 specifically (for example with metricName="precisionByLabel" or "recallByLabel" and metricLabel=1.0 in Spark 3.0+, or from a confusion matrix) and compare accuracy with the majority-class baseline.
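
Spark's weightedPrecision and weightedRecall average the per-class values using each class's share of the labels, which can hide weak performance on a rare class. The sketch below shows the arithmetic on a hypothetical confusion matrix. The counts are invented for illustration and are NOT the notebook's results.

```python
def precision_recall(tp, fp, fn):
    """Precision and recall for one class from its TP, FP and FN counts."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


# Hypothetical confusion matrix for 1,000 test rows (NOT the notebook's results)
tn, fp, fn, tp = 860, 20, 90, 30
total = tn + fp + fn + tp

accuracy = (tp + tn) / total
p_yes, r_yes = precision_recall(tp, fp, fn)  # "yes" as the class of interest
p_no, r_no = precision_recall(tn, fn, fp)    # roles swap for the "no" class

# Class weights are the shares of actual labels
w_no = (tn + fp) / total
w_yes = (tp + fn) / total
weighted_precision = w_no * p_no + w_yes * p_yes
weighted_recall = w_no * r_no + w_yes * r_yes
majority_baseline = max(w_no, w_yes)

print(f"accuracy={accuracy:.4f}  majority baseline={majority_baseline:.4f}")
print(f"weighted precision={weighted_precision:.4f}  weighted recall={weighted_recall:.4f}")
print(f"'yes' precision={p_yes:.4f}  'yes' recall={r_yes:.4f}")
```

In this made-up case the weighted scores look strong while the model finds only a quarter of the actual "yes" rows, and weighted recall matches accuracy exactly.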

In [None]:
# ques 9 Perform hyperparameter tuning using ParamGridBuilder and CrossValidator.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define evaluator (default metric: areaUnderROC)
evaluator = BinaryClassificationEvaluator(labelCol="label")

# Build parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# CrossValidator setup
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)

# Fit model with cross-validation
cv_model = cv.fit(train_data)

# Evaluate the best model
best_model = cv_model.bestModel
best_predictions = best_model.transform(test_data)

# Recalculate accuracy on the test set (reuses evaluator_acc from question 8)
best_accuracy = evaluator_acc.evaluate(best_predictions)
print(f"🔍 Best Model Accuracy after Tuning: {best_accuracy:.4f}")



🔍 Best Model Accuracy after Tuning: 0.8906


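After performing hyperparameter tuning using 5-fold cross-validation over 9 parameter combinations (3 regParam values x 3 elasticNetParam values):

The best logistic regression model achieved a test accuracy of 0.8906 (or 89.06%).

🔍 Insight:
The performance is very close to the original model (which had 89.17% accuracy). With roughly 900 rows in a 20% test split of 4,521 rows, the difference corresponds to about one prediction.

Two caveats apply when reading this result:

CrossValidator selected the best model by areaUnderROC (the default metric of BinaryClassificationEvaluator), not by accuracy, so the tuned model is not expected to maximize accuracy.

Every regParam value in the grid adds regularization, while the original model used the default regParam=0.0, so the original configuration was never part of the search.

✅ Hyperparameter tuning is still useful for checking that the model performs consistently across different parameter settings.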
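
To get a feel for the cost of this search, the sketch below enumerates the parameter grid in plain Python and counts the model fits. It assumes the usual CrossValidator behavior of one fit per combination per fold, plus one final refit of the best combination on the full training set.

```python
from itertools import product

# Same values as the ParamGridBuilder call above
reg_params = [0.01, 0.1, 0.5]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 5

grid = list(product(reg_params, elastic_net_params))
cv_fits = len(grid) * num_folds
total_fits = cv_fits + 1  # final refit of the best combination

for reg_param, elastic_net_param in grid:
    print(f"regParam={reg_param}, elasticNetParam={elastic_net_param}")
print(f"{len(grid)} combinations x {num_folds} folds = {cv_fits} fits (+1 refit = {total_fits})")
```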

In [None]:
# ques 10  Analyze the feature importances (if applicable) or coefficients of the model to gain insights into which features are most influential in predicting the outcome.

import pandas as pd

# Convert the Spark coefficient vector to a Python list
coef_values = best_model.coefficients.toArray().tolist()

# Pair each coefficient with its feature name
# (input_features from question 6, in the same order as the VectorAssembler inputCols)
coef_df = pd.DataFrame({
    "Feature": input_features,
    "Coefficient": coef_values
}).sort_values(by="Coefficient", ascending=False)

# Print model intercept
print("Intercept:", best_model.intercept)

# Show top positive and negative influential features
print("\nTop Influential Features:\n")
print(coef_df.to_string(index=False))


Intercept: -4.156678040680314

Top Influential Features:

          Feature  Coefficient
 poutcome_indexed     0.664650
  housing_indexed     0.467824
  marital_indexed     0.179996
    month_indexed     0.121712
         duration     0.003612
              age     0.000000
          balance     0.000000
              day     0.000000
      job_indexed     0.000000
         previous     0.000000
            pdays     0.000000
         campaign     0.000000
  default_indexed     0.000000
education_indexed     0.000000
  contact_indexed    -0.064667
     loan_indexed    -0.468615


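The largest coefficient belongs to the outcome of the previous campaign (poutcome_indexed).

housing_indexed and month_indexed also have positive coefficients. The sign refers to the index code, not to "yes": StringIndexer assigns 0.0 to the most frequent value, so the index-to-category mapping must be checked before concluding whether having a housing loan raises or lowers the chance of subscribing.

loan_indexed has a negative coefficient. If "no" is the more frequent value (index 0.0), clients who already have a personal loan are less likely to subscribe.

Several features such as age, job, and balance have coefficients of exactly 0. This indicates that the selected model uses an L1 (elastic net) penalty, which removes features from the model; it is not by itself evidence that these features carry no signal.

Coefficient sizes are per unit of the raw feature and are not directly comparable: duration has a small coefficient per second but spans hundreds of seconds, while the indexed columns use arbitrary category codes.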
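
A logistic regression coefficient is easier to read as an odds ratio: exp(coefficient) is the factor by which the odds of the positive class change when the feature increases by one unit. The sketch below applies this to a few coefficients copied from the table above. The 60-second step for duration is an arbitrary choice made for illustration.

```python
import math

# Coefficients copied from the printed table
coefficients = {
    "poutcome_indexed": 0.664650,
    "housing_indexed": 0.467824,
    "duration": 0.003612,
    "loan_indexed": -0.468615,
}


def odds_ratio(coefficient, delta=1.0):
    """Multiplicative change in the odds when the feature increases by delta."""
    return math.exp(coefficient * delta)


for name, coefficient in coefficients.items():
    print(f"{name:>18}: odds x {odds_ratio(coefficient):.3f} per unit")

# duration is measured in seconds, so a one-minute step is more informative
print(f"duration, +60 seconds: odds x {odds_ratio(coefficients['duration'], 60):.3f}")
```

For the indexed columns, "one unit" means moving to the next category code, which is only meaningful for two-valued columns such as loan_indexed or housing_indexed.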

