# Banking Data Analytics - Spark Data Processing & ML
## Part 1: EDA + Part 4: Machine Learning

This notebook can be run directly in Google Colab.

---

## Setup: Install PySpark in Colab

In [None]:
# Install PySpark
!pip install pyspark -q
print("PySpark installed successfully!")

In [None]:
# Import libraries
# Standard library
import builtins  # explicit access to Python built-ins (max, abs, ...) shadowed by the wildcard import below
import os

# Third-party
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
# NOTE: this wildcard import replaces built-ins such as max, min, sum, round and abs
# with the Spark column functions of the same name; use builtins.<name> for the
# plain Python versions.
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

print("Libraries imported successfully!")

In [None]:
# Build (or reuse) the Spark session used by every cell below
session_builder = (
    SparkSession.builder
    .appName("Banking Analytics")
    .config("spark.driver.memory", "4g")
)
spark = session_builder.getOrCreate()

# Only log errors so the cell outputs stay readable
spark.sparkContext.setLogLevel("ERROR")

print("Spark Session created!")
print(f"Spark Version: {spark.version}")

## Upload Dataset
Upload the `bank.csv` file when prompted.

In [None]:
# For Google Colab - upload bank.csv unless it is already in the working directory
if os.path.exists("bank.csv"):
    # Lets "Restart & Run All" proceed without a manual upload prompt.
    uploaded = {}
    print("bank.csv already present - skipping upload.")
else:
    from google.colab import files  # only importable inside Colab

    uploaded = files.upload()
    if not uploaded:
        # Fail here with a clear message instead of reporting success and
        # crashing later when the CSV is read.
        raise FileNotFoundError("No file was uploaded; bank.csv is required to continue.")
    print("File uploaded!")

In [None]:
# Read bank.csv with a header row and let Spark infer the column types
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("bank.csv")
)
row_total = df.count()
column_total = len(df.columns)
print(f"Dataset loaded: {row_total} rows, {column_total} columns")

---
# PART 1: SPARK DATA PROCESSING (EDA)
---

## Task 1: Data Loading and Basic Inspection

In [None]:
# Task banner, then a preview of the first 10 records with untruncated values
print("=" * 60, "TASK 1: DATA LOADING AND BASIC INSPECTION", "=" * 60, sep="\n")
print("\n--- First 10 Rows ---")
df.show(n=10, truncate=False)

In [None]:
# Print schema
# Shows the column names and types of `df` (types were inferred when the CSV was read).
print("\n--- Schema ---")
df.printSchema()

In [None]:
# Summary statistics
# describe() computes the summary table for `df`; show() prints it.
print("\n--- Summary Statistics ---")
df.describe().show()

## Task 2: Data Filtering and Column Operations

In [None]:
print("=" * 60, "TASK 2: DATA FILTERING AND COLUMN OPERATIONS", "=" * 60, sep="\n")

# Keep only the clients whose balance is strictly above 1000
print("\n--- Clients with Balance > 1000 ---")
df_filtered = df.where(col("balance") > 1000)
high_balance_total = df_filtered.count()
print(f"Total clients with balance > 1000: {high_balance_total}")
df_filtered.show(10)

In [None]:
# Derive a numeric quarter (1-4) from the three-letter month code
print("\n--- Adding Quarter Column ---")
month_col = col("month")
quarter_expr = (
    when(month_col.isin("jan", "feb", "mar"), 1)
    .when(month_col.isin("apr", "may", "jun"), 2)
    .when(month_col.isin("jul", "aug", "sep"), 3)
    .otherwise(4)  # any other month value falls through to quarter 4
)
df_with_quarter = df.withColumn("quarter", quarter_expr)

# One row per distinct (month, quarter) pair to verify the mapping
df_with_quarter.select("month", "quarter").distinct().show()

## Task 3: GroupBy and Aggregation

In [None]:
print("=" * 60, "TASK 3: GROUPBY AND AGGREGATION", "=" * 60, sep="\n")

