# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster and process data with the Spark Python API.

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the docker compose SPARK_WORKER_MEMORY config.
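
The memory constraint in the last bullet can be checked before the session is created. The sketch below is a plain-Python illustration, not part of the Spark API: `parse_memory` and `executor_fits_worker` are hypothetical helpers, only the `k`, `m`, `g` and `t` suffixes are handled, and the worker value `"1g"` is an assumed example rather than this project's actual SPARK_WORKER_MEMORY.

```python
import re

# Multipliers for the size suffixes handled by this sketch (k, m, g, t).
_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}


def parse_memory(size: str) -> int:
    """Convert a size string such as '512m' or '1g' to a number of bytes."""
    match = re.fullmatch(r"(\d+)([kmgt])", size.strip().lower())
    if match is None:
        raise ValueError(f"unsupported memory string: {size!r}")
    value, unit = match.groups()
    return int(value) * _UNITS[unit]


def executor_fits_worker(executor_memory: str, worker_memory: str) -> bool:
    """Return True when the executor request does not exceed the worker limit."""
    return parse_memory(executor_memory) <= parse_memory(worker_memory)


# "1g" is a hypothetical SPARK_WORKER_MEMORY value, not taken from this project.
print(executor_fits_worker("512m", "1g"))  # True
print(executor_fits_worker("2g", "1g"))    # False
```
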

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

24/07/12 13:28:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


More settings for the SparkSession object in standalone mode can be added using the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

## 3. The Data

### 3.1. Introduction

We will be using the Spark Python API to read, process and write data. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/index.html).

### 3.2. Read

Let's read the Credit Score Classification data ([source](https://www.kaggle.com/datasets/parisrohan/credit-score-classification)) from the simulated **Spark standalone cluster** into a Spark dataframe.
This dataset contains credit-related information about bank customers.

In [3]:
data = spark.read.csv(path="data/cleaned_data.csv", sep=",", header=True)

                                                                                

Let's then display some dataframe metadata, such as the number of rows and columns and the schema (column names and types).

In [4]:
data.count()

24/07/12 13:29:16 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

37827

In [None]:
len(data.columns)

Let's see the type of each column in our data.

In [5]:
data.dtypes

[('ID', 'string'),
 ('Customer_ID', 'string'),
 ('Month', 'string'),
 ('Name', 'string'),
 ('Age', 'string'),
 ('SSN', 'string'),
 ('Occupation', 'string'),
 ('Annual_Income', 'string'),
 ('Monthly_Inhand_Salary', 'string'),
 ('Num_Bank_Accounts', 'string'),
 ('Interest_Rate', 'string'),
 ('Num_of_Loan', 'string'),
 ('Type_of_Loan', 'string'),
 ('Delay_from_due_date', 'string'),
 ('Num_of_Delayed_Payment', 'string'),
 ('Changed_Credit_Limit', 'string'),
 ('Num_Credit_Inquiries', 'string'),
 ('Credit_Mix', 'string'),
 ('Outstanding_Debt', 'string'),
 ('Credit_Utilization_Ratio', 'string'),
 ('Credit_History_Age', 'string'),
 ('Payment_of_Min_Amount', 'string'),
 ('Total_EMI_per_month', 'string'),
 ('Amount_invested_monthly', 'string'),
 ('Payment_Behaviour', 'string'),
 ('Monthly_Balance', 'string')]

We can see that every column has type "string". Some of the columns must be converted to "float" before proceeding to modeling.
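
A CSV reader that is given no schema has no way to know which fields are numeric, so it hands every value back as text. The plain-Python sketch below illustrates that idea and the effect of a cast on malformed values. It uses the standard `csv` module rather than Spark, the sample rows are made up, and `to_float_or_none` is a hypothetical helper that only mirrors the general behaviour of a cast that yields a missing value on failure, not Spark's exact parsing rules.

```python
import csv
import io
from typing import Optional

# Hypothetical two-row sample; the real file has 26 columns.
SAMPLE = "Name,Age,Annual_Income\nAlice,23,19114.12\nBob,abc,\n"


def to_float_or_none(text: Optional[str]) -> Optional[float]:
    """Parse text as a float, returning None when it is empty or malformed."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


# Every field comes back as a string, including the numeric-looking ones.
rows = list(csv.DictReader(io.StringIO(SAMPLE)))
print(type(rows[0]["Age"]).__name__)  # str

# Cast the numeric columns; values that cannot be parsed become None.
converted = [
    {
        **row,
        "Age": to_float_or_none(row["Age"]),
        "Annual_Income": to_float_or_none(row["Annual_Income"]),
    }
    for row in rows
]
print(converted[1])
```

Values that fail to parse end up as missing values after the cast, which is one reason to check for nulls once the conversion is done.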

### 3.3. Process

The columns below are supposed to be numeric.

We'll create a function that converts the selected columns to "float".

In [6]:
columns_to_convert = [
    "Age", "Annual_Income", "Monthly_Inhand_Salary", "Num_Bank_Accounts",
    "Interest_Rate", "Num_of_Loan", "Delay_from_due_date",
    "Num_of_Delayed_Payment", "Changed_Credit_Limit", "Num_Credit_Inquiries",
    "Outstanding_Debt", "Credit_Utilization_Ratio", "Total_EMI_per_month",
    "Amount_invested_monthly", "Monthly_Balance"
]

In [7]:
from pyspark.sql.functions import col

def convert_to_float(df, column):
    return df.withColumn(column, col(column).cast("float"))

# Apply the conversion to each column
for column in columns_to_convert:
    data = convert_to_float(data, column)

# Show the schema to verify the changes
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- SSN: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Annual_Income: float (nullable = true)
 |-- Monthly_Inhand_Salary: float (nullable = true)
 |-- Num_Bank_Accounts: float (nullable = true)
 |-- Interest_Rate: float (nullable = true)
 |-- Num_of_Loan: float (nullable = true)
 |-- Type_of_Loan: string (nullable = true)
 |-- Delay_from_due_date: float (nullable = true)
 |-- Num_of_Delayed_Payment: float (nullable = true)
 |-- Changed_Credit_Limit: float (nullable = true)
 |-- Num_Credit_Inquiries: float (nullable = true)
 |-- Credit_Mix: string (nullable = true)
 |-- Outstanding_Debt: float (nullable = true)
 |-- Credit_Utilization_Ratio: float (nullable = true)
 |-- Credit_History_Age: string (nullable = true)
 |-- Payment_of_Min_Amount: string (nullable = true)
 |-- Tot

Now that our columns have the right types, we'll check each of them for missing values.

Before that, we need to import "functions" from pyspark.sql.

In [8]:
from pyspark.sql import functions as F

# Step 1: Count the null values in each converted column
missing_values = data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in columns_to_convert])
missing_values.show()

# Step 2: Drop rows with a null value in any column (string columns included)
clean_data = data.na.drop()

# Step 3: Fill any remaining missing values with the mean of their respective columns.
# After na.drop() no nulls are left, so this loop only acts as a safeguard.
for column in columns_to_convert:
    mean_value = clean_data.select(F.mean(F.col(column))).collect()[0][0]
    clean_data = clean_data.na.fill({column: mean_value})

# Step 4: Verify that all missing values are handled
clean_data_values = clean_data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in columns_to_convert])
clean_data_values.show()


                                                                                

+---+-------------+---------------------+-----------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+---------------+
|Age|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Outstanding_Debt|Credit_Utilization_Ratio|Total_EMI_per_month|Amount_invested_monthly|Monthly_Balance|
+---+-------------+---------------------+-----------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+---------------+
|  0|            0|                 5621|                0|            0|          0|                  0|                  2739|                   0|                 777|           

                                                                                

+---+-------------+---------------------+-----------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+---------------+
|Age|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Outstanding_Debt|Credit_Utilization_Ratio|Total_EMI_per_month|Amount_invested_monthly|Monthly_Balance|
+---+-------------+---------------------+-----------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+---------------+
|  0|            0|                    0|                0|            0|          0|                  0|                     0|                   0|                   0|           
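
The two strategies used in the cell above, dropping rows and filling with the column mean, have different effects on the row count. The sketch below shows this on a tiny made-up dataset in plain Python, with `None` standing in for a Spark null. The helper names are hypothetical and the code is an illustration of the idea, not a replacement for the Spark calls.

```python
from statistics import mean

# Hypothetical rows; None plays the role of a Spark null.
rows = [
    {"Age": 23.0, "Monthly_Inhand_Salary": 1824.8},
    {"Age": 31.0, "Monthly_Inhand_Salary": None},
    {"Age": None, "Monthly_Inhand_Salary": 3037.9},
    {"Age": 45.0, "Monthly_Inhand_Salary": 5000.0},
]
columns = ["Age", "Monthly_Inhand_Salary"]


def count_nulls(data):
    """Count the None values in each column."""
    return {c: sum(row[c] is None for row in data) for c in columns}


def drop_nulls(data):
    """Keep only the rows that have no None in any column."""
    return [row for row in data if all(row[c] is not None for c in columns)]


def fill_with_mean(data):
    """Replace None with the mean of the non-null values of the same column."""
    means = {c: mean(r[c] for r in data if r[c] is not None) for c in columns}
    return [{c: means[c] if r[c] is None else r[c] for c in columns} for r in data]


print(count_nulls(rows))                  # {'Age': 1, 'Monthly_Inhand_Salary': 1}
print(len(drop_nulls(rows)))              # 2
print(len(fill_with_mean(rows)))          # 4
print(count_nulls(fill_with_mean(rows)))  # {'Age': 0, 'Monthly_Inhand_Salary': 0}
```

Dropping removes every row that has at least one missing value, while filling keeps all rows at the cost of inserting synthetic values. Once the rows with nulls have been dropped, a subsequent fill has nothing left to change.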

                                                                                

After removing the null values, let's see how many rows we have left.

In [9]:
print((clean_data.count(), len(clean_data.columns)))




(20033, 26)


                                                                                

Let's check the usual summary statistics of our columns, in particular those of type "float".

In [10]:
summary_stats = clean_data.describe()
summary_stats.show()

                                                                                

+-------+-------+-----------+---------+------+------------------+-----------+----------+------------------+---------------------+------------------+-----------------+------------------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+------------------+------------------------+--------------------+---------------------+-------------------+-----------------------+--------------------+------------------+
|summary|     ID|Customer_ID|    Month|  Name|               Age|        SSN|Occupation|     Annual_Income|Monthly_Inhand_Salary| Num_Bank_Accounts|    Interest_Rate|       Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|  Outstanding_Debt|Credit_Utilization_Ratio|  Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|   Monthly_Balance|
+-------+-------+-----------+---------+------+------

<h1>Interpretation</h1>

The table above contains many implausible values.

For example:

+ the mean of the "Age" column is 124.25;
+ the mean of the "Num_Bank_Accounts" column is 16.57.

Many values are far larger than they should be, most likely because of typos and other data-entry errors.
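
A single erroneous entry is enough to pull a mean far away from the typical value, which is why an implausible mean is a useful warning sign. The short sketch below shows this on made-up ages, where 5000 stands in for a data-entry error. The numbers are purely illustrative and are not taken from the dataset.

```python
from statistics import mean, median

# Hypothetical ages; 5000.0 stands in for a data-entry error.
ages = [23.0, 28.0, 33.0, 41.0, 5000.0]

print(mean(ages))       # 1025.0
print(median(ages))     # 33.0

# Without the erroneous entry, the mean is close to the median again.
print(mean(ages[:-1]))  # 31.25
```

The median barely reacts to the extreme value, so comparing it with the mean gives a quick hint that outliers are present.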

<h5>Our job is to fix this data before creating a model</h5>
Let's compute the boxplot statistics of each column to check the distribution of its values.


In [11]:
def compute_boxplot_values(df, column):
    quantiles = df.approxQuantile(column, [0.25, 0.5, 0.75], 0.01)
    q1, median, q3 = quantiles
    iqr = q3 - q1
    lower_whisker = df.filter(F.col(column) >= q1 - 1.5 * iqr).select(F.min(column)).first()[0]
    upper_whisker = df.filter(F.col(column) <= q3 + 1.5 * iqr).select(F.max(column)).first()[0]
    return {
        "q1": q1,
        "median": median,
        "q3": q3,
        "iqr": iqr,
        "lower_whisker": lower_whisker,
        "upper_whisker": upper_whisker
    }
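
To make the quantities returned by the function above concrete, here is a plain-Python sketch of the same calculation on a small list. It assumes exact quartiles from `statistics.quantiles` with the inclusive method, whereas the Spark function uses `approxQuantile` with a relative error of 0.01, so results on real data may differ slightly. The ages are made up and `boxplot_values` is a hypothetical helper.

```python
from statistics import quantiles


def boxplot_values(values):
    """Return quartiles, IQR and whiskers using the 1.5 * IQR rule."""
    q1, median, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    lower_fence = q1 - 1.5 * iqr
    upper_fence = q3 + 1.5 * iqr
    return {
        "q1": q1,
        "median": median,
        "q3": q3,
        "iqr": iqr,
        # Whiskers are the most extreme values that still lie inside the fences.
        "lower_whisker": min(v for v in values if v >= lower_fence),
        "upper_whisker": max(v for v in values if v <= upper_fence),
    }


# Hypothetical ages; 500 is an obvious outlier.
ages = [14, 25, 25, 30, 33, 38, 42, 56, 500]
stats = boxplot_values(ages)
print(stats)
```

Here the fences are -0.5 and 67.5, so the value 500 falls outside them and the upper whisker stops at 56.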

Let's display these statistics for all the columns of type "float".

In [12]:
# Compute and display box plot values for each column.
# The loop variable is named "column" so that it does not shadow the imported col function.
boxplot_stats = {column: compute_boxplot_values(clean_data, column) for column in columns_to_convert}
for column, stats in boxplot_stats.items():
    print(f"Boxplot stats for {column}: {stats}")


                                                                                

Boxplot stats for Age: {'q1': 25.0, 'median': 33.0, 'q3': 42.0, 'iqr': 17.0, 'lower_whisker': 14.0, 'upper_whisker': 56.0}
Boxplot stats for Annual_Income: {'q1': 18743.80078125, 'median': 35938.44921875, 'q3': 68721.078125, 'iqr': 49977.27734375, 'lower_whisker': 7005.93017578125, 'upper_whisker': 143570.796875}
Boxplot stats for Monthly_Inhand_Salary: {'q1': 1565.7220458984375, 'median': 2973.625732421875, 'q3': 5575.943359375, 'iqr': 4010.2213134765625, 'lower_whisker': 303.6454162597656, 'upper_whisker': 11587.8095703125}
Boxplot stats for Num_Bank_Accounts: {'q1': 4.0, 'median': 6.0, 'q3': 8.0, 'iqr': 4.0, 'lower_whisker': 0.0, 'upper_whisker': 11.0}
Boxplot stats for Interest_Rate: {'q1': 8.0, 'median': 15.0, 'q3': 21.0, 'iqr': 13.0, 'lower_whisker': 1.0, 'upper_whisker': 34.0}
Boxplot stats for Num_of_Loan: {'q1': 2.0, 'median': 4.0, 'q3': 6.0, 'iqr': 4.0, 'lower_whisker': 1.0, 'upper_whisker': 9.0}
Boxplot stats for Delay_from_due_date: {'q1': 10.0, 'median': 19.0, 'q3': 29.0, 

                                                                                

As we guessed, there seem to be many outliers in our data.

<h4>We can either delete the rows that contain outliers or replace the outlying values with the mean of the non-outliers. However, a plausible value depends on many other factors (such as the occupation), so imputing the mean would distort the information in our data. In this case, we therefore choose to delete the rows.</h4>


In [13]:
def calculate_bounds(df, column):
    quantiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
    q1, q3 = quantiles
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    return lower_bound, upper_bound

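# The loop variable is named "column" so that it does not shadow the imported col function.
bounds = {column: calculate_bounds(clean_data, column) for column in columns_to_convert}

# Keep only the rows whose value lies inside the bounds of every column
for column, (lower_bound, upper_bound) in bounds.items():
    clean_data = clean_data.filter((F.col(column) >= lower_bound) & (F.col(column) <= upper_bound))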

clean_data.show()

                                                                                

+------+-----------+---------+---------------+----+-----------+-------------+-------------+---------------------+-----------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+--------------------+---------------------+-------------------+-----------------------+--------------------+---------------+
|    ID|Customer_ID|    Month|           Name| Age|        SSN|   Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|  Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|Monthly_Balance|
+------+-----------+---------+---------------+----+-----------+-------------+-------------+---------------------+-----
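
The filtering step above can be sketched in plain Python on a handful of made-up rows. As in the cell above, the bounds are computed once on the unfiltered data and a row is kept only if it lies inside the bounds of every column. The sketch assumes exact quartiles from `statistics.quantiles` instead of Spark's `approxQuantile`, and `iqr_bounds` is a hypothetical helper.

```python
from statistics import quantiles


def iqr_bounds(values):
    """Return the (lower, upper) bounds given by the 1.5 * IQR rule."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - 1.5 * iqr, q3 + 1.5 * iqr


# Hypothetical rows; Age 500 and Num_Bank_Accounts 90 are the outliers.
rows = [
    {"Age": 14, "Num_Bank_Accounts": 3},
    {"Age": 25, "Num_Bank_Accounts": 4},
    {"Age": 25, "Num_Bank_Accounts": 90},
    {"Age": 30, "Num_Bank_Accounts": 4},
    {"Age": 33, "Num_Bank_Accounts": 5},
    {"Age": 38, "Num_Bank_Accounts": 6},
    {"Age": 42, "Num_Bank_Accounts": 6},
    {"Age": 56, "Num_Bank_Accounts": 7},
    {"Age": 500, "Num_Bank_Accounts": 8},
]
columns = ["Age", "Num_Bank_Accounts"]

# Bounds are computed once, on the unfiltered rows.
bounds = {c: iqr_bounds([row[c] for row in rows]) for c in columns}

# A row survives only if it is inside the bounds of every column.
kept = [
    row for row in rows
    if all(bounds[c][0] <= row[c] <= bounds[c][1] for c in columns)
]

print(bounds)     # {'Age': (-0.5, 67.5), 'Num_Bank_Accounts': (-0.5, 11.5)}
print(len(kept))  # 7
```

Each column can remove a different set of rows, so the losses accumulate as more columns are filtered.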

<h3>Let's check whether the outliers have been successfully removed by recomputing the boxplot statistics and by summarizing each column with the 'describe' method.</h3>

In [14]:
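# The loop variable is named "column" so that it does not shadow the imported col function.
boxplot_stats = {column: compute_boxplot_values(clean_data, column) for column in columns_to_convert}
for column, stats in boxplot_stats.items():
    print(f"Boxplot stats for {column}: {stats}")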



Boxplot stats for Age: {'q1': 25.0, 'median': 33.0, 'q3': 41.0, 'iqr': 16.0, 'lower_whisker': 14.0, 'upper_whisker': 56.0}
Boxplot stats for Annual_Income: {'q1': 17492.310546875, 'median': 30564.9609375, 'q3': 47873.6015625, 'iqr': 30381.291015625, 'lower_whisker': 7005.93017578125, 'upper_whisker': 93345.7578125}
Boxplot stats for Monthly_Inhand_Salary: {'q1': 1464.657470703125, 'median': 2500.393310546875, 'q3': 3999.989990234375, 'iqr': 2535.33251953125, 'lower_whisker': 303.6454162597656, 'upper_whisker': 7795.87744140625}
Boxplot stats for Num_Bank_Accounts: {'q1': 4.0, 'median': 6.0, 'q3': 7.0, 'iqr': 3.0, 'lower_whisker': 0.0, 'upper_whisker': 11.0}
Boxplot stats for Interest_Rate: {'q1': 8.0, 'median': 15.0, 'q3': 20.0, 'iqr': 12.0, 'lower_whisker': 1.0, 'upper_whisker': 34.0}
Boxplot stats for Num_of_Loan: {'q1': 2.0, 'median': 4.0, 'q3': 5.0, 'iqr': 3.0, 'lower_whisker': 1.0, 'upper_whisker': 9.0}
Boxplot stats for Delay_from_due_date: {'q1': 10.0, 'median': 19.0, 'q3': 28.0

                                                                                

In [15]:
updated_stats = clean_data.describe()
updated_stats.show()



+-------+-------+-----------+---------+------+-----------------+-----------+----------+------------------+---------------------+------------------+------------------+------------------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+------------------+------------------------+--------------------+---------------------+-------------------+-----------------------+--------------------+------------------+
|summary|     ID|Customer_ID|    Month|  Name|              Age|        SSN|Occupation|     Annual_Income|Monthly_Inhand_Salary| Num_Bank_Accounts|     Interest_Rate|       Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|  Outstanding_Debt|Credit_Utilization_Ratio|  Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|   Monthly_Balance|
+-------+-------+-----------+---------+------+------

                                                                                

As expected, the outliers have been removed, and the data looks much better now.

<h4>The means of the "Age" and "Num_Bank_Accounts" columns are now 33.35 and 5.57 respectively, instead of unreasonable values like 124.25 and 16.57.</h4>

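We can save our final cleaned data as CSV using the code below. Note that Spark writes the output path as a directory; **coalesce(1)** ensures that it contains a single part file.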

In [45]:
output_path = "data/final_data_cleaned.csv"
clean_data.coalesce(1).write.csv(output_path, header=True)

print("Cleaned data has been saved to:", output_path)




Cleaned data has been saved to: data/final_data_cleaned.csv
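
Because Spark writes a directory rather than a single file, a small post-processing step can copy the part file to a regular CSV file. The sketch below is one possible approach using only the standard library. `export_single_csv` is a hypothetical helper, the directory is simulated in a temporary folder, and the part file name is made up since Spark generates its own.

```python
import glob
import os
import shutil
import tempfile


def export_single_csv(spark_output_dir: str, target_file: str) -> str:
    """Copy the single part file of a Spark CSV output directory to target_file."""
    part_files = glob.glob(os.path.join(spark_output_dir, "part-*.csv"))
    if len(part_files) != 1:
        raise ValueError(f"expected exactly one part file, found {len(part_files)}")
    shutil.copyfile(part_files[0], target_file)
    return target_file


# Simulate the layout Spark leaves behind, inside a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "final_data_cleaned.csv")
    os.makedirs(out_dir)
    # The part file name and its content are made up for this illustration.
    with open(os.path.join(out_dir, "part-00000-example.csv"), "w") as f:
        f.write("ID,Age\n0x1602,23.0\n")
    # Spark also writes an empty _SUCCESS marker next to the part files.
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()

    single = export_single_csv(out_dir, os.path.join(tmp, "single.csv"))
    with open(single) as f:
        exported = f.read()

print(exported)
```

The helper raises an error when it finds zero or several part files, which would be the case if the data had not been coalesced into one partition.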


                                                                                

### 3.4. Predictive model
