<a href="https://colab.research.google.com/github/Ayushx29/Financial-Forecasting-Frontier-Distributed-Machine-Learning/blob/main/Financial_Forecasting_Frontier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Project Name**    - Financial Forecasting Frontier : Distributed Machine Learning




##### **Project Type**    - EDA/Classification
##### **Contribution**    - Individual
##### **Team Member -** Ayush Bhagat


# **Project Summary -**

The project titled "Financial Forecasting Frontier: Distributed ML" is an end-to-end data analytics and machine learning initiative focusing on the banking sector, designed to demonstrate the application of distributed computing technologies in handling and analyzing large-scale banking datasets. The project primarily leverages Apache Spark and PySpark to execute exploratory data analysis (EDA) and build classification models on a dataset sourced from real-world banking marketing campaigns.

At the core of the project is the bank.csv dataset, which includes detailed information about customer demographics, campaign outreach efforts, financial attributes, and the final outcome of marketing efforts — whether a client subscribed to a term deposit. This dataset simulates a common challenge faced by financial institutions: understanding customer behavior and predicting responses to banking products using data-driven methods.

The notebook starts by setting up the Spark environment using SparkSession. This initialization includes configurations for memory allocation and partitioning, which are crucial for enabling efficient distributed computation. The use of PySpark allows seamless scaling of data processing across multiple cores or nodes, simulating the environment of a production-level distributed system within the financial industry.

Once the Spark environment is initialized, the notebook proceeds to load the dataset into a Spark DataFrame. The initial phase includes a schema check and data overview. This step ensures that the data is loaded correctly and helps in identifying data types, null values, and general statistics. Operations such as describe(), select(), and groupBy() are used to derive insights into features like customer age distribution, job type prevalence, and loan statuses. This phase forms the Exploratory Data Analysis (EDA) part of the project, which aims to uncover hidden patterns, detect outliers, and establish initial relationships between input features and the target variable y (whether the customer subscribed to the term deposit).

Following EDA, the notebook focuses on data preprocessing — a crucial step in machine learning pipelines. Categorical variables like job, marital status, and education are transformed using techniques such as string indexing and one-hot encoding, converting them into numerical formats suitable for modeling. Null handling, feature engineering, and data normalization are also addressed to ensure clean and consistent input to the machine learning models.

The core of the project lies in building and evaluating classification models using Spark ML. The target variable y is binary, making it suitable for binary classification algorithms such as Logistic Regression, Random Forest Classifier, and Decision Trees. These models are trained to predict whether a client is likely to subscribe to a term deposit based on their personal and financial attributes. The notebook demonstrates the use of pipelines in Spark ML, where preprocessing steps and models are chained together for streamlined execution.

Performance evaluation is conducted using metrics like accuracy, precision, recall, and area under the ROC curve (AUC). These metrics help determine the efficacy of the models in predicting customer behavior and identifying key decision factors. Confusion matrices and classification reports further aid in interpreting model strengths and areas of improvement.

Although the primary focus is on batch processing and supervised learning, the notebook architecture leaves room for extension into real-time analytics using Spark Streaming, which would allow banks to monitor transactions in real time for fraud detection or customer engagement. Additionally, data parallelism techniques employed in Spark ensure that the solution remains scalable as data volume grows — a critical requirement in modern banking systems.

In conclusion, this notebook serves as a comprehensive demonstration of how distributed machine learning techniques can be effectively applied to banking data. It mirrors real-world applications where banks utilize big data technologies to enhance marketing efforts, improve customer understanding, and drive business decisions. Through the integration of EDA, preprocessing, classification modeling, and distributed processing, the project provides valuable insights into the practical implementation of scalable ML pipelines in the financial services domain.

# **GitHub Link -**

https://github.com/Ayushx29/Financial-Forecasting-Frontier-Distributed-Machine-Learning

# **Problem Statement**


**In the modern banking sector, the ability to efficiently process, analyze, and draw insights from vast volumes of data is crucial for improving customer engagement, managing risk, and making informed business decisions. However, banks face significant challenges due to the sheer volume, variety, and velocity of data, which traditional data processing methods cannot handle effectively—especially for real-time analysis.**

**This project addresses the problem of predicting whether a customer will subscribe to a term deposit by applying distributed machine learning techniques on large-scale banking data. Using Apache Spark and the bank.csv dataset, the goal is to build a scalable pipeline that performs data exploration, transformation, and classification. The project also explores real-time data processing using Spark Streaming, simulating how banks can make timely decisions based on live transaction data.**

# **General Guidelines** : -  

1.   Well-structured, formatted, and commented code is required.
2.   Exception Handling, Production Grade Code & Deployment Ready Code will be a plus. Those students will be awarded some additional credits.
     
     These additional credits will give an advantage during Star Student selection.

     [Note: Deployment Ready Code means that the whole .ipynb notebook can be executed in one go without a single error being logged.]

3.   Each and every logic should have proper comments.
4. You may add as many charts as you want. For each chart, use the following format and answer the questions below.
        

```
# Chart visualization code
```
            

*   Why did you pick the specific chart?
*   What is/are the insight(s) found from the chart?
*   Will the gained insights help create a positive business impact? Are there any insights that lead to negative growth? Justify with a specific reason.

5. You have to create at least 15 logical & meaningful charts having important insights.


[ Hints: Do the visualization in a structured way, following the "UBM" rule.

U - Univariate Analysis,

B - Bivariate Analysis (Numerical - Categorical, Numerical - Numerical, Categorical - Categorical)

M - Multivariate Analysis
 ]





6. You may add more ML algorithms for model creation. For each algorithm, answer the following:


*   Explain the ML model used and its performance using an evaluation metric score chart.


*   Cross-Validation & Hyperparameter Tuning

*   Have you seen any improvement? Note down the improvement with an updated evaluation metric score chart.

*   Explain what each evaluation metric indicates for the business and the business impact of the ML model used.




















# ***Let's Begin !***

# ***Task 1: Efficient Data Handling through Data Parallelism***

### Import Libraries

In [3]:
# Import Libraries
from pyspark.sql import SparkSession

In [4]:
# Create Spark session
spark = SparkSession.builder \
    .appName("DataParallelismBank") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Dataset Loading

In [6]:
# Load Dataset
data_path = "/content/drive/MyDrive/AlmaBetter Masters Projects/Financial Forecasting Frontier /bank.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

### Dataset First View

In [8]:
# Dataset First Look
df.show(10)

+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar|married|secondary|     no|      0|    yes|  no| unknown| 

### Data Preparation and Partitioning:


In [9]:
# Partition the dataset by the 'balance' column
partitioned_df = df.repartition(4, "balance")

# Verify the number of partitions
print(f"Number of partitions: {partitioned_df.rdd.getNumPartitions()}")

# Show the first few rows of one partition to verify
partitioned_df.show(5)

Number of partitions: 4
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 36|self-employed|married| tertiary|     no|    307|    yes|  no|cellular| 14|  may|     341|       1|  330|       2|   other| no|
| 41| entrepreneur|married| tertiary|     no|    221|    yes|  no| unknown| 14|  may|      57|       2|   -1|       0| unknown| no|
| 56|   technician|married|secondary|     no|   4073|     no|  no|cellular| 27|  aug|     239|       5|   -1|       0| unknown| no|
| 37|       admin.| single| tertiary|     no|   2317

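We applied repartition() on the balance column to hash-partition the data into 4 partitions, so that downstream work can be split into 4 parallel tasks. (The aggregations in the next section run on df; partitioned_df illustrates the partitioning step.)

**Why We Used repartition() on Balance:**

*   Workload Distribution: balance has many distinct values, so hashing on it spreads records across the partitions. Rows with the same balance always land in the same partition, so a very frequent value can still make one partition larger than the others; repartition(4) without a column (round-robin) gives the most even partition sizes.

*   Parallelism: Spark processes each partition as a separate task, so similarly sized partitions let the tasks run concurrently and finish at about the same time.
*   Scalability: the same code scales to larger datasets by raising the number of partitions; for a file of this size (4,521 rows), the speed-up is small.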
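The partitioning behavior described above can be illustrated with a small plain-Python sketch. It is not Spark code: Python's built-in hash() stands in for the Murmur3 hash Spark uses, so the concrete partition numbers differ, and the toy balances are made up. What it shows is that hashing on a key keeps equal keys together (one frequent value fills one partition), while round-robin assignment evens out the sizes. In Spark itself, `partitioned_df.groupBy(spark_partition_id()).count().show()`, with spark_partition_id imported from pyspark.sql.functions, reports the actual row count per partition.

```python
from collections import Counter


def hash_partition_sizes(keys, num_partitions):
    """Count rows per partition when each row is assigned by hash(key) % num_partitions.

    Python's hash() is a stand-in for Spark's Murmur3 hash; only the property
    "equal keys always share a partition" carries over.
    """
    sizes = Counter(hash(key) % num_partitions for key in keys)
    # Report every partition, including empty ones
    return [sizes.get(p, 0) for p in range(num_partitions)]


def round_robin_sizes(num_rows, num_partitions):
    """Count rows per partition when rows are dealt out in turn, ignoring any key."""
    return [len(range(p, num_rows, num_partitions)) for p in range(num_partitions)]


# Toy balances with one very frequent value (0), purely for illustration
balances = [0, 0, 0, 0, 1, 2, 3, 5]
print(hash_partition_sizes(balances, 4))    # -> [4, 2, 1, 1]
print(round_robin_sizes(len(balances), 4))  # -> [2, 2, 2, 2]
```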




## **Data Analysis and Processing in Parallel:**
Identify and calculate the average balance for each job category in the "bank.csv" dataset. Use parallel processing to perform this calculation. Describe your approach and the results.

In [10]:
from pyspark.sql.functions import avg, sum as spark_sum

# Calculate average balance for each job category
avg_balance_per_job = df.groupBy("job").agg(avg("balance").alias("avg_balance"))
avg_balance_per_job.show()

+-------------+------------------+
|          job|       avg_balance|
+-------------+------------------+
|   unemployed|       1089.421875|
|     services|1103.9568345323742|
|      student|1543.8214285714287|
|      unknown|1501.7105263157894|
|   management|1766.9287925696594|
|  blue-collar| 1085.161733615222|
|self-employed|1392.4098360655737|
|       admin.|  1226.73640167364|
|   technician|     1330.99609375|
|    housemaid|2083.8035714285716|
| entrepreneur|          1645.125|
|      retired| 2319.191304347826|
+-------------+------------------+



Perform a parallel operation to identify the top 5 age groups in the dataset that have the highest loan amounts. Explain your methodology and present your findings.

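Identify the Top 5 Age Groups by Total Balance:

The dataset has no loan-amount column: loan is a yes/no flag indicating whether the client has a personal loan. As a proxy, we treat each distinct age as a group, use groupBy and agg to sum the account balance for each age, and keep the 5 ages with the highest totals.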

In [11]:
# Calculate the total account balance for each age
# 'loan' is only a yes/no flag in this dataset, so 'balance' is summed per age instead of a loan amount
total_balance_per_age = df.groupBy("age").agg(spark_sum("balance").alias("total_balance"))

# Identify the top 5 ages with the highest total balance
top_5_age_groups = total_balance_per_age.orderBy("total_balance", ascending=False).limit(5)
top_5_age_groups.show()

# Stop the Spark session
spark.stop()

+---+-------------+
|age|total_balance|
+---+-------------+
| 33|       287447|
| 32|       281467|
| 38|       273320|
| 34|       256765|
| 31|       256408|
+---+-------------+



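### **Results:**
**Average Balance for Each Job Category:**

Retired clients have the highest average balance (about 2,319), followed by housemaids (about 2,084) and management staff (about 1,767). Blue-collar (about 1,085), unemployed (about 1,089), and services (about 1,104) clients have the lowest averages.

**Top 5 Age Groups by Total Balance:**

Ages 33, 32, 38, 34, and 31 have the highest total balances, from 287,447 down to 256,408. Because these are sums, they reflect both how many clients have a given age and how large their balances are; a per-age count or average is needed to separate the two effects.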
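Because loan is only a yes/no flag, another reading of "age groups with the highest loans" is to bucket ages and count clients who have a personal loan. The plain-Python sketch below illustrates that logic; the helper names and the 5-year bucket width are assumptions, and the sample rows are the first four rows displayed earlier. In PySpark, the same result would come from a groupBy on a derived age-bucket column with a conditional count.

```python
from collections import Counter


def age_bucket(age, width=5):
    """Map an age to a bucket label such as '30-34' (the width is an assumption)."""
    start = (age // width) * width
    return f"{start}-{start + width - 1}"


def top_age_groups_by_loans(rows, k=5, width=5):
    """Count clients with loan == 'yes' per age bucket and return the k largest."""
    counts = Counter()
    for row in rows:
        bucket = age_bucket(row["age"], width)
        # Adding 0 or 1 keeps buckets with no loans in the result as well
        counts[bucket] += 1 if row["loan"] == "yes" else 0
    # Highest count first; ties are ordered by bucket label so the result is deterministic
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return ranked[:k]


# First four rows displayed earlier, reduced to (age, loan)
sample_rows = [
    {"age": 30, "loan": "no"},
    {"age": 33, "loan": "yes"},
    {"age": 35, "loan": "no"},
    {"age": 30, "loan": "yes"},
]
print(top_age_groups_by_loans(sample_rows))  # -> [('30-34', 2), ('35-39', 0)]
```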

### **Approach:**

**Parallel Processing:**

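With groupBy and agg, Spark first computes partial aggregates (for example, a running sum and count per job) on each partition in parallel, then shuffles these partial results by key and merges them into the final values. On Colab these tasks run on the local CPU cores; on a cluster they run on the executors.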

**Efficiency:**

Because only the small partial results are shuffled rather than every row, the same code stays efficient as the dataset grows.
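The partial-then-merge pattern that Spark uses for avg() can be sketched in plain Python. Each partition produces a (sum, count) pair per job, and the pairs are merged before dividing, which gives the exact average; averaging the per-partition averages would not. The helper names are hypothetical, and the two toy partitions reuse balances from the first rows displayed earlier.

```python
def partial_aggregate(partition):
    """Map side: build a per-job (sum, count) pair from the rows of one partition."""
    partials = {}
    for job, balance in partition:
        total, count = partials.get(job, (0, 0))
        partials[job] = (total + balance, count + 1)
    return partials


def merge_partials(all_partials):
    """Reduce side: add up the (sum, count) pairs per job, then divide once."""
    merged = {}
    for partials in all_partials:
        for job, (total, count) in partials.items():
            m_total, m_count = merged.get(job, (0, 0))
            merged[job] = (m_total + total, m_count + count)
    return {job: total / count for job, (total, count) in merged.items()}


# Two toy partitions of (job, balance) rows
partitions = [
    [("management", 1350), ("services", 4789)],
    [("management", 1476), ("unemployed", 1787)],
]
averages = merge_partials(partial_aggregate(p) for p in partitions)
print(averages)  # -> {'management': 1413.0, 'services': 4789.0, 'unemployed': 1787.0}
```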

## **Model Training on Partitioned Data:**

Choose a classification model to predict whether a client will subscribe to a term deposit (target variable 'y').

Briefly explain why you selected this model.

Partition the dataset into training and testing sets and train your model on these partitions.

Discuss any challenges you faced in parallelizing the training process and how you addressed them.

### **Model Selection:**

We will use a Random Forest Classifier to predict whether a client will subscribe to a term deposit (target variable 'y').

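We chose a Random Forest classifier because it is an ensemble method that combines many decision trees, which usually improves predictive accuracy and reduces overfitting compared with a single tree. Tree-based models in Spark can also use the StringIndexer outputs directly as categorical features, so no one-hot encoding is needed.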

Here are the steps to implement a Random Forest classifier in PySpark:
### **Data Partitioning and Model Training:**

1. Load and Prepare Data: Ensure the data is ready for modeling.

2. Partition the Dataset: Split the data into training and testing sets (80% / 20%, with a fixed seed of 1234).

3. Train the Random Forest Model: Fit the Random Forest model on the training set.

4. Evaluate the Model: Assess the model’s performance on the testing set.

### **Challenges and Solutions:**

1. Parallelization: PySpark inherently parallelizes the training of Random Forests, but ensuring efficient data partitioning and avoiding bottlenecks in resource allocation is key to speeding up the training process.


In [12]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create Spark session
spark = SparkSession.builder \
    .appName("BankTermDepositPrediction") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load the dataset
data_path = "/content/drive/MyDrive/AlmaBetter Masters Projects/Financial Forecasting Frontier /bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)
# Handle categorical variables using StringIndexer
categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]

# Rename the target column to 'label'
data = data.withColumnRenamed("y", "label")

# Convert the label column to numerical values
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = label_indexer.fit(data).transform(data)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["age", "balance", "day", "duration", "campaign", "pdays", "previous"] + [col+"_index" for col in categorical_columns],
    outputCol="features"
)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")


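# Save the fitted pipeline model to its own subfolder ("rf_pipeline_model" is a name chosen here);
# overwrite() deletes whatever already exists at the target path, so it should not point at the project folder itself
model.write().overwrite().save("/content/drive/MyDrive/AlmaBetter Masters Projects/Financial Forecasting Frontier/rf_pipeline_model")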

# Stop the Spark session
spark.stop()

Test AUC: 0.8745620915032659


### **Summary:**

1. **Data Loading and Preparation:**

**Spark Session:**

We set up a Spark session with 4 shuffle partitions and 2 GB of driver and executor memory, sized for a small dataset processed in parallel.

**Load Dataset:**

The dataset is loaded into a Spark DataFrame.

**Handle Categorical Variables:**

We use StringIndexer to convert categorical variables to numerical values.

**Rename Target Column:**

The target variable 'y' is renamed to 'label'.

**Convert Label Column:**

The label column is converted to numerical values using StringIndexer.

**Assemble Features:**

We use VectorAssembler to combine all features into a single vector column.

2. **Training the Model:**

**Initialize and Train the Model:**

A Random Forest classifier is initialized and trained using the pipeline.

**Evaluate the Model:**

The model is evaluated on the test set using the area under the ROC curve (AUC), giving a test AUC of about 0.875.

**Save the Model:**

The trained model is saved for future use.
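The AUC reported above can be read as a ranking probability: it is the chance that a randomly chosen subscriber receives a higher score than a randomly chosen non-subscriber, with ties counting as one half. A test AUC of about 0.87 therefore means the model orders such a pair correctly roughly 87% of the time. The plain-Python function below computes AUC directly from that definition on toy scores; it is a sketch for intuition, not how BinaryClassificationEvaluator computes it internally (Spark builds the ROC curve from the rawPrediction column).

```python
def roc_auc(labels, scores):
    """AUC as the probability that a random positive outscores a random negative.

    Positive/negative ties count as half a win, which equals the area under the
    ROC curve traced by sweeping the decision threshold.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy labels (1 = subscribed) and model scores
labels = [1, 0, 1, 0, 0]
scores = [0.9, 0.8, 0.7, 0.3, 0.1]
# 5 of the 6 positive/negative pairs are ordered correctly, so AUC = 5/6
print(roc_auc(labels, scores))
```

The pairwise loop is quadratic and only suitable for small examples; it is kept this way because it mirrors the definition directly.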

### **Challenges and Solutions:**

**Handling Categorical Variables:**

The categorical variables were handled using StringIndexer to convert them to numerical values required for the Random Forest algorithm.

**Parallel Processing:**

Spark MLlib trains the Random Forest in a data-parallel way: each partition computes split statistics for its own rows, and these statistics are aggregated to choose the best splits, so training uses all available cores without extra code.

**Resource Management:**

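The driver and executor memory were set to 2 GB and the shuffle partitions to 4, which suits a small dataset on a single machine. In local mode (as on Colab), tasks run inside the driver process, so the driver memory setting is the one that matters most there.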
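StringIndexer assigns indices by label frequency by default (stringOrderType='frequencyDesc'): the most frequent value gets 0.0, the next 1.0, and so on, with ties broken alphabetically. For the target, this means the majority answer 'no' becomes 0.0 and 'yes' becomes 1.0, so label_index = 1.0 marks subscribers. The plain-Python function below mimics that ordering rule for illustration; it is not Spark's implementation.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct string to a float index, most frequent first.

    Equal frequencies are broken alphabetically, following the ordering rule
    documented for StringIndexer with stringOrderType='frequencyDesc'.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(index) for index, value in enumerate(ordered)}


# Toy target column in which 'no' is the majority answer
mapping = frequency_desc_index(["no", "no", "yes", "no"])
print(mapping)                              # -> {'no': 0.0, 'yes': 1.0}
print([mapping[v] for v in ["yes", "no"]])  # -> [1.0, 0.0]
```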

## **Resource Monitoring and Management:**
Implement resource monitoring during data processing and model training. What observations did you make regarding CPU and memory usage?



We use **Spark event logging** to capture detailed information about our Spark application's execution.


### **Resource Monitoring Observations**

Spark Event Logging is a powerful feature that enables us to capture detailed logs about our Spark applications, including job execution details, stage breakdowns, task attempts, and resource usage. These logs are extremely useful for debugging, performance tuning, and understanding the behavior of our Spark applications.

In [13]:
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Path to the event log directory
event_log_dir = "/content/drive/MyDrive/eventlog"

# Create the event log directory if it doesn't exist (no error if it already does)
os.makedirs(event_log_dir, exist_ok=True)

# Create Spark session with resource monitoring enabled
spark = SparkSession.builder \
    .appName("BankTermDepositPredictionlog") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", event_log_dir) \
    .getOrCreate()

# Load the dataset
data_path = "/content/drive/MyDrive/AlmaBetter Masters Projects/Financial Forecasting Frontier /bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Handle categorical variables using StringIndexer
categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]

# Rename the target column to 'label'
data = data.withColumnRenamed("y", "label")

# Convert the label column to numerical values
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = label_indexer.fit(data).transform(data)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["age", "balance", "day", "duration", "campaign", "pdays", "previous"] + [col+"_index" for col in categorical_columns],
    outputCol="features"
)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")


# Stop the Spark session
spark.stop()


Test AUC: 0.8745620915032659


### **Summary:**
1. **Event Log Directory Creation:** The code creates the event log directory if it does not already exist.
2. **Spark Session Configuration:** spark.eventLog.enabled is set to true and spark.eventLog.dir points to that directory, so Spark writes an event log file for the application there.
3. **Data Preparation and Model Training:** The rest of the code loads the dataset, processes it, trains a Random Forest classifier, and evaluates the model. The pipeline and random seed are unchanged, so the test AUC is the same as before (about 0.875).
4. **Stopping the Spark Session:** The Spark session is stopped after training and evaluation, which also closes the application's event log.
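The event log directory only collects raw logs; to turn them into CPU and memory observations, the task-level metrics can be aggregated. The sketch below is a plain-Python illustration under assumptions: it expects Spark's uncompressed JSON event-log format (one JSON event per line), and the event and metric names it reads ('SparkListenerTaskEnd', 'Task Metrics', 'Executor Run Time', 'Executor CPU Time', 'JVM GC Time', 'Peak Execution Memory') should be checked against an actual file in event_log_dir. The sample lines are synthetic and exist only to exercise the function.

```python
import json


def summarize_task_metrics(event_log_lines):
    """Aggregate per-task CPU and memory metrics from Spark event-log lines.

    Assumed format: one JSON object per line; finished tasks appear as
    'SparkListenerTaskEnd' events with a 'Task Metrics' object.
    """
    summary = {
        "tasks": 0,
        "run_time_ms": 0,
        "cpu_time_ms": 0.0,
        "gc_time_ms": 0,
        "peak_execution_memory_bytes": 0,  # largest single-task peak, not a sum
    }
    for line in event_log_lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue
        metrics = event.get("Task Metrics") or {}
        summary["tasks"] += 1
        summary["run_time_ms"] += metrics.get("Executor Run Time", 0)
        # CPU time is assumed to be recorded in nanoseconds; convert to milliseconds
        summary["cpu_time_ms"] += metrics.get("Executor CPU Time", 0) / 1e6
        summary["gc_time_ms"] += metrics.get("JVM GC Time", 0)
        summary["peak_execution_memory_bytes"] = max(
            summary["peak_execution_memory_bytes"], metrics.get("Peak Execution Memory", 0)
        )
    return summary


# Synthetic event-log lines, for illustration only
sample_lines = [
    '{"Event": "SparkListenerApplicationStart"}',
    '{"Event": "SparkListenerTaskEnd", "Task Metrics": {"Executor Run Time": 120, '
    '"Executor CPU Time": 90000000, "JVM GC Time": 5, "Peak Execution Memory": 1048576}}',
    '{"Event": "SparkListenerTaskEnd", "Task Metrics": {"Executor Run Time": 80, '
    '"Executor CPU Time": 60000000, "JVM GC Time": 0, "Peak Execution Memory": 524288}}',
]
print(summarize_task_metrics(sample_lines))
```

To use it on real logs, open one of the files written to event_log_dir and pass the file object to summarize_task_metrics. A CPU time close to the run time suggests CPU-bound tasks, a much lower CPU time points to time spent waiting (for example on I/O or shuffles), and a large GC time relative to run time suggests memory pressure.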

## **Task Management and Scheduling:**

Manage multiple parallel tasks, such as different preprocessing tasks. How did you ensure the effective management of these tasks?


Managing multiple parallel tasks, such as different preprocessing tasks, is essential in data processing workflows, especially when working with large datasets or complex pipelines. Effective management of these tasks involves optimizing resource usage, minimizing task execution time, and ensuring that tasks do not interfere with each other.




In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create Spark session with optimized configurations
spark = SparkSession.builder \
    .appName("ParallelPreprocessing") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load the dataset
data_path = "/content/drive/MyDrive/AlmaBetter Masters Projects/Financial Forecasting Frontier /bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Preprocessing tasks as separate functions
def index_categorical_columns(df):
    categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
    indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]
    pipeline = Pipeline(stages=indexers)
    return pipeline.fit(df).transform(df)

def assemble_features(df):
    feature_columns = ["age", "balance", "day", "duration", "campaign", "pdays", "previous",
                       "job_index", "marital_index", "education_index", "default_index",
                       "housing_index", "loan_index", "contact_index", "month_index", "poutcome_index"]
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    return assembler.transform(df)

def rename_label_column(df):
    return df.withColumnRenamed("y", "label")

def index_label_column(df):
    label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
    return label_indexer.fit(df).transform(df)

# Apply the preprocessing steps in sequence; Spark runs each step's work in parallel across partitions
data = rename_label_column(data)
data = index_categorical_columns(data)
data = assemble_features(data)
data = index_label_column(data)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=[rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")

# Stop the Spark session
spark.stop()

Test AUC: 0.8745620915032659


### **Summary:**


Here’s how we can manage multiple parallel tasks effectively in Spark:

**Parallelizing Tasks in Spark:**

Spark inherently supports parallel processing, but ensuring that multiple tasks run efficiently in parallel requires careful management of resources and task dependencies.
Here, the Spark session is configured with explicit memory (2 GB for the driver and executors) and shuffle-partition (4) settings sized for a small, single-machine run.

**Managing Resources:**

Managing resources effectively is crucial when running multiple tasks in parallel, especially in a distributed environment like Spark.

**Data Partitioning and Task Parallelism:**

Effective data partitioning is essential to ensure that tasks run in parallel without contention.

**Preprocessing Functions:**

index_categorical_columns(df): Indexes categorical columns using StringIndexer.

assemble_features(df): Assembles feature columns into a feature vector.

rename_label_column(df): Renames the target column to "label".

index_label_column(df): Indexes the label column.

**Pipeline Construction:**

A Pipeline object wraps the Random Forest classifier. In this cell, the preprocessing is done beforehand by the functions above, so the Pipeline contains only the model stage.


**Model Training and Evaluation:**

The Random Forest model is trained and evaluated using the preprocessed data.

**Resource Monitoring and Management:**

The code includes configurations for resource allocation, such as setting the number of shuffle partitions and memory allocations for the driver and executors.

**Monitoring and Managing Task Execution:**

Monitor the progress of parallel tasks using the Spark UI, where you can see the execution of jobs and stages in real-time. This helps in identifying any bottlenecks or imbalances in task execution.

**Monitoring Tools:**

Use Spark UI to monitor job progress, task execution, and resource usage.
Use tools like Ganglia or Prometheus for detailed resource monitoring.

**Tuning Configurations:**

Adjust spark.sql.shuffle.partitions based on the size of the data and the cluster resources.

Allocate sufficient memory to the executors and the driver based on the data size and the complexity of the transformations.

Together, these settings let Spark run the work inside each preprocessing step in parallel across partitions; the steps themselves are applied one after another in the driver code.
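The four preprocessing functions above are called one after another, but not all of them depend on each other. The hypothetical helper execution_waves sketches, in plain Python, how a dependency map can be grouped into "waves" of steps that could run concurrently (a level-by-level topological sort). In Spark, independent jobs can be submitted from separate driver threads, and setting spark.scheduler.mode to FAIR lets them share resources; for a dataset of this size, the gain would likely be small.

```python
def execution_waves(dependencies):
    """Group steps into waves; steps within one wave do not depend on each other.

    `dependencies` maps each step name to the set of steps that must finish first.
    """
    remaining = {step: set(deps) for step, deps in dependencies.items()}
    done = set()
    waves = []
    while remaining:
        # A step is ready once all of its prerequisites are done
        ready = sorted(step for step, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError(
                "Unsatisfiable dependencies (cycle or unknown step): " + ", ".join(sorted(remaining))
            )
        waves.append(ready)
        done.update(ready)
        for step in ready:
            del remaining[step]
    return waves


# Dependencies between the preprocessing functions defined in this task
steps = {
    "rename_label_column": set(),
    "index_label_column": {"rename_label_column"},
    "index_categorical_columns": set(),
    "assemble_features": {"index_categorical_columns"},
}
print(execution_waves(steps))
# -> [['index_categorical_columns', 'rename_label_column'], ['assemble_features', 'index_label_column']]
```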

# Task 2: Predictive Modeling for Banking Trends with Pyspark

## **Import Libraries**

In [15]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("PySpark").getOrCreate()

## **Data Loading:**

Loading data is the first step in any data analysis process. PySpark supports various data sources such as CSV files, Parquet files, and databases. Here's an example of loading data from a CSV file:

In [16]:
# Load data from a CSV file
bank_df = spark.read.csv("/content/drive/MyDrive/AlmaBetter Masters Projects/Financial Forecasting Frontier /bank.csv", header=True, inferSchema=True)

## **Data Exploration:**

Data exploration involves understanding the structure and content of the data. PySpark provides methods like show(), head(), and describe() for this purpose:

In [17]:
bank_df.show()

+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar|married|secondary|     no|      0|    yes|  no| unknown| 

In [19]:
bank_df.head(10)

[Row(age=30, job='unemployed', marital='married', education='primary', default='no', balance=1787, housing='no', loan='no', contact='cellular', day=19, month='oct', duration=79, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no'),
 Row(age=33, job='services', marital='married', education='secondary', default='no', balance=4789, housing='yes', loan='yes', contact='cellular', day=11, month='may', duration=220, campaign=1, pdays=339, previous=4, poutcome='failure', y='no'),
 Row(age=35, job='management', marital='single', education='tertiary', default='no', balance=1350, housing='yes', loan='no', contact='cellular', day=16, month='apr', duration=185, campaign=1, pdays=330, previous=1, poutcome='failure', y='no'),
 Row(age=30, job='management', marital='married', education='tertiary', default='no', balance=1476, housing='yes', loan='yes', contact='unknown', day=3, month='jun', duration=199, campaign=4, pdays=-1, previous=0, poutcome='unknown', y='no'),
 Row(age=59, job='blue-coll

In [20]:
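# describe() is lazy: it returns a new DataFrame (printed below as its schema); call .show() to display the statistics
bank_df.describe()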

DataFrame[summary: string, age: string, job: string, marital: string, education: string, default: string, balance: string, housing: string, loan: string, contact: string, day: string, month: string, duration: string, campaign: string, pdays: string, previous: string, poutcome: string, y: string]

In [21]:
bank_df.describe().show()

+-------+------------------+-------+--------+---------+-------+------------------+-------+----+--------+------------------+-----+------------------+------------------+------------------+------------------+--------+----+
|summary|               age|    job| marital|education|default|           balance|housing|loan| contact|               day|month|          duration|          campaign|             pdays|          previous|poutcome|   y|
+-------+------------------+-------+--------+---------+-------+------------------+-------+----+--------+------------------+-----+------------------+------------------+------------------+------------------+--------+----+
|  count|              4521|   4521|    4521|     4521|   4521|              4521|   4521|4521|    4521|              4521| 4521|              4521|              4521|              4521|              4521|    4521|4521|
|   mean| 41.17009511170095|   NULL|    NULL|     NULL|   NULL|1422.6578190665782|   NULL|NULL|    NULL|15.9152842291528

## **Data Preprocessing**

Let's proceed with the data preprocessing steps:

1. Handle Missing Values:

To handle missing values in the dataset, we'll need to identify columns with missing values and decide on an appropriate strategy to handle them. Common strategies include imputation (replacing missing values with a statistical measure such as mean, median, or mode) or dropping rows or columns with missing values depending on the context.

2. Handle Outliers:

Outliers can be handled either by removing them or by transforming them to reduce their impact on the model. Common techniques include winsorization (capping values beyond chosen percentiles, such as the 1st and 99th, at those percentile values) and robust estimators (for example, the median instead of the mean).

3. Convert Categorical Variables:

Categorical variables need to be converted into a numerical format before the machine learning model can process them. In Spark ML this is usually a two-step process: StringIndexer assigns a numerical index to each category, and OneHotEncoder then turns those indices into binary indicator vectors, one position per category.
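To make the two Spark stages concrete, here is a plain-Python sketch (not Spark code) of what they would do to the `job` values printed by `bank_df.head()` and `show()` above. It assumes Spark's defaults as we understand them: `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`) with ties broken alphabetically, and `OneHotEncoder` uses `dropLast=True`, so the last index maps to an all-zero vector. Spark emits sparse vectors; the dense lists here are only for readability. The helper names `string_index` and `one_hot` are hypothetical.

```python
from collections import Counter


def string_index(values):
    """Map each category to a float index, most frequent first,
    breaking frequency ties alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense 0/1 vector for a category index. With drop_last=True the
    last category is encoded as all zeros (one fewer column)."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[int(index)] = 1.0
    return vector


# `job` values from the first five rows shown above
jobs = ["unemployed", "services", "management", "management", "blue-collar"]
mapping = string_index(jobs)
encoded = [one_hot(mapping[job], len(mapping)) for job in jobs]
```

Dropping the last category avoids a perfectly collinear set of indicator columns, which matters for linear models such as logistic regression.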

In [22]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# 1. Handle Missing Values
# Identify columns with missing values
missing_cols = [col_name for col_name in bank_df.columns if bank_df.filter(col(col_name).isNull()).count() > 0]

In [24]:
missing_cols

[]
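`missing_cols` comes back empty because the DataFrame contains no nulls. Judging by the `contact='unknown'` and `poutcome='unknown'` values printed above, missing categorical information in `bank.csv` is recorded as the literal string "unknown". The sketch below counts that placeholder per column in plain Python on four of the rows shown by `bank_df.head(10)`; the function name `count_placeholder` is hypothetical. In PySpark, a single aggregation along the lines of `bank_df.select([F.sum((F.col(c) == "unknown").cast("int")).alias(c) for c, t in bank_df.dtypes if t == "string"])` (with `from pyspark.sql import functions as F`) should produce the counts in one job, whereas the list comprehension above triggers a separate `count()` job for every column.

```python
# Four rows printed by bank_df.head(10) above, trimmed to a few columns
rows = [
    {"job": "unemployed", "education": "primary", "contact": "cellular", "poutcome": "unknown"},
    {"job": "services", "education": "secondary", "contact": "cellular", "poutcome": "failure"},
    {"job": "management", "education": "tertiary", "contact": "cellular", "poutcome": "failure"},
    {"job": "management", "education": "tertiary", "contact": "unknown", "poutcome": "unknown"},
]


def count_placeholder(records, placeholder="unknown"):
    """Count, per column, how many values equal the placeholder string,
    keeping only columns where it occurs at least once."""
    counts = {}
    for record in records:
        for column, value in record.items():
            counts[column] = counts.get(column, 0) + int(value == placeholder)
    return {column: n for column, n in counts.items() if n > 0}


unknown_counts = count_placeholder(rows)
```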

### Check Unique Values for each variable.

In [None]:
# Check Unique Values for each variable.
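One possible way to fill this cell: in PySpark, `bank_df.agg(*[F.countDistinct(c).alias(c) for c in bank_df.columns]).show()` (with `from pyspark.sql import functions as F`) counts the distinct values of every column in one pass. The plain-Python sketch below shows the same idea on four rows copied from the `bank_df.head(10)` output above; the helper name `distinct_values` is hypothetical. With so few rows `y` has a single distinct value, whereas the full dataset contains both `yes` and `no`.

```python
# Four rows from the bank_df.head(10) output above, trimmed to a few columns
rows = [
    {"marital": "married", "education": "primary", "housing": "no", "y": "no"},
    {"marital": "married", "education": "secondary", "housing": "yes", "y": "no"},
    {"marital": "single", "education": "tertiary", "housing": "yes", "y": "no"},
    {"marital": "married", "education": "tertiary", "housing": "yes", "y": "no"},
]


def distinct_values(records):
    """Return the sorted distinct values of every column."""
    columns = records[0].keys()
    return {c: sorted({record[c] for record in records}) for c in columns}


uniques = distinct_values(rows)
n_unique = {column: len(values) for column, values in uniques.items()}
```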

## 3. ***Data Wrangling***

### Data Wrangling Code

In [None]:
# Write your code to make your dataset analysis ready.

### What manipulations have you done, and what insights did you find?

Answer Here.

## ***4. Data Visualization, Storytelling & Experimenting with charts : Understand the relationships between variables***

#### Chart - 1

In [None]:
# Chart - 1 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 2

In [None]:
# Chart - 2 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 3

In [None]:
# Chart - 3 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 4

In [None]:
# Chart - 4 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 5

In [None]:
# Chart - 5 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 6

In [None]:
# Chart - 6 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 7

In [None]:
# Chart - 7 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 8

In [None]:
# Chart - 8 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 9

In [None]:
# Chart - 9 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 10

In [None]:
# Chart - 10 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 11

In [None]:
# Chart - 11 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 12

In [None]:
# Chart - 12 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 13

In [None]:
# Chart - 13 visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

##### 3. Will the gained insights help create a positive business impact?
Are there any insights that lead to negative growth? Justify with a specific reason.

Answer Here

#### Chart - 14 - Correlation Heatmap

In [None]:
# Correlation Heatmap visualization code
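A correlation heatmap is a colour-coded matrix of pairwise Pearson coefficients between the numeric columns (`age`, `balance`, `day`, `duration`, `campaign`, `pdays`, `previous`). The sketch below computes that matrix in plain Python; the nested dict could then be drawn with any plotting library. In Spark, `bank_df.stat.corr("age", "balance")` returns a single Pearson coefficient, and `pyspark.ml.stat.Correlation.corr` works on an assembled vector column. The helper names and the toy columns in the usage lines are hypothetical, not values from `bank.csv`.

```python
import math


def pearson(x, y):
    """Pearson correlation coefficient of two equal-length numeric sequences.
    Raises ZeroDivisionError if either sequence is constant."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(var_x * var_y)


def correlation_matrix(columns):
    """Return {col_a: {col_b: r}} for every pair of named numeric columns."""
    names = list(columns)
    return {a: {b: pearson(columns[a], columns[b]) for b in names} for a in names}


# Toy columns for illustration only
toy = {
    "x": [1, 2, 3, 4, 5],
    "double_x": [2, 4, 6, 8, 10],
    "reverse": [5, 4, 3, 2, 1],
}
matrix = correlation_matrix(toy)
```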

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

#### Chart - 15 - Pair Plot

In [None]:
# Pair Plot visualization code

##### 1. Why did you pick the specific chart?

Answer Here.

##### 2. What is/are the insight(s) found from the chart?

Answer Here

## ***5. Hypothesis Testing***

### Based on your chart experiments, define three hypothetical statements from the dataset. In the next three questions, perform hypothesis testing to reach a final conclusion about each statement using code and statistical testing.

Answer Here.

### Hypothetical Statement - 1

#### 1. State your research hypothesis as a null hypothesis and an alternative hypothesis.

Answer Here.

#### 2. Perform an appropriate statistical test.

In [None]:
# Perform Statistical Test to obtain P-Value
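One candidate for a first hypothesis is whether having a personal loan (`loan`) is associated with subscribing (`y`), with H0 stating that the two are independent. A Pearson chi-square test of independence suits two categorical variables. The plain-Python sketch below handles the 2x2 case without Yates' continuity correction; there the statistic has one degree of freedom, so its p-value reduces to `math.erfc(math.sqrt(stat / 2))`. The counts in the usage line are placeholders, not figures from `bank.csv`; the real table could come from something like `bank_df.groupBy("loan").pivot("y").count()`. Spark ML also provides `pyspark.ml.stat.ChiSquareTest` for indexed feature vectors.

```python
import math


def chi_square_2x2(table):
    """Pearson chi-square test of independence for a 2x2 table of counts,
    without continuity correction. Returns (statistic, p_value)."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    n = sum(row_totals)
    stat = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            expected = row_totals[i] * col_totals[j] / n
            stat += (observed - expected) ** 2 / expected
    # With 1 degree of freedom the statistic is a squared standard normal,
    # so P(chi2 > stat) = P(|Z| > sqrt(stat)) = erfc(sqrt(stat / 2)).
    p_value = math.erfc(math.sqrt(stat / 2))
    return stat, p_value


# Placeholder counts: rows = loan (no, yes), columns = y (no, yes)
stat, p = chi_square_2x2([[30, 10], [10, 30]])
```

A p-value below the chosen significance level (commonly 0.05) would lead to rejecting H0.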

##### Which statistical test did you perform to obtain the p-value?

Answer Here.

##### Why did you choose the specific statistical test?

Answer Here.

### Hypothetical Statement - 2

#### 1. State your research hypothesis as a null hypothesis and an alternative hypothesis.

Answer Here.

#### 2. Perform an appropriate statistical test.

In [None]:
# Perform Statistical Test to obtain P-Value
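A second candidate hypothesis could compare a numeric feature across the two outcome groups, for example H0: mean `balance` is the same for clients with `y = yes` and `y = no`. The sketch below uses a two-sided permutation test for a difference in means, which avoids assuming normality for a skewed variable like `balance`; this is a swapped-in technique, and a t-test would be a common alternative. The group values would come from something like `bank_df.filter(F.col("y") == "yes").select("balance")`; the balances in the usage lines are toy numbers, not data from `bank.csv`. The helper name `permutation_test` is hypothetical.

```python
import random


def permutation_test(group_a, group_b, n_permutations=10_000, seed=0):
    """Two-sided permutation test for a difference in means.
    Returns (observed absolute difference, p-value)."""
    def avg(values):
        return sum(values) / len(values)

    rng = random.Random(seed)
    observed = abs(avg(group_a) - avg(group_b))
    pooled = list(group_a) + list(group_b)
    n_a = len(group_a)
    extreme = 0
    for _ in range(n_permutations):
        rng.shuffle(pooled)  # randomly reassign group labels under H0
        if abs(avg(pooled[:n_a]) - avg(pooled[n_a:])) >= observed:
            extreme += 1
    # The +1 terms count the observed labelling itself, keeping p > 0
    return observed, (extreme + 1) / (n_permutations + 1)


# Toy balances for illustration only
subscribed = [1800, 2100, 2500, 1900, 2300, 2700, 2200, 2400]
not_subscribed = [900, 1100, 1000, 1300, 800, 1200, 950, 1050]
diff, p = permutation_test(subscribed, not_subscribed)
```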

##### Which statistical test did you perform to obtain the p-value?

Answer Here.

##### Why did you choose the specific statistical test?

Answer Here.

### Hypothetical Statement - 3

#### 1. State your research hypothesis as a null hypothesis and an alternative hypothesis.

Answer Here.

#### 2. Perform an appropriate statistical test.

In [None]:
# Perform Statistical Test to obtain P-Value

##### Which statistical test did you perform to obtain the p-value?

Answer Here.

##### Why did you choose the specific statistical test?

Answer Here.

## ***6. Feature Engineering & Data Pre-processing***

### 1. Handling Missing Values

In [None]:
# Handling Missing Values & Missing Value Imputation
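Spark's `Imputer` works on numeric columns only (mean, median and, in recent versions, mode), so a string placeholder such as "unknown" needs separate handling. Two common options are to keep "unknown" as its own category, which is reasonable when missingness itself may be informative, or to replace it with the column's most frequent value. The plain-Python sketch below shows the second option; the helper name `impute_mode` is hypothetical, and the sample values are the `contact` entries from the first four rows printed earlier.

```python
from collections import Counter


def impute_mode(values, placeholder="unknown"):
    """Replace placeholder entries with the most frequent observed value.
    If every entry is the placeholder, the values are returned unchanged."""
    observed = [v for v in values if v != placeholder]
    if not observed:
        return list(values)
    most_common_value = Counter(observed).most_common(1)[0][0]
    return [most_common_value if v == placeholder else v for v in values]


# `contact` values from the first four rows printed by bank_df.head(10)
contact = ["cellular", "cellular", "cellular", "unknown"]
contact_imputed = impute_mode(contact)
```

To avoid leaking information from the test set, the mode would normally be computed on the training split only and then applied to both splits.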

#### Which missing value imputation techniques have you used, and why?

Answer Here.

### 2. Handling Outliers

In [None]:
# Handling Outliers & Outlier treatments
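A minimal sketch of one outlier treatment: capping values at the Tukey fences Q1 - 1.5·IQR and Q3 + 1.5·IQR. This is a variant of the winsorization described earlier, using IQR-based limits instead of fixed percentiles. In Spark, `bank_df.approxQuantile("balance", [0.25, 0.75], 0.01)` returns approximate quartiles, and the capping itself could be expressed with `F.least` and `F.greatest`. The helper names and the toy values below are hypothetical.

```python
from statistics import quantiles


def iqr_bounds(values, k=1.5):
    """Return (lower, upper) Tukey fences based on the interquartile range."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def cap(values, lower, upper):
    """Clip every value into [lower, upper] instead of dropping rows."""
    return [min(max(v, lower), upper) for v in values]


# Toy values for illustration only
values = [1, 2, 3, 4, 100]
lower, upper = iqr_bounds(values)
capped = cap(values, lower, upper)
```

Capping keeps every row, which matters for a dataset of only 4521 records; for `balance`, where zero and negative values are legitimate, the fences are worth checking against domain knowledge before applying them.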

##### Which outlier treatment techniques have you used, and why?

Answer Here.

### 3. Categorical Encoding

In [None]:
# Encode your categorical columns

#### Which categorical encoding techniques have you used, and why?

Answer Here.

### 4. Textual Data Preprocessing
(Mandatory for textual datasets, e.g., NLP, Sentiment Analysis, Text Clustering etc.)

#### 1. Expand Contraction

In [None]:
# Expand Contraction

#### 2. Lower Casing

In [None]:
# Lower Casing

#### 3. Removing Punctuations

In [None]:
# Remove Punctuations

#### 4. Removing URLs & Removing words containing digits

In [None]:
# Remove URLs & remove words containing digits

#### 5. Removing Stopwords & Removing White spaces

In [None]:
# Remove Stopwords

In [None]:
# Remove White spaces

#### 6. Rephrase Text

In [None]:
# Rephrase Text

#### 7. Tokenization

In [None]:
# Tokenization

#### 8. Text Normalization

In [None]:
# Normalizing Text (i.e., Stemming, Lemmatization etc.)

##### Which text normalization technique have you used and why?

Answer Here.

#### 9. Part of speech tagging

In [None]:
# POS Tagging

#### 10. Text Vectorization

In [None]:
# Vectorizing Text

##### Which text vectorization technique have you used and why?

Answer Here.

### 4. Feature Manipulation & Selection

#### 1. Feature Manipulation

In [None]:
# Manipulate Features to minimize feature correlation and create new features

#### 2. Feature Selection

In [None]:
# Select your features wisely to avoid overfitting

##### Which feature selection methods have you used, and why?

Answer Here.

##### Which features did you find important, and why?

Answer Here.

### 5. Data Transformation

#### Do you think that your data needs to be transformed? If yes, which transformation have you used? Explain why.

In [None]:
# Transform Your data

### 6. Data Scaling

In [None]:
# Scaling your data

##### Which method have you used to scale your data, and why?
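A common choice for Spark ML is z-score standardization with `StandardScaler`, which operates on an assembled vector column and, by default, divides each feature by its sample standard deviation (`withStd=True`) without centering it (`withMean=False`). The plain-Python sketch below shows the per-feature arithmetic with centering switched on; the helper name `standardize` and the toy values are hypothetical.

```python
from statistics import mean, stdev


def standardize(values, with_mean=True, with_std=True):
    """Z-score scaling of one feature: optionally subtract the mean and
    optionally divide by the sample standard deviation."""
    mu = mean(values) if with_mean else 0.0
    sigma = stdev(values) if with_std else 1.0
    if sigma == 0:
        sigma = 1.0  # constant column: avoid division by zero
    return [(v - mu) / sigma for v in values]


# Toy values for illustration only
scaled = standardize([2, 4, 4, 4, 5, 5, 7, 9])
```

Scaling matters mainly for distance- or gradient-based models such as logistic regression; tree-based models in Spark ML are largely insensitive to it.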

### 7. Dimensionality Reduction

##### Do you think that dimensionality reduction is needed? Explain why.

Answer Here.

In [None]:
# Dimensionality Reduction (if needed)

##### Which dimensionality reduction technique have you used, and why? (If dimensionality reduction was performed on the dataset.)

Answer Here.

### 8. Data Splitting

In [None]:
# Split your data into train and test sets. Choose the splitting ratio wisely.

##### What data splitting ratio have you used and why?

Answer Here.
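Spark's `DataFrame.randomSplit([0.8, 0.2], seed=42)` is the usual way to split, but it samples rows independently and does not guarantee that the minority class keeps its share in each part; `DataFrame.sampleBy(col, fractions, seed)` gives an approximately stratified sample instead. The plain-Python sketch below shows exact stratification on row indices; the helper name `stratified_split`, the 80/20 ratio and the toy labels are assumptions for illustration.

```python
import random
from collections import defaultdict


def stratified_split(labels, test_fraction=0.2, seed=42):
    """Return (train_idx, test_idx) so that each class keeps roughly
    the same proportion in both parts."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for i, label in enumerate(labels):
        by_class[label].append(i)
    train, test = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        n_test = round(len(indices) * test_fraction)
        test.extend(indices[:n_test])
        train.extend(indices[n_test:])
    return sorted(train), sorted(test)


# Toy labels: 90 "no" and 10 "yes"
labels = ["no"] * 90 + ["yes"] * 10
train_idx, test_idx = stratified_split(labels)
```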

### 9. Handling Imbalanced Dataset

##### Do you think the dataset is imbalanced? Explain why.

Answer Here.

In [None]:
# Handling Imbalanced Dataset (If needed)

##### What technique did you use to handle the imbalanced dataset, and why? (If balancing was needed)

Answer Here.
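One way to handle imbalance without discarding or duplicating rows is class weighting: each row gets a weight inversely proportional to its class frequency, and the weights are passed to the learner (Spark's `LogisticRegression`, for example, accepts a `weightCol` parameter). The sketch below computes "balanced" weights as n_samples / (n_classes × class_count), so both classes contribute the same total weight; the helper name and the 90/10 toy labels are hypothetical.

```python
from collections import Counter


def balanced_class_weights(labels):
    """weight(c) = n_samples / (n_classes * count(c))"""
    counts = Counter(labels)
    n_samples, n_classes = len(labels), len(counts)
    return {label: n_samples / (n_classes * count) for label, count in counts.items()}


# Toy labels: 90 "no" and 10 "yes"
weights = balanced_class_weights(["no"] * 90 + ["yes"] * 10)
```

In Spark, the resulting mapping could be attached as a column (for example with `F.when(F.col("y") == "yes", w_yes).otherwise(w_no)`) before fitting.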

## ***7. ML Model Implementation***

### ML Model - 1

In [None]:
# ML Model - 1 Implementation

# Fit the Algorithm

# Predict on the model

#### 1. Explain the ML Model used and its performance using an evaluation metric score chart.

In [None]:
# Visualizing evaluation Metric Score chart

#### 2. Cross-Validation & Hyperparameter Tuning

In [None]:
# ML Model - 1 Implementation with hyperparameter optimization techniques (e.g., GridSearch CV, RandomSearch CV, Bayesian Optimization)

# Fit the Algorithm

# Predict on the model
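In Spark ML, grid search with k-fold cross-validation is typically done with `ParamGridBuilder` and `CrossValidator` (whose `numFolds` defaults to 3). The plain-Python sketch below shows only how k folds are formed: every row is used for validation exactly once and for training in the other k - 1 folds. The helper name `k_fold_indices` is hypothetical.

```python
import random


def k_fold_indices(n_samples, k=3, seed=None):
    """Yield (train_idx, val_idx) pairs; every index is validated exactly once.
    If seed is given, indices are shuffled before being cut into folds."""
    indices = list(range(n_samples))
    if seed is not None:
        random.Random(seed).shuffle(indices)
    base, extra = divmod(n_samples, k)
    start = 0
    for fold in range(k):
        size = base + (1 if fold < extra else 0)
        val = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        yield train, val
        start += size


folds = list(k_fold_indices(10, k=3, seed=1))
```

For each hyperparameter combination, the model would be fitted on every `train` part, scored on the matching `val` part, and the combination with the best average score kept.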

##### Which hyperparameter optimization technique have you used and why?

Answer Here.

##### Have you seen any improvement? Note down the improvement with an updated evaluation metric score chart.

Answer Here.

### ML Model - 2

In [None]:
# ML Model - 2 Implementation

# Fit the Algorithm

# Predict on the model

#### 1. Explain the ML Model used and its performance using an evaluation metric score chart.

In [None]:
# Visualizing evaluation Metric Score chart

#### 2. Cross-Validation & Hyperparameter Tuning

In [None]:
# ML Model - 2 Implementation with hyperparameter optimization techniques (e.g., GridSearch CV, RandomSearch CV, Bayesian Optimization)

# Fit the Algorithm

# Predict on the model

##### Which hyperparameter optimization technique have you used and why?

Answer Here.

##### Have you seen any improvement? Note down the improvement with an updated evaluation metric score chart.

Answer Here.

#### 3. Explain what each evaluation metric indicates for the business, and the business impact of the ML model used.

Answer Here.
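For a term-deposit campaign, recall is the share of actual subscribers the model flags (missed subscribers are lost sales), precision is the share of flagged clients who actually subscribe (low precision means wasted calls), and accuracy can look high even for a model that always predicts "no" when most clients do not subscribe. The plain-Python sketch below computes these metrics from predicted and true labels; the helper name and toy labels are hypothetical. In Spark, `BinaryClassificationEvaluator` (`areaUnderROC`, `areaUnderPR`) and `MulticlassClassificationEvaluator` (for example `metricName="f1"` or `"accuracy"`) cover the same ground.

```python
def classification_metrics(y_true, y_pred, positive="yes"):
    """Confusion-matrix counts plus accuracy, precision, recall and F1
    for the given positive class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(t == positive and p == positive for t, p in pairs)
    fp = sum(t != positive and p == positive for t, p in pairs)
    fn = sum(t == positive and p != positive for t, p in pairs)
    tn = len(pairs) - tp - fp - fn
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "tp": tp, "fp": fp, "fn": fn, "tn": tn,
        "accuracy": (tp + tn) / len(pairs),
        "precision": precision, "recall": recall, "f1": f1,
    }


# Toy labels for illustration only
metrics = classification_metrics(["yes", "yes", "no", "no", "no"],
                                 ["yes", "no", "yes", "no", "no"])
```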

### ML Model - 3

In [None]:
# ML Model - 3 Implementation

# Fit the Algorithm

# Predict on the model

#### 1. Explain the ML Model used and its performance using an evaluation metric score chart.

In [None]:
# Visualizing evaluation Metric Score chart

#### 2. Cross-Validation & Hyperparameter Tuning

In [None]:
# ML Model - 3 Implementation with hyperparameter optimization techniques (e.g., GridSearch CV, RandomSearch CV, Bayesian Optimization)

# Fit the Algorithm

# Predict on the model

##### Which hyperparameter optimization technique have you used and why?

Answer Here.

##### Have you seen any improvement? Note down the improvement with an updated evaluation metric score chart.

Answer Here.

### 1. Which Evaluation metrics did you consider for a positive business impact and why?

Answer Here.

### 2. Which of the models created above did you choose as your final prediction model, and why?

Answer Here.

### 3. Explain the model you used and its feature importance using a model explainability tool.

Answer Here.

## ***8.*** ***Future Work (Optional)***

### 1. Save the best-performing ML model for deployment (Spark ML models are saved with their own `save()` method rather than as a pickle or joblib file).


In [None]:
# Save the File

### 2. Load the saved model again and predict on unseen data as a sanity check.


In [None]:
# Load the File and predict unseen data.

### ***Congrats! Your model has been successfully created and is ready for deployment on a live server for real user interaction!!!***

# **Conclusion**

Write the conclusion here.

### ***Hurrah! You have successfully completed your Machine Learning Capstone Project !!!***