
# **Project Name**    Financial Forecasting Frontier Distributed ML



##### **Project Type**    - EDA/Classification
##### **Contribution**    - Individual
##### **Team Member 1 -** Pramod Kolekar

# **Project Summary -**

### Overview
In the modern banking sector, the ability to efficiently process, analyze, and draw insights from vast volumes of data is crucial. Banks and financial institutions generate and collect extensive data, including customer demographics, transaction histories, market trends, and more. This data, when effectively analyzed, can lead to improved customer service, risk management, marketing strategies, and overall operational efficiency.

### Project Background
The banking industry faces challenges in managing and utilizing large datasets due to the volume, variety, and velocity of data. Traditional data processing methods often fall short in providing timely insights and handling real-time data streams. With the advent of distributed computing and machine learning technologies, banks now have the opportunity to harness these large datasets to make informed decisions, predict market trends, and enhance customer experiences.

# **GitHub Link -**

https://github.com/KolekarPramod/Financial-Forecasting-Frontier-Distributed-ML

# **Problem Statement**




Banking institutions generate massive volumes of data daily, which traditional computing systems struggle to process efficiently. To harness the full potential of this data, banks need scalable, distributed machine learning approaches that can store, process, and analyze large datasets in real time. This project applies distributed computing techniques to the "bank_data.csv" dataset, simulating a real-world banking environment. The objective is to uncover customer behavior patterns, identify significant trends, and support data-driven decision-making through predictive analytics. The challenge lies in integrating data storage, querying, and machine learning within a distributed framework to produce actionable business insights.


# ***Let's Begin !***

## ***1. Know Your Data***

### Import Libraries

In [None]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, countDistinct, avg  # note: this `sum` shadows Python's built-in sum
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import matplotlib.pyplot as plt

### Dataset Loading

In [None]:
# 1. Setup Spark
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

In [None]:
# 2. Load into PySpark
df = spark.read.csv('bank_data.csv', header=True, inferSchema=True)

### Dataset First View

In [None]:
# Dataset First Look
df.show(5)

### Dataset Rows & Columns count

In [None]:
# Dataset Rows & Columns count
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")

### Dataset Information

In [None]:
# Dataset Info
df.printSchema()

#### Duplicate Values

In [None]:
# Dataset Duplicate Value Count
# Counts distinct rows that occur more than once (not the number of surplus copies)
df.groupBy(df.columns).count().filter("count > 1").count()
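"Duplicate count" can mean two different numbers, and the expression above returns the first: how many distinct rows appear more than once. The number of extra copies that `dropDuplicates()` would remove is `df.count() - df.dropDuplicates().count()`. The plain-Python sketch below contrasts the two on a few made-up rows (not taken from bank_data.csv); it avoids calling `sum`, since the import cell shadows the built-in with PySpark's version.

```python
# Contrast two duplicate counts on made-up rows (illustration only).
from collections import Counter

rows = [
    ("30", "admin.", "married"),
    ("30", "admin.", "married"),
    ("30", "admin.", "married"),
    ("45", "technician", "single"),
    ("45", "technician", "single"),
    ("52", "retired", "divorced"),
]

counts = Counter(rows)

# (a) distinct rows that occur more than once:
#     what groupBy(all columns).count().filter("count > 1").count() returns
duplicated_groups = len([n for n in counts.values() if n > 1])

# (b) surplus copies that dropDuplicates() would remove:
#     total rows minus distinct rows
surplus_copies = len(rows) - len(counts)

print(duplicated_groups, surplus_copies)  # 2 3
```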

#### Missing Values/Null Values

In [None]:
# Missing Values/Null Values Count
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

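### What do you know about your dataset?

The dataset describes bank clients contacted during a term-deposit marketing campaign. It has 17 columns: seven integer fields (`age`, `balance`, `day`, `duration`, `campaign`, `pdays`, `previous`) and ten string fields (`job`, `marital`, `education`, `default`, `housing`, `loan`, `contact`, `month`, `poutcome`, `y`), with `y` as the binary target. The row, duplicate, and missing-value counts are produced by the cells above.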

## ***2. Understanding Your Variables***

In [None]:
# Dataset Describe
df.describe().show()

### Variables Description

$\color{yellow}{\text{age}}$ – Age of the individual (integer)

$\color{yellow}{\text{job}}$ – Job type (string)

$\color{yellow}{\text{marital}}$ – Marital status (string)

$\color{yellow}{\text{education}}$ – Education level (string)

$\color{yellow}{\text{default}}$ – Indicates if the individual has credit in default (string)

$\color{yellow}{\text{balance}}$ – Account balance (integer)

$\color{yellow}{\text{housing}}$ – Indicates if the individual has a housing loan (string)

$\color{yellow}{\text{loan}}$ – Indicates if the individual has a personal loan (string)

$\color{yellow}{\text{contact}}$ – Type of communication contact (string)

$\color{yellow}{\text{day}}$ – Last contact day of the month (integer)

$\color{yellow}{\text{month}}$ – Last contact month of the year (string)

$\color{yellow}{\text{duration}}$ – Last contact duration, in seconds (integer)

$\color{yellow}{\text{campaign}}$ – Number of contacts performed during this campaign for this client (integer)

$\color{yellow}{\text{pdays}}$ – Number of days since the client was last contacted in a previous campaign (integer; -1 means the client was not previously contacted)

$\color{yellow}{\text{previous}}$ – Number of contacts before this campaign for the client (integer)

$\color{yellow}{\text{poutcome}}$ – Outcome of the previous marketing campaign (string)

$\color{yellow}{\text{y}}$ – Indicates if the client subscribed to a term deposit (string)

### Check Unique Values for each variable.

In [None]:
# Check Unique Values for each variable.
df.select([countDistinct(c).alias(c) for c in df.columns]).show()

## 3. ***Data Wrangling***

### Data Wrangling Code

In [None]:
# Write your code to make your dataset analysis ready.
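One preparation step the variable descriptions suggest: `pdays` uses -1 as a sentinel for "not previously contacted", and categorical columns such as `job` contain a literal `"unknown"` category. The plain-Python sketch below is a hypothetical illustration, not the author's wrangling code: the column name `previously_contacted` and the sample rows are made up. In PySpark, the same flag could be added with `df.withColumn("previously_contacted", col("pdays") != -1)`.

```python
# Hypothetical wrangling sketch (plain Python, made-up rows):
# 1) turn the pdays sentinel (-1) into an explicit boolean flag
# 2) count the literal "unknown" placeholder per categorical column
from collections import Counter


def add_contact_flag(rows):
    """Return copies of rows with a boolean 'previously_contacted' field."""
    out = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows are left unchanged
        new_row["previously_contacted"] = row["pdays"] != -1
        out.append(new_row)
    return out


def count_unknowns(rows, columns):
    """Count rows whose value is the string 'unknown', per column."""
    counts = Counter()
    for row in rows:
        for c in columns:
            if row[c] == "unknown":
                counts[c] += 1
    return dict(counts)


sample = [
    {"job": "management", "education": "tertiary", "pdays": -1},
    {"job": "unknown", "education": "unknown", "pdays": 92},
    {"job": "technician", "education": "unknown", "pdays": -1},
]

flagged = add_contact_flag(sample)
print([r["previously_contacted"] for r in flagged])  # [False, True, False]
print(count_unknowns(sample, ["job", "education"]))  # {'job': 1, 'education': 2}
```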

### What manipulations have you done, and what insights did you find?

Answer Here.

## ***4. Data Visualization, Storytelling & Experimenting with charts: Understand the relationships between variables***

#### Chart - 1

In [None]:
# Get the Spark session (getOrCreate() returns the session created earlier rather than starting a new one)
spark = SparkSession.builder.appName("EDA with PySpark").getOrCreate()

In [None]:
# Step 1: Get value counts from Spark
vc_df = df.groupBy("y").count().orderBy("count", ascending=False)

# Step 2: Convert to pandas (small data only)
vc_pd = vc_df.toPandas()

# Step 3: Plot bar chart
plt.figure(figsize=(6, 4))
plt.bar(vc_pd["y"], vc_pd["count"], color='skyblue')
plt.xlabel("Target: y (Subscribed)")
plt.ylabel("Count")
plt.title("Term Deposit Subscription Count (Yes/No)")
plt.grid(axis='y')
plt.show()


##### What is/are the insight(s) found from the chart?

The chart shows that the majority of customers did **not** subscribe to the term deposit, indicating a strong class imbalance.
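A common way to compensate for such imbalance is to weight each class inversely to its frequency, so both classes contribute equally to training: $w_c = N / (K \cdot n_c)$, with $N$ rows, $K$ classes, and $n_c$ rows in class $c$. The sketch below computes these "balanced" weights in plain Python on made-up labels (not the real class counts). Spark 3.0 and later expose a `weightCol` parameter on `RandomForestClassifier` that could consume such a per-row weight column; check the API docs of your Spark version before relying on it.

```python
# Balanced class weights: w_c = N / (K * n_c). Made-up labels only.
from collections import Counter


def balanced_class_weights(labels):
    """Map each class to N / (K * n_c) so every class has equal total weight."""
    counts = Counter(labels)
    n_total = len(labels)
    n_classes = len(counts)
    return {label: n_total / (n_classes * n) for label, n in counts.items()}


# A 4:1 imbalance, not the actual bank_data.csv distribution
labels = ["no"] * 8 + ["yes"] * 2
weights = balanced_class_weights(labels)
print(weights)  # {'no': 0.625, 'yes': 2.5}
```

With these weights, each class's total weight is 5.0 (8 × 0.625 and 2 × 2.5), so the minority class is no longer drowned out.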

#### Chart - 2

In [None]:
# Chart - 2 visualization code

# Step 1: Get counts
job_counts = df.groupBy("job").count().orderBy("count", ascending=False)

# Step 2: Convert to pandas
job_pd = job_counts.toPandas()

# Step 3: Plot bar chart
plt.figure(figsize=(10, 5))
plt.bar(job_pd["job"], job_pd["count"], color='orange')
plt.xlabel("Job Type")
plt.ylabel("Number of Clients")
plt.title("Count of Clients by Job Type")
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.grid(axis='y')
plt.show()

##### What is/are the insight(s) found from the chart?

Most clients are from management, blue-collar, and technician job types, while student and unknown categories have the fewest clients.

#### Chart - 3

In [None]:
# Chart - 3 visualization code
# Step 1: Group and count by education + target
edu_counts = df.groupBy("education", "y").count().orderBy("education", "y")

# Step 2: Pivot to wide format
edu_pivot = edu_counts.groupBy("education").pivot("y").sum("count").fillna(0)

# Step 3: Convert to pandas and plot
edu_pd = edu_pivot.toPandas()
edu_pd["total"] = edu_pd["yes"] + edu_pd["no"]
edu_pd["subscription_rate"] = edu_pd["yes"] / edu_pd["total"]

# Step 4: Plot
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 4))
plt.bar(edu_pd["education"], edu_pd["subscription_rate"], color="green")
plt.xlabel("Education Level")
plt.ylabel("Subscription Rate")
plt.title("Subscription Rate by Education Level")
plt.xticks(rotation=45)
plt.grid(axis='y')
plt.tight_layout()
plt.show()

##### What is/are the insight(s) found from the chart?

Clients with a tertiary education have the highest subscription rate, while those with primary education have the lowest.
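The Chart 3 code reshapes `(education, y, count)` rows into one row per education level with `no` and `yes` columns, fills missing combinations with 0, and computes the rate `yes / (yes + no)`. The plain-Python sketch below mirrors that long-to-wide pivot on made-up counts (not the real data) to show why `fillna(0)` matters: a level with no `"yes"` rows would otherwise have a null in the `yes` column.

```python
# Long-to-wide pivot plus subscription rate, in plain Python (made-up counts).

def pivot_counts(rows, levels=("no", "yes")):
    """(key, y, count) rows -> {key: {"no": n, "yes": m}}, missing levels = 0."""
    wide = {}
    for key, y, count in rows:
        # Start every key at 0 for each level: the role fillna(0) plays in Spark
        wide.setdefault(key, {lvl: 0 for lvl in levels})
        wide[key][y] += count
    return wide


def subscription_rates(wide):
    """Rate = yes / (yes + no) for each key."""
    return {key: c["yes"] / (c["yes"] + c["no"]) for key, c in wide.items()}


long_rows = [
    ("primary", "no", 90), ("primary", "yes", 10),
    ("tertiary", "no", 75), ("tertiary", "yes", 25),
    ("unknown", "no", 20),  # no "yes" row at all, so it is filled with 0
]

wide = pivot_counts(long_rows)
print(subscription_rates(wide))  # {'primary': 0.1, 'tertiary': 0.25, 'unknown': 0.0}
```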

#### Chart - 4

In [None]:
# Chart - 4 visualization code
# Group and count by housing status and y
house_counts = df.groupBy("housing", "y").count().orderBy("housing", "y")

# Pivot
house_pivot = house_counts.groupBy("housing").pivot("y").sum("count").fillna(0)

# To pandas
house_pd = house_pivot.toPandas()
house_pd["total"] = house_pd["yes"] + house_pd["no"]
house_pd["subscription_rate"] = house_pd["yes"] / house_pd["total"]

# Plot
plt.figure(figsize=(6, 4))
plt.bar(house_pd["housing"], house_pd["subscription_rate"], color="purple")
plt.xlabel("Housing Loan")
plt.ylabel("Subscription Rate")
plt.title("Subscription Rate by Housing Loan")
plt.grid(axis='y')
plt.tight_layout()
plt.show()


##### What is/are the insight(s) found from the chart?

Clients without a housing loan have a significantly higher subscription rate than those with a housing loan.

#### Chart - 5

In [None]:
# Chart - 5 visualization code
# Compute average balance by 'y'
avg_balance = df.groupBy("y").agg(avg("balance").alias("avg_balance")).orderBy("y")

# To pandas
avg_balance_pd = avg_balance.toPandas()

# Plot
plt.figure(figsize=(6, 4))
plt.bar(avg_balance_pd["y"], avg_balance_pd["avg_balance"], color="teal")
plt.xlabel("Subscription Status")
plt.ylabel("Average Account Balance")
plt.title("Average Balance vs Subscription")
plt.grid(axis='y')
plt.tight_layout()
plt.show()


##### What is/are the insight(s) found from the chart?

Clients who subscribed to the term deposit tend to have higher average account balances compared to those who did not.

## ***5. ML Model Implementation***

In [None]:
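# Create (or reuse) the Spark session.
# If a session is already running, getOrCreate() returns it: runtime SQL settings such as
# spark.sql.shuffle.partitions are applied, but static settings such as spark.driver.memory
# only take effect if they are set before the first session starts.
spark = SparkSession.builder \
    .appName("BankTermDepositPrediction") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()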

1. Loads a banking dataset and infers schema automatically.
2. Converts categorical columns and target variable into numerical indices using `StringIndexer`.
3. Combines numerical and indexed categorical features into a single feature vector.
4. Builds and trains a Random Forest classifier within a PySpark `Pipeline`, then evaluates it using AUC.
5. Saves the trained model and stops the Spark session.
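Step 2 relies on how `StringIndexer` assigns indices. With its default `stringOrderType="frequencyDesc"`, the most frequent value gets index 0.0, and ties are broken alphabetically. Assuming `"no"` is the majority class (as Chart 1 indicates), `"yes"` becomes 1.0, which is the class `BinaryClassificationEvaluator` treats as positive. The plain-Python imitation below is only an illustration of that ordering rule, not Spark's implementation; the sample values are made up.

```python
# Imitation of StringIndexer's default frequencyDesc ordering (illustration only).
from collections import Counter


def frequency_desc_index(values):
    """Most frequent value -> 0.0; equal frequencies sorted alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


# Made-up target column with "no" as the majority class
y = ["no", "no", "yes", "no", "yes", "no"]
print(frequency_desc_index(y))  # {'no': 0.0, 'yes': 1.0}

# Made-up feature column with a frequency tie between "primary" and "secondary"
education = ["tertiary", "secondary", "primary", "tertiary"]
print(frequency_desc_index(education))  # {'tertiary': 0.0, 'primary': 1.0, 'secondary': 2.0}
```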


In [None]:
# Load the dataset
data_path = "/content/bank_data.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Handle categorical variables using StringIndexer
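categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
# handleInvalid="keep" maps categories unseen during training to an extra index instead of failing on the test set
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="keep") for c in categorical_columns]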

# Rename the target column to 'label'
data = data.withColumnRenamed("y", "label")

# Convert the label column to numerical values
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = label_indexer.fit(data).transform(data)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["age", "balance", "day", "duration", "campaign", "pdays", "previous"] + [c + "_index" for c in categorical_columns],
    outputCol="features"
)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")


# Save the trained model, overwriting if the path already exists
model.write().overwrite().save("/content/ml_model_trained")

# Stop the Spark session
spark.stop()
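The reported `areaUnderROC` can be read as a ranking probability: the chance that a randomly chosen positive ("yes") row receives a higher score than a randomly chosen negative row, with ties counted as half. The plain-Python sketch below computes that pairwise (Mann-Whitney) form on made-up labels and scores. Spark computes the area from the ROC curve and may bin thresholds on large data, so its value can differ slightly from this exact pairwise count.

```python
# Exact pairwise AUC: P(score_pos > score_neg) + 0.5 * P(tie). Made-up data.

def auc_pairwise(labels, scores):
    """O(P * N) AUC for clarity; labels are 1 (positive) or 0 (negative)."""
    pos = [s for lab, s in zip(labels, scores) if lab == 1]
    neg = [s for lab, s in zip(labels, scores) if lab == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # a tie counts as half a correct ranking
    return wins / (len(pos) * len(neg))


labels = [1, 0, 1, 0, 0]            # 1 = "yes" (label_index 1.0)
scores = [0.9, 0.3, 0.4, 0.4, 0.1]  # predicted probability of "yes"
print(auc_pairwise(labels, scores))  # 5.5 / 6, about 0.917
```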


# ***Real-Time Transaction Analysis:***

In [None]:
import os
from datetime import datetime, timedelta

import pandas as pd

# Load the original CSV with pandas (named pdf so it does not overwrite the Spark DataFrame `df`)
pdf = pd.read_csv("/content/bank_data.csv")

# Add synthetic event timestamps, one second apart, to simulate a transaction stream
base_time = datetime.now()
pdf["event_time"] = [base_time + timedelta(seconds=i) for i in range(len(pdf))]

# Create the input directory that the streaming query will watch
stream_dir = "/content/stream_input"
os.makedirs(stream_dir, exist_ok=True)

# Save each row as a separate CSV file (one simulated transaction per file)
for i, row in pdf.iterrows():
    pd.DataFrame([row]).to_csv(f"{stream_dir}/txn_{i:05d}.csv", index=False)


## **Window Operations and Trend Analysis:**



1. **Starts a Spark Structured Streaming Session**:
   Initializes a SparkSession configured to run locally with all CPU cores, naming the application "StreamingBankAnalysis".

2. **Defines a Schema for Incoming CSV Data**:
   Sets a structured schema for a dataset that includes customer demographic and banking details (like age, job, balance, etc.), including a TimestampType field called event_time to enable time-based operations.

3. **Reads Streaming CSV Data**:
   Continuously reads CSV files from the `/content/stream_input` directory as a streaming DataFrame using the predefined schema and with headers enabled.

4. **Performs Time-Based Aggregation**:
   Groups the incoming streaming data by a 10-second time window and the job field, then counts how many records fall into each (window, job) combination.

5. **Outputs Results to Console in Real-Time**:
   Writes the aggregated result to the console in *complete* output mode, meaning the full result table is printed every time it's updated. It continues running indefinitely until manually stopped.
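Step 4 uses tumbling (non-overlapping) windows. By default Spark aligns them to the Unix epoch, so an event at time $t$ falls in the window starting at $t - (t \bmod 10\,\text{s})$. The plain-Python sketch below mirrors `groupBy(window("event_time", "10 seconds"), "job").count()` on a handful of made-up events; it illustrates the bucketing rule only and is not Spark's streaming engine.

```python
# Tumbling-window counting by (window, job), aligned to the Unix epoch.
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=10)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts):
    """Start of the 10-second window containing ts."""
    return ts - (ts - EPOCH) % WINDOW


def count_by_window_and_job(events):
    """events: iterable of (timestamp, job) -> Counter keyed by (start, end, job)."""
    counts = Counter()
    for ts, job in events:
        start = window_start(ts)
        counts[(start, start + WINDOW, job)] += 1
    return counts


base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
events = [(base + timedelta(seconds=s), job) for s, job in
          [(0, "admin."), (3, "admin."), (9, "technician"), (10, "admin."), (14, "admin.")]]

for (start, end, job), n in sorted(count_by_window_and_job(events).items()):
    print(start.time(), end.time(), job, n)
# 12:00:00 12:00:10 admin. 2
# 12:00:00 12:00:10 technician 1
# 12:00:10 12:00:20 admin. 2
```

Note that the event at second 10 opens a new window: window start times are inclusive and end times exclusive.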



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Start Spark session
spark = SparkSession.builder \
    .appName("StreamingBankAnalysis") \
    .master("local[*]") \
    .getOrCreate()

# Schema
schema = StructType([
    StructField("age", IntegerType()),
    StructField("job", StringType()),
    StructField("marital", StringType()),
    StructField("education", StringType()),
    StructField("default", StringType()),
    StructField("balance", IntegerType()),
    StructField("housing", StringType()),
    StructField("loan", StringType()),
    StructField("contact", StringType()),
    StructField("day", IntegerType()),
    StructField("month", StringType()),
    StructField("duration", IntegerType()),
    StructField("campaign", IntegerType()),
    StructField("pdays", IntegerType()),
    StructField("previous", IntegerType()),
    StructField("poutcome", StringType()),
    StructField("y", StringType()),
    StructField("event_time", TimestampType()),
])

# Stream read from /content/stream_input
df_stream = spark.readStream \
    .schema(schema) \
    .option("header", True) \
    .csv("/content/stream_input")

# Simple aggregation: count by job per 10-second window
agg = df_stream.groupBy(
    window("event_time", "10 seconds"),
    "job"
).count()

query = agg.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start()

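# Blocks this cell until the query is stopped (e.g. by interrupting the kernel or calling query.stop())
query.awaitTermination()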


# ***Efficient Data Handling through Data Parallelism***

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("BankingDataParallelism") \
    .getOrCreate()

# Load CSV data in parallel
df = spark.read.csv("/content/bank_data.csv", header=True, inferSchema=True)


## Parallel Grouping and Aggregation

In [None]:
# Parallel computation of average balance by job type
df.groupBy("job") \
  .avg("balance") \
  .show()
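An average parallelizes because it can be split into mergeable pieces: each partition computes a partial `(sum, count)` per key, the partials are merged after the shuffle, and only then is the division done. Running `df.groupBy("job").avg("balance").explain()` should show this as a partial aggregation before the exchange and a final one after it. The plain-Python sketch below reproduces the idea on made-up partitions; it is an illustration, not Spark's code.

```python
# Partial (sum, count) aggregation per partition, then merge and divide.
from collections import defaultdict


def partial_aggregate(partition):
    """Per-partition partials: key -> [running total, row count]."""
    acc = defaultdict(lambda: [0, 0])
    for job, balance in partition:
        acc[job][0] += balance
        acc[job][1] += 1
    return acc


def merge_and_average(partials):
    """Merge partials from all partitions, then compute total / count."""
    merged = defaultdict(lambda: [0, 0])
    for acc in partials:
        for job, (total, n) in acc.items():
            merged[job][0] += total
            merged[job][1] += n
    return {job: total / n for job, (total, n) in merged.items()}


# Made-up (job, balance) rows spread over three partitions
partitions = [
    [("admin.", 1000), ("technician", 500)],
    [("admin.", 3000), ("admin.", 5000)],
    [("technician", 1500)],
]

print(merge_and_average(partial_aggregate(p) for p in partitions))
# {'admin.': 3000.0, 'technician': 1000.0}
```

Carrying `(sum, count)` rather than per-partition averages is what keeps the result exact: averaging the partition averages for `admin.` would give (1000 + 4000) / 2 = 2500 instead of 3000.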


## Data Transformation in Parallel

In [None]:
# Add new feature in parallel
df = df.withColumn("balance_k", df["balance"] / 1000)


## Repartitioning for Parallel Processing

In [None]:
# Repartition to utilize more cores/nodes
df = df.repartition(8)  # or spark.sparkContext.defaultParallelism
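Calling `repartition(8)` without column arguments spreads rows evenly across partitions in a roughly round-robin fashion, which keeps the eight tasks similar in size. (Passing columns, as in `repartition(8, "job")`, hash-partitions by key instead and can produce uneven partitions when some job types dominate.) The plain-Python sketch below shows the even spread for a simple round-robin assignment; Spark's actual implementation adds details such as a randomized starting partition, so treat this as an approximation.

```python
# Simple round-robin assignment of rows to partitions (approximation of repartition(n)).

def round_robin(rows, num_partitions):
    """Row i goes to partition i % num_partitions."""
    parts = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        parts[i % num_partitions].append(row)
    return parts


rows = list(range(20))  # stand-in for 20 records
parts = round_robin(rows, 8)
print([len(p) for p in parts])  # [3, 3, 3, 3, 2, 2, 2, 2]
```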


In [None]:
df.show()
# Confirm the number of partitions after repartition(8)
print(df.rdd.getNumPartitions())

### ***Hurrah! You have successfully completed your Machine Learning Capstone Project!***