### Importing Necessary Dependencies

In [2]:
from pyspark.sql import SparkSession
import pandas as pd
from sqlalchemy import create_engine

In [3]:
# Initialise the sparksession

spark = SparkSession.builder.appName('NugaBankETL').getOrCreate()

In [4]:
spark

### Data Extraction

In [12]:
# read dataset into the notebook
nuga_bank_df = spark.read.csv(r'C:\Users\Admin\Desktop\Data Engineering Materials\PYSPARK_WORKs\nuga_bank\Nuga_Bank\dataset\nuga_bank_transactions.csv', header=True, inferSchema=True)

# view first 10 rows of the dataset
nuga_bank_df.show(10)

+--------------------+------+----------------+--------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------------

In [25]:
# view schema

nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [26]:
# view columns
nuga_bank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [27]:
# no of rows
num_rows = nuga_bank_df.count()
num_rows

1000000

In [28]:
nuga_bank_df.rdd.getNumPartitions()


8

In [30]:
# no of culumns

num_col = len(nuga_bank_df.columns)
num_col

23

In [None]:
# dataset shape
nuga_bank_df_shape = print("The shape of the datset is: ",num_rows,", ",num_col)

The shape of the datset is:  1000000 ,  23


In [33]:
# Checking for null values

for column in nuga_bank_df.columns:
    print(column, 'Nulls', nuga_bank_df.filter(nuga_bank_df[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [None]:
# Show missing values
def display_missing_values(df):
    for col_name in df.columns:
        missing_values_df = df.filter(col(col_name).isNull())
        print(f"Missing values in column '{col_name}':")
        missing_values_df.show()

# Call the function to display missing values in all columns
display_missing_values(nuga_bank_df)

Missing values in column 'Transaction_Date':
+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+------------------+----+-------------+-------------+--------+-----+---------+------------+-----------+------+--------------+
|Transaction_Date|Amount|Transaction_Type|Customer_Name|Customer_Address|Customer_City|Customer_State|Customer_Country|Company|Job_Title|Email|Phone_Number|Credit_Card_Number|IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|Last_Updated|Description|Gender|Marital_Status|
+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+------------------+----+-------------+-------------+--------+-----+---------+------------+-----------+------+--------------+
+----------------+------+----------------+-------------+----------------+-------------+--------------+----------

### Data Cleaning and Transformation

In [None]:
# Filling missing values

nuga_bank_df_cleaned = nuga_bank_df.fillna({
    'Customer_Name': 'unknown',
    'Customer_Address': 'unknown',
    'Customer_City': 'unknown',
    'Customer_State': 'unknown',
    'Customer_Country': 'unknown',
    'Company': 'unknown',
    'Job_Title': 'unknown',
    'Email': 'unknown',
    'Phone_Number': 'unknown',
    'Credit_Card_Number': 0,
    'IBAN': 'unknown',
    'Currency_Code': 'unknown',
    'Random_Number': 0.0,
    'Category': 'unknown',
    'Group': 'unknown',
    'Is_Active': 'unknown',
    'Description': 'unknown',
    'Gender': 'unknown',
    'Marital_Status': 'unknown'
    })

In [41]:
# Drop rows where last_updated is null

nuga_bank_df_cleaned = nuga_bank_df_cleaned.na.drop(subset=['Last_Updated'])

In [42]:
# Checking for null values

for column in nuga_bank_df_cleaned.columns:
    print(column, 'Nulls', nuga_bank_df_cleaned.filter(nuga_bank_df_cleaned[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [44]:
# Count of rows after dropping missing values

print("number of rows after dropping missing values is: ",nuga_bank_df_cleaned.count())

number of rows after dropping missing values is:  899679


In [51]:
# View summary statistics

nuga_bank_df_cleaned.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|     Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+-------

In [None]:
# Filter transaction with amount greater than 900

df_filtered = nuga_bank_df_cleaned.filter(nuga_bank_df_cleaned["Amount"] > 900)
print("Total count of Amount greater than 900 is: ",df_filtered.count())
print()
print(df_filtered.show())

Total count of Amount greater than 900 is:  91171

+--------------------+------+----------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+-------+--------------+
|    Transaction_Date|Amount|Transaction_Type|       Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|        Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description| Gender|Marital_Status|
+--------------------+------+----------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+-

In [None]:
# Find average Amount by  Category

from pyspark.sql.functions import mean
nuga_bank_df_cleaned.groupBy("Category").agg(mean("Amount" )).show()

+--------+------------------+
|Category|       avg(Amount)|
+--------+------------------+
| unknown| 504.3403305034023|
|       B|505.82130699927995|
|       D| 504.4554745594936|
|       C| 504.9979053190167|
|       A| 505.4787041452316|
+--------+------------------+



### Data Modelling

In [65]:
nuga_bank_df_cleaned.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']