In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd
from sqlalchemy import create_engine


In [2]:
# Create the SparkSession for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("NugaBankETL")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/24 02:16:33 WARN Utils: Your hostname, Davids-MacBook-Air-2.local, resolves to a loopback address: 127.0.0.1; using 192.168.40.9 instead (on interface en0)
26/01/24 02:16:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/24 02:16:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Echo the session object so the notebook renders it as the cell output.
spark


In [3]:
# Load the raw transactions CSV: the first line supplies the column names and
# Spark infers each column's type from the data.
nuga_bank_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('dataset/nuga_bank_transactions.csv')
)
# Preview the first five rows.
nuga_bank_df.show(n=5)

                                                                                

+----------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+------------+--------------------+------+--------------+
|Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|Last_Updated|         Description|Gender|Marital_Status|
+----------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+------------

In [4]:
# Print the inferred column names, types and nullability of the raw DataFrame.
nuga_bank_df.printSchema()

root
 |-- Transaction_Date: string (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)



In [5]:
# Data cleaning and transformation steps start here.
# First, list the column names of the raw DataFrame.

nuga_bank_df.columns


['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [6]:
# Number of rows in the raw DataFrame (count() triggers a Spark job).
num_rows = nuga_bank_df.count()
num_rows

780427

In [7]:
# Number of columns in the raw DataFrame, taken from its column-name list.
num_columns = len(nuga_bank_df.columns)
num_columns

23

In [8]:
# Count the nulls in every column of the raw DataFrame.
# All counts are computed in ONE aggregation (a single pass over the data) instead of
# running a separate filter().count() job per column.
# count(<expr>) only counts non-null values, and CASE without ELSE yields NULL, so each
# expression counts exactly the rows where that column IS NULL.
# Column names are back-quoted because some (e.g. Group) are SQL keywords.
null_count_exprs = [
    f"count(CASE WHEN `{column}` IS NULL THEN 1 END) AS `{column}`"
    for column in nuga_bank_df.columns
]
null_counts = nuga_bank_df.selectExpr(*null_count_exprs).first()

for column in nuga_bank_df.columns:
    print(column, 'Nulls:', null_counts[column])

                                                                                

Transaction_Date Nulls: 0


                                                                                

Amount Nulls: 0
Transaction_Type Nulls: 0
Customer_Name Nulls: 78271
Customer_Address Nulls: 78109
Customer_City Nulls: 78231
Customer_State Nulls: 77943
Customer_Country Nulls: 78556
Company Nulls: 78309
Job_Title Nulls: 77942
Email Nulls: 78105
Phone_Number Nulls: 78139
Credit_Card_Number Nulls: 78116
IBAN Nulls: 78080
Currency_Code Nulls: 77618
Random_Number Nulls: 77904
Category Nulls: 78131
Group Nulls: 78060
Is_Active Nulls: 78348
Last_Updated Nulls: 78192
Description Nulls: 78225
Gender Nulls: 77972
Marital_Status Nulls: 77888


In [10]:
# Replace nulls with sentinel values: 'Unknown' for text columns, 0 for numeric columns.
# Last_Updated is intentionally NOT filled; rows missing it are dropped in a later cell.
# NOTE(review): after this step the sentinels are indistinguishable from real data
# (e.g. Credit_Card_Number == 0) — confirm downstream consumers expect that.
text_fill_columns = [
    'Customer_Name',
    'Customer_Address',
    'Customer_City',
    'Customer_State',
    'Customer_Country',
    'Company',
    'Job_Title',
    'Email',
    'Phone_Number',
    'IBAN',
    'Currency_Code',
    'Category',
    'Group',
    'Is_Active',
    'Description',
    'Gender',
    'Marital_Status',
]
fill_values = {column: 'Unknown' for column in text_fill_columns}
# Integer literals match the long / integer types of these two columns
# (Random_Number is an integer column, so it is filled with 0 rather than 0.0).
fill_values.update({'Credit_Card_Number': 0, 'Random_Number': 0})

# Rebinding nuga_bank_df is safe to re-run: filling already-filled columns is a no-op.
nuga_bank_df = nuga_bank_df.fillna(fill_values)

In [11]:
# Discard every row whose Last_Updated value is missing; other columns are not considered.
nuga_bank_df_clean = nuga_bank_df.dropna(subset=['Last_Updated'])

In [12]:
# Verify the cleaning: count the remaining nulls in every column of the cleaned DataFrame.
# All counts come from ONE aggregation (a single pass over the data) instead of a
# separate filter().count() job per column.
# count(<expr>) only counts non-null values, and CASE without ELSE yields NULL, so each
# expression counts exactly the rows where that column IS NULL.
# Column names are back-quoted because some (e.g. Group) are SQL keywords.
clean_null_count_exprs = [
    f"count(CASE WHEN `{column}` IS NULL THEN 1 END) AS `{column}`"
    for column in nuga_bank_df_clean.columns
]
clean_null_counts = nuga_bank_df_clean.selectExpr(*clean_null_count_exprs).first()

for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls:', clean_null_counts[column])

Transaction_Date Nulls: 0
Amount Nulls: 0
Transaction_Type Nulls: 0
Customer_Name Nulls: 0
Customer_Address Nulls: 0
Customer_City Nulls: 0
Customer_State Nulls: 0
Customer_Country Nulls: 0
Company Nulls: 0
Job_Title Nulls: 0
Email Nulls: 0
Phone_Number Nulls: 0
Credit_Card_Number Nulls: 0
IBAN Nulls: 0
Currency_Code Nulls: 0
Random_Number Nulls: 0
Category Nulls: 0
Group Nulls: 0
Is_Active Nulls: 0
Last_Updated Nulls: 0
Description Nulls: 0
Gender Nulls: 0
Marital_Status Nulls: 0


In [13]:
# Row count after dropping rows with a null Last_Updated.
# Note: this rebinds num_rows, which earlier held the row count of the raw DataFrame.
num_rows = nuga_bank_df_clean.count()
num_rows

702235

In [14]:
# Show summary statistics for every column of the cleaned DataFrame.
nuga_bank_df_clean.describe().show()

26/01/24 02:17:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+----------------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+------------+--------------------+-------+--------------+
|summary|Transaction_Date|           Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category|  Group|Is_Active|Last_Updated|         Description| Gender|Marital_Status|
+-------+----------------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+-------------

                                                                                

In [15]:
# List the columns available for building the transaction, customer and employee tables.
nuga_bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [16]:
# Transaction table: keep only the transaction-level columns of the cleaned data.
transaction_columns = ['Transaction_Date', 'Amount', 'Transaction_Type']
transaction = nuga_bank_df_clean.select(*transaction_columns)

In [17]:
# Add a generated key column, Transaction_id, to the transaction table.
# The generated values are unique but not necessarily consecutive.
transaction = transaction.withColumn("Transaction_id", monotonically_increasing_id())

In [18]:
# Put Transaction_id first, followed by the original transaction columns.
transaction_column_order = ['Transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type']
transaction = transaction.select(*transaction_column_order)

In [19]:
# Preview the first rows of the transaction table.
transaction.show()

+--------------+----------------+------+----------------+
|Transaction_id|Transaction_Date|Amount|Transaction_Type|
+--------------+----------------+------+----------------+
|             0|         38:55.3| 34.76|      Withdrawal|
|             1|         16:00.0|163.92|      Withdrawal|
|             2|         46:24.7|386.32|      Withdrawal|
|             3|         29:26.4|407.15|         Deposit|
|             4|         51:03.6|161.31|         Deposit|
|             5|         56:50.3|764.34|        Transfer|
|             6|         07:55.9|734.59|         Deposit|
|             7|         51:23.7|592.43|         Deposit|
|             8|         34:23.9| 927.1|         Deposit|
|             9|         46:15.7| 66.59|        Transfer|
|            10|         30:59.2| 246.3|      Withdrawal|
|            11|         22:57.2|782.32|      Withdrawal|
|            12|         16:59.4|818.42|      Withdrawal|
|            13|         55:41.4|352.23|      Withdrawal|
|            1

In [20]:
# Customer table: one row per distinct combination of the customer attributes.
customer_columns = [
    'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
    'Customer_Country', 'Email', 'Phone_Number',
]
customer = nuga_bank_df_clean.select(*customer_columns).distinct()

In [21]:
# Add the generated Customer_id key to the customer table.
# monotonically_increasing_id() depends on how the rows are partitioned, and `customer`
# comes out of distinct() (which shuffles the data), so re-evaluating this DataFrame is
# not guaranteed to give each customer the same id. persist() keeps the ids from the
# first evaluation, so every later use of `customer` (previews, joins, writes) sees the
# same values for as long as the cached data stays available.
customer = customer.withColumn("Customer_id", monotonically_increasing_id()).persist()

In [22]:
# Move Customer_id to the front; the remaining columns keep their order.
customer_column_order = [
    'Customer_id',
    'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
    'Customer_Country', 'Email', 'Phone_Number',
]
customer = customer.select(*customer_column_order)

In [24]:
# Number of rows in the customer table (distinct customer attribute combinations).
customer.count()

                                                                                

702235

In [23]:
# Preview the first rows of the customer table.
customer.show()

+-----------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+-------------------+
|Customer_id|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|               Email|       Phone_Number|
+-----------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+-------------------+
|          0|       James Neal|54912 Holmes Lodg...|   West Keithborough|       Florida|                Togo|             Unknown|  493.720.6609x7545|
|          1|      Thomas Long| 1133 Collin Passage|          Joshuabury|   Connecticut|Lao People's Demo...|michellelynch@exa...|      (497)554-3317|
|          2|   Ashley Shelton|5297 Johnson Port...|         North Maria|    New Jersey|              Bhutan| ljordan@example.org|      (534)769-3072|
|          3|    James Rosario|56955 Moore Glens...|  North Michellefurt|    New Mexico|      

In [25]:
# Employee table: one row per distinct (Company, Job_Title, Gender, Marital_Status)
# combination, keyed by a generated Employee_id placed in the first column.
employee_columns = ['Company', 'Job_Title', 'Gender', 'Marital_Status']
employee = (
    nuga_bank_df_clean
    .select(*employee_columns)
    .distinct()
    .withColumn("Employee_id", monotonically_increasing_id())
    .select('Employee_id', *employee_columns)
    # monotonically_increasing_id() depends on how the rows are partitioned, and the rows
    # here come out of distinct() (which shuffles the data), so a re-evaluation is not
    # guaranteed to reproduce the same ids. persist() keeps the ids from the first
    # evaluation for every later use, as long as the cached data stays available.
    .persist()
)

In [26]:
# Preview the first rows of the employee table.
employee.show()



+-----------+--------------------+--------------------+-------+--------------+
|Employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|         Price Group|             Unknown|   Male|        Single|
|          1|        Marshall PLC|        Photographer|   Male|        Single|
|          2|          Watson Ltd|Forest/woodland m...| Female|      Divorced|
|          3|        Russell-Cook|        Cartographer|Unknown|       Married|
|          4|      Lyons and Sons|Psychotherapist, ...|  Other|        Single|
|          5|Owens, Mills and ...|         Optometrist| Female|        Single|
|          6|Moore, Fowler and...|English as a fore...|   Male|        Single|
|          7|Vazquez, Butler a...|             Unknown|Unknown|        Single|
|          8|Dunn, Stone and Ware|Biomedical scientist|   Male|      Divorced|
|          9|Cook, Ward and Ha...|             Unkno

                                                                                

In [32]:
# Fact table: attach the generated ids of the customer, transaction and employee tables
# to each cleaned source row by joining on the natural-key columns, then keep the ids
# plus the remaining measure / attribute columns.
customer_keys = ['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
                 'Customer_Country', 'Email', 'Phone_Number']
transaction_keys = ['Transaction_Date', 'Amount', 'Transaction_Type']
employee_keys = ['Company', 'Job_Title', 'Gender', 'Marital_Status']

# `customer` and `employee` are built with distinct() over exactly their join columns, so
# those joins match at most one row. `transaction` is NOT de-duplicated: two source rows
# with the same date, amount and type would each match both transaction rows and be
# duplicated in the fact table. Join against a lookup holding a single id (the smallest)
# per key combination so the fact table keeps exactly one row per source row.
# NOTE(review): source rows that share all three transaction key values therefore share
# one Transaction_id; carrying a row-level id on the cleaned data before splitting it
# into tables would remove that ambiguity.
transaction_lookup = (
    transaction
    .groupBy(*transaction_keys)
    .min('Transaction_id')
    .withColumnRenamed('min(Transaction_id)', 'Transaction_id')
)

fact_table = (
    nuga_bank_df_clean
    .join(customer, on=customer_keys, how='left')
    .join(transaction_lookup, on=transaction_keys, how='left')
    .join(employee, on=employee_keys, how='left')
    .select('Customer_id', 'Transaction_id', 'Employee_id', 'Credit_Card_Number', 'IBAN',
            'Currency_Code', 'Random_Number', 'Category', 'Group', 'Is_Active',
            'Description', 'Last_Updated')
)

In [33]:
# Preview the first rows of the fact table.
fact_table.show()

                                                                                

+-----------+--------------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+------------+
|Customer_id|Transaction_id|Employee_id| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|         Description|Last_Updated|
+-----------+--------------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+------------+
|51539607556|   51539607556|      61608|   3560355487090395|GB53MKDK173442121...|          TMT|         2936|       A|      X|      Yes|Seem whose especi...|     09:59.8|
|42949672975|   42949672975|      52174|       676212984154|GB96QCKS260858113...|          MMK|         2332|       B|      Y|      Yes|Station look outs...|     30:41.4|
|60129542153|   60129542153|      74655|      4175891800274|GB57ORUR595498942...|          TTD|         8009|       D|      X|      Yes|Several m