# Mean balance and mean age per job, highest average balance first
print("\n--- Average Balance and Age by Job Type ---")
mean_balance = round(avg("balance"), 2).alias("avg_balance")
mean_age = round(avg("age"), 2).alias("avg_age")
job_stats = (
    df.groupBy("job")
    .agg(mean_balance, mean_age)
    .orderBy("avg_balance", ascending=False)
)
job_stats.show()

In [None]:
# Count subscribers (y == "yes") per marital status, largest group first
print("\n--- Subscribed Clients by Marital Status ---")
subscribers = df.where(col("y") == "yes")
marital_sub = (
    subscribers
    .groupBy("marital")
    .count()
    .orderBy("count", ascending=False)
)
marital_sub.show()

## Task 4: UDF for Age Groups

In [None]:
print("=" * 60, "TASK 4: UDF TO CATEGORIZE AGE GROUPS", "=" * 60, sep="\n")

# `udf` turns a plain Python function into one that can be applied to a column
from pyspark.sql.functions import udf

# Define UDF
def categorize_age(age):
    """Map a numeric age to one of three age-group labels.

    Args:
        age: The client's age, or None when the value is missing.

    Returns:
        "<30" for ages below 30, "30-60" for ages from 30 to 60 inclusive,
        ">60" for ages above 60, and None when `age` is None.
    """
    # A missing age cannot be compared with a number; propagate the null
    # instead of raising TypeError inside the UDF.
    if age is None:
        return None
    if age < 30:
        return "<30"
    elif age <= 60:
        return "30-60"
    else:
        return ">60"

# Wrap the Python function as a UDF whose result type is a string
age_group_udf = udf(categorize_age, StringType())

# Add the age_group column, then count the rows in each group
df_with_age = df.withColumn("age_group", age_group_udf(df["age"]))
print("\n--- Age Group Distribution ---")
age_group_counts = (
    df_with_age
    .groupBy("age_group")
    .count()
    .orderBy("age_group")
)
age_group_counts.show()

## Task 5: Advanced Data Transformations

In [None]:
print("=" * 60, "TASK 5: ADVANCED DATA TRANSFORMATIONS", "=" * 60, sep="\n")

# Share of clients who subscribed, per education level, best rate first
print("\n--- Subscription Rate by Education ---")
subscribed_flag = when(col("y") == "yes", 1).otherwise(0)  # 1 for a subscriber, else 0
edu_totals = df.groupBy("education").agg(
    count("*").alias("total"),
    sum(subscribed_flag).alias("subscribed"),
)
edu_rate = round((col("subscribed") / col("total")) * 100, 2)
edu_stats = (
    edu_totals
    .withColumn("subscription_rate_pct", edu_rate)
    .orderBy("subscription_rate_pct", ascending=False)
)
edu_stats.show()

In [None]:
# Default rate per job; only the three highest rates are displayed
print("\n--- Top 3 Jobs with Highest Default Rate ---")
defaulter_flag = when(col("default") == "yes", 1).otherwise(0)  # 1 for a defaulter, else 0
default_totals = df.groupBy("job").agg(
    count("*").alias("total"),
    sum(defaulter_flag).alias("defaulters"),
)
default_rate = round((col("defaulters") / col("total")) * 100, 2)
default_stats = (
    default_totals
    .withColumn("default_rate_pct", default_rate)
    .orderBy("default_rate_pct", ascending=False)
)
default_stats.show(3)

## Task 6: String Manipulation

In [None]:
print("=" * 60, "TASK 6: STRING MANIPULATION", "=" * 60, sep="\n")

# Join job and marital status into one "<job>_<marital>" column
print("\n--- Concatenating Job and Marital ---")
job_marital_expr = concat(col("job"), lit("_"), col("marital"))
df_concat = df.withColumn("job_marital", job_marital_expr)
df_concat.select("job", "marital", "job_marital").show(10)

# Show each distinct contact value next to its upper-cased form
print("\n--- Contact in Uppercase ---")
contact_upper_view = (
    df.withColumn("contact_upper", upper(col("contact")))
    .select("contact", "contact_upper")
    .distinct()
)
contact_upper_view.show()

## Task 7: Data Visualization

In [None]:
print("=" * 60, "TASK 7: DATA VISUALIZATION", "=" * 60, sep="\n")

# Aggregate in Spark, then bring the small per-job result to pandas for plotting
job_counts = (
    df.groupBy("job")
    .count()
    .orderBy("count", ascending=False)
    .toPandas()
)

# Bar chart of client counts per job type
fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(job_counts['job'], job_counts['count'], color='steelblue', edgecolor='black')
ax.set_xlabel('Job Type', fontsize=12)
ax.set_ylabel('Number of Clients', fontsize=12)
ax.set_title('Distribution of Clients by Job Type', fontsize=14)
# Tilt the job labels so they do not overlap
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
fig.tight_layout()
plt.show()

## Task 8: Complex Queries

In [None]:
print("=" * 60, "TASK 8: COMPLEX QUERIES", "=" * 60, sep="\n")

# Contacts and subscription success per month, busiest month first
print("\n--- Monthly Contact Analysis ---")
monthly_subscribed_flag = when(col("y") == "yes", 1).otherwise(0)
monthly_totals = df.groupBy("month").agg(
    count("*").alias("total_contacts"),
    sum(monthly_subscribed_flag).alias("subscribed"),
)
monthly_rate = round((col("subscribed") / col("total_contacts")) * 100, 2)
monthly = (
    monthly_totals
    .withColumn("success_rate_pct", monthly_rate)
    .orderBy("total_contacts", ascending=False)
)
monthly.show()

In [None]:
# Mean call duration for each value of the subscription flag `y`
print("\n--- Average Duration: Subscribed vs Not ---")
mean_duration = round(avg("duration"), 2).alias("avg_duration_sec")
duration_by_outcome = df.groupBy("y").agg(mean_duration)
duration_by_outcome.show()

## Task 9: Correlation Analysis

In [None]:
print("=" * 60)
print("TASK 9: CORRELATION ANALYSIS")
print("=" * 60)

# Correlation coefficient between age and balance, pulled back as a Python value
correlation = df.select(corr("age", "balance")).collect()[0][0]

# The coefficient can come back as None or NaN when it is undefined;
# `x != x` is True only for NaN.
if correlation is None or correlation != correlation:
    print("\nCorrelation between Age and Balance: undefined")
    print("Interpretation: Not enough variation in the data to compute a correlation")
else:
    print(f"\nCorrelation between Age and Balance: {correlation:.4f}")
    # builtins.abs: the bare name `abs` refers to the Spark column function here.
    magnitude = builtins.abs(correlation)
    # Derive the interpretation from the value instead of hardcoding it.
    if magnitude < 0.3:
        interpretation = "Weak/no linear relationship"
    elif magnitude < 0.7:
        interpretation = "Moderate linear relationship"
    else:
        interpretation = "Strong linear relationship"
    print(f"Interpretation: {interpretation}")

## Task 10: Default Analysis

In [None]:
print("=" * 60)
print("TASK 10: DEFAULT ANALYSIS")
print("=" * 60)

# Sort by status so the table and the bars appear in a stable order
# (groupBy alone does not guarantee row order).
default_dist = df.groupBy("default").count().orderBy("default").toPandas()
print(default_dist)

# Pick each bar's colour from its status value rather than its position, so
# "yes" is always red and "no" is always green; anything else is gray.
status_colors = {"no": "green", "yes": "red"}
status_labels = default_dist['default'].astype(str)
bar_colors = [status_colors.get(status, "gray") for status in status_labels]

# Visualization
plt.figure(figsize=(8, 5))
plt.bar(status_labels, default_dist['count'], color=bar_colors)
plt.xlabel('Credit Default Status')
plt.ylabel('Count')
plt.title('Credit Default Distribution')
plt.show()

## Task 11: Contact Method Analysis

In [None]:
print("=" * 60, "TASK 11: CONTACT METHOD ANALYSIS", "=" * 60, sep="\n")

# Subscription success per contact channel, best rate first
contact_subscribed_flag = when(col("y") == "yes", 1).otherwise(0)
contact_totals = df.groupBy("contact").agg(
    count("*").alias("total"),
    sum(contact_subscribed_flag).alias("subscribed"),
)
contact_rate = round((col("subscribed") / col("total")) * 100, 2)
contact_stats = (
    contact_totals
    .withColumn("success_rate_pct", contact_rate)
    .orderBy("success_rate_pct", ascending=False)
)
contact_stats.show()

## Task 12: Spark SQL

In [None]:
print("=" * 60, "TASK 12: SPARK SQL", "=" * 60, sep="\n")

# Expose the frame that carries the age_group column to SQL as "bank_data"
df_with_age.createOrReplaceTempView("bank_data")

# Query 1: average balance and row count per age group
avg_balance_by_age_group_sql = """
    SELECT age_group, 
           ROUND(AVG(balance), 2) as avg_balance,
           COUNT(*) as count
    FROM bank_data
    GROUP BY age_group
    ORDER BY age_group
"""

# Query 2: the five most frequent job types
top_jobs_sql = """
    SELECT job, COUNT(*) as count
    FROM bank_data
    GROUP BY job
    ORDER BY count DESC
    LIMIT 5
"""

print("\n--- SQL: Average Balance by Age Group ---")
spark.sql(avg_balance_by_age_group_sql).show()

print("\n--- SQL: Top 5 Job Types ---")
spark.sql(top_jobs_sql).show()

---
# PART 4: SPARK ML - MACHINE LEARNING
---

## Task 1 & 2: Data Preprocessing

In [None]:
print("=" * 60)
print("SPARK ML: DATA PREPROCESSING")
print("=" * 60)

# Check for missing values
print("\n--- Missing Values Check ---")

# Count the nulls of every column in one pass over the data instead of
# running a separate Spark job per column. `sum` here is the Spark aggregate
# brought in by the wildcard import, not Python's built-in.
null_counts = df.select(
    [sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
).first().asDict()

# The aggregate is NULL (None) for an empty frame, which is falsy like 0.
columns_with_missing = {name: n for name, n in null_counts.items() if n}

if columns_with_missing:
    for name, n in columns_with_missing.items():
        print(f"{name}: {n}")
else:
    # Only report a clean result when no column had nulls.
    print("No missing values found!")
# NOTE(review): this only detects NULLs; placeholder strings such as "unknown"
# (if the dataset uses them) are not counted — confirm against the data.

In [None]:
# Handle outliers (IQR capping)
print("\n--- Handling Outliers ---")
# Numeric columns: these are capped in the loop below and later included in the
# feature vector alongside the encoded categorical columns.
numerical_cols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']

def cap_outliers(df, column, factor=1.5, rel_error=0.05):
    """Clamp the values of `column` to the IQR fences [Q1 - f*IQR, Q3 + f*IQR].

    Args:
        df: Spark DataFrame containing `column`.
        column: Name of the numeric column to cap.
        factor: Multiple of the IQR used for the fences (default 1.5).
        rel_error: Relative error passed to `approxQuantile` (default 0.05).

    Returns:
        A DataFrame with `column` capped, or `df` unchanged when the quartiles
        cannot be computed or the IQR is zero.
    """
    quantiles = df.approxQuantile(column, [0.25, 0.75], rel_error)
    # No quantiles come back when the column has no usable values; there is
    # nothing to cap and indexing the list would raise IndexError.
    if len(quantiles) < 2:
        return df

    q1, q3 = quantiles[0], quantiles[1]
    iqr = q3 - q1
    # With Q1 == Q3 both fences collapse onto a single value and capping would
    # overwrite every row with that value, turning the column into a constant.
    if iqr == 0:
        return df

    lower = q1 - factor * iqr
    upper = q3 + factor * iqr
    return df.withColumn(column,
        when(col(column) < lower, lower)
        .when(col(column) > upper, upper)
        .otherwise(col(column)))

# Cap every numeric column in turn; each pass rebinds `df` to the capped frame.
# NOTE(review): because `df` is overwritten here, earlier cells that read `df`
# will see capped values if they are re-run after this cell.
# NOTE(review): the quantiles are computed on the full dataset, before the
# train/test split performed later in the notebook — confirm this is intended.
for c in numerical_cols:
    df = cap_outliers(df, c)
print(f"Outliers capped for: {numerical_cols}")

## Task 3: Feature Engineering

In [None]:
print("=" * 60, "FEATURE ENGINEERING", "=" * 60, sep="\n")

# Categorical columns
categorical_cols = ['job', 'marital', 'education', 'default', 'housing',
                    'loan', 'contact', 'month', 'poutcome']

# For each categorical column: string -> index ("<col>_idx") -> one-hot ("<col>_enc")
indexers = []
encoders = []
for cat_col in categorical_cols:
    idx_col = cat_col + "_idx"
    enc_col = cat_col + "_enc"
    indexers.append(StringIndexer(inputCol=cat_col, outputCol=idx_col, handleInvalid="keep"))
    encoders.append(OneHotEncoder(inputCol=idx_col, outputCol=enc_col))

# Target column "y" is indexed into "label"
label_indexer = StringIndexer(inputCol="y", outputCol="label")

# Numeric columns first, then the one-hot columns, assembled into one vector
encoded_cols = [cat_col + "_enc" for cat_col in categorical_cols]
feature_cols = [*numerical_cols, *encoded_cols]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")

# The scaler reads the assembled vector and writes the final "features" column
scaler = StandardScaler(inputCol="features_raw", outputCol="features")

print(f"Feature columns: {len(feature_cols)}")

## Task 4: Model Training

In [None]:
print("=" * 60, "MODEL TRAINING", "=" * 60, sep="\n")

# 80/20 train/test split with a fixed seed
split_weights = [0.8, 0.2]
train_data, test_data = df.randomSplit(split_weights, seed=42)
train_total = train_data.count()
test_total = test_data.count()
print(f"Training: {train_total}, Test: {test_total}")

# All three classifiers read the same feature and label columns
shared_cols = {"featuresCol": "features", "labelCol": "label"}
lr = LogisticRegression(maxIter=100, **shared_cols)
dt = DecisionTreeClassifier(seed=42, **shared_cols)
rf = RandomForestClassifier(numTrees=100, seed=42, **shared_cols)

# Preprocessing stages shared by every pipeline; a classifier is appended later
stages = [*indexers, *encoders, label_indexer, assembler, scaler]

In [None]:
# Train and evaluate models
models = {"Logistic Regression": lr, "Decision Tree": dt, "Random Forest": rf}
results = {}

for name, classifier in models.items():
    print(f"\n--- Training {name} ---")
    pipeline = Pipeline(stages=stages + [classifier])
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)
    
    # Evaluate
    auc = BinaryClassificationEvaluator(labelCol="label").evaluate(predictions)
    acc = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy").evaluate(predictions)
    f1 = MulticlassClassificationEvaluator(labelCol="label", metricName="f1").evaluate(predictions)
    
    results[name] = {"AUC": auc, "Accuracy": acc, "F1": f1, "model": model}
    print(f"  AUC: {auc:.4f}, Accuracy: {acc:.4f}, F1: {f1:.4f}")

## Task 5: Model Evaluation Summary

In [None]:
print("=" * 60)
print("MODEL COMPARISON")
print("=" * 60)

# One table row per trained model
print(f"{'Model':<25} {'AUC':<10} {'Accuracy':<10} {'F1':<10}")
print("-" * 55)
for name, metrics in results.items():
    print(f"{name:<25} {metrics['AUC']:.4f}     {metrics['Accuracy']:.4f}     {metrics['F1']:.4f}")

# The wildcard import from pyspark.sql.functions rebinds `max` to the Spark
# column aggregate, which does not accept `key=`; call Python's built-in
# explicitly to pick the model name with the highest AUC.
best = builtins.max(results, key=lambda model_name: results[model_name]['AUC'])
print(f"\n*** Best Model: {best} ***")

## Task 6: Hyperparameter Tuning

In [None]:
print("=" * 60, "HYPERPARAMETER TUNING", "=" * 60, sep="\n")

# Fresh random forest whose numTrees / maxDepth are varied by the grid
rf_tune = RandomForestClassifier(featuresCol="features", labelCol="label", seed=42)
pipeline_rf = Pipeline(stages=[*stages, rf_tune])

# 2 x 2 grid: number of trees x maximum depth
grid_builder = ParamGridBuilder()
grid_builder.addGrid(rf_tune.numTrees, [50, 100])
grid_builder.addGrid(rf_tune.maxDepth, [5, 10])
paramGrid = grid_builder.build()

# 3-fold cross-validation over the grid, scored by the binary evaluator
cv_evaluator = BinaryClassificationEvaluator(labelCol="label")
cv = CrossValidator(
    estimator=pipeline_rf,
    estimatorParamMaps=paramGrid,
    evaluator=cv_evaluator,
    numFolds=3,
    seed=42,
)

print("Running cross-validation...")
cv_model = cv.fit(train_data)

# Score the cross-validated model on the held-out test split
cv_predictions = cv_model.transform(test_data)
tuned_auc = cv_evaluator.evaluate(cv_predictions)
print(f"\nTuned Model AUC: {tuned_auc:.4f}")

## Task 7: Feature Importance

In [None]:
print("=" * 60)
print("FEATURE IMPORTANCE")
print("=" * 60)

rf_pipeline_model = results["Random Forest"]["model"]
rf_model = rf_pipeline_model.stages[-1]
importances = rf_model.featureImportances.toArray()

# The importance vector has one entry per slot of the assembled feature vector
# (each one-hot column expands into several slots), so it is longer than
# `feature_cols`. Recover the per-slot names from the ML attribute metadata that
# the assembler attaches to "features_raw". Only the schema is read here.
assembled_schema = rf_pipeline_model.transform(test_data).schema
attr_groups = assembled_schema["features_raw"].metadata.get("ml_attr", {}).get("attrs", {})

# Start with positional placeholders so every slot has a label even if some
# metadata is missing, then overwrite with the real names where available.
feature_names = [f"feature_{i}" for i in range(len(importances))]
for attrs in attr_groups.values():
    for attr in attrs:
        slot = attr.get("idx")
        if slot is not None and 0 <= slot < len(feature_names) and "name" in attr:
            feature_names[slot] = attr["name"]

# Create DataFrame
importance_df = pd.DataFrame({
    'feature': feature_names,
    'importance': importances
}).sort_values('importance', ascending=False)

print(importance_df.head(10))

# Plot
plt.figure(figsize=(10, 6))
top10 = importance_df.head(10)
plt.barh(top10['feature'], top10['importance'])
plt.xlabel('Importance')
plt.title('Top 10 Feature Importances')
plt.gca().invert_yaxis()  # largest importance at the top
plt.tight_layout()
plt.show()

## Summary

In [None]:
# Closing summary: prints the best model and its AUC plus the tuned AUC, then
# shuts the Spark session down.
print("=" * 60)
print("PROJECT COMPLETED!")
print("=" * 60)
# Findings 1 and 2 are filled in from `best`, `results` and `tuned_auc`.
# NOTE(review): finding 3 and the "Business Insights" lines are fixed text, not
# computed from this run's results — confirm they still match the outputs above.
print(f"""
Key Findings:
1. Best Model: {best} with AUC {results[best]['AUC']:.4f}
2. After tuning: AUC {tuned_auc:.4f}
3. Most important features: duration, poutcome, contact

Business Insights:
- Longer calls increase subscription probability
- Previous successful campaigns matter
- Cellular contact is most effective
""")

# Stop the session; cells that use `spark` cannot be re-run after this
# without recreating it.
spark.stop()
print("Spark session stopped.")