In [1]:
# Import necessary dependencies
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
import os
import time

In [2]:
# Initialise our spark session
spark = SparkSession.builder.appName('NugaBankETL').getOrCreate()




# spark = SparkSession.builder \
#     .appName('NugaBankETL') \
#     .config("spark.driver.memory", "8g") \
#     .config("spark.ui.port", "4041") \
#     .master("local[*]") \
#     .getOrCreate()




# spark = SparkSession.builder \
#     .appName('NugaBankETL') \
#     .config("spark.driver.memory", "8g") \
#     .master("local[*]") \
#     .getOrCreate()



In [3]:
spark

In [4]:
# Data Extraction
nuga_bank_df = spark.read.csv(r'nuga_bank_transactions.csv', header=True, inferSchema=True)

In [None]:
nuga_bank_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [6]:
nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

### Data Cleaning and Transformation Layer

In [None]:
nuga_bank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [8]:
# Number of Rows

num_rows = nuga_bank_df.count()

num_rows

1000000

In [9]:
# Number of Columns

num_columns = len(nuga_bank_df.columns)

num_columns

23

In [10]:
# Checking for null values

for column in nuga_bank_df.columns:
    print(column, 'Nulls', nuga_bank_df.filter(nuga_bank_df[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009


Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [11]:
# How to fill up missing values

nuga_bank_df_clean = nuga_bank_df.fillna({
    'Customer_Name' : 'Unknown',
    'Customer_Address' : 'Unknown',
    'Customer_City' : 'Unknown',
    'Customer_State' : 'Unknown',
    'Customer_Country' : 'Unknown',
    'Company' : 'Unknown',
    'Job_Title' : 'Unknown',
    'Email' : 'Unknown',
    'Phone_Number' : 'Unknown',
    'Credit_Card_Number' : 0,
    'IBAN' : 'Unknown',
    'Currency_Code' : 'Unknown',
    'Random_Number' : 0.0,
    'Category' : 'Unknown',
    'Group' : 'Unknown',
    'Is_Active' : 'Unknown',
    'Description' : 'Unknown',
    'Gender' : 'Unknown',
    'Marital_Status' : 'Unknown'
})

# Note: We skipped Last_Updated because from the code (nuga_bank_df.printSchema()), we saw that the data type of Last_Updated was timestamp and it's difficult to clean it, its easier to just drop it.

In [None]:
# re-Checking to confirm that all null values have been cleaned

for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', nuga_bank_df_clean.filter(nuga_bank_df_clean[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 100321
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [13]:
# Dropping missing values in Last_Updated: To do this, you'll have to drop rows where Last_Updated is null

nuga_bank_df_clean = nuga_bank_df_clean.na.drop(subset=['Last_Updated'])

In [14]:
# re-Checking to confirm that Last_Updated have been dropped

for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', nuga_bank_df_clean.filter(nuga_bank_df_clean[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0


Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [15]:
# Checking the total number of rows to confirm that the rows where Last_Updated was null, have been dropoped.

num_rows = nuga_bank_df_clean.count()

num_rows

899679

In [16]:
# To view the summary statistics of the data
nuga_bank_df_clean.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+----------

In [17]:
# Number of Columns
nuga_bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

### Creating Tables from the data Model

In [18]:
# Transaction table
transaction = nuga_bank_df_clean.select('Transaction_Date','Amount', 'Transaction_Type')

In [None]:
transaction.show()

+--------------------+------+----------------+
|    Transaction_Date|Amount|Transaction_Type|
+--------------------+------+----------------+
|2024-03-23 15:38:...| 34.76|      Withdrawal|
|2024-04-22 19:15:...|163.92|      Withdrawal|
|2024-04-12 19:46:...|386.32|      Withdrawal|
|2024-04-17 15:29:...|407.15|         Deposit|
|2024-02-10 01:51:...|161.31|         Deposit|
|2024-02-10 22:56:...|764.34|        Transfer|
|2024-04-07 00:07:...|734.59|         Deposit|
|2024-03-08 01:51:...|592.43|         Deposit|
|2024-02-01 12:34:...| 927.1|         Deposit|
|2024-03-22 16:46:...| 66.59|        Transfer|
|2024-04-23 13:30:...| 246.3|      Withdrawal|
|2024-01-13 01:22:...|782.32|      Withdrawal|
|2024-02-25 15:16:...|818.42|      Withdrawal|
|2024-01-01 20:55:...|352.23|      Withdrawal|
|2024-01-19 00:01:...|316.19|      Withdrawal|
|2024-04-09 14:40:...|662.26|      Withdrawal|
|2024-04-15 04:58:...|893.73|         Deposit|
|2024-04-12 14:32:...|746.22|      Withdrawal|
|2024-02-26 1

In [20]:
# Creating the transaction_id column

transaction = transaction.withColumn('Transaction_id', monotonically_increasing_id())

In [None]:
transaction.show()

+--------------------+------+----------------+--------------+
|    Transaction_Date|Amount|Transaction_Type|Transaction_id|
+--------------------+------+----------------+--------------+
|2024-03-23 15:38:...| 34.76|      Withdrawal|             0|
|2024-04-22 19:15:...|163.92|      Withdrawal|             1|
|2024-04-12 19:46:...|386.32|      Withdrawal|             2|
|2024-04-17 15:29:...|407.15|         Deposit|             3|
|2024-02-10 01:51:...|161.31|         Deposit|             4|
|2024-02-10 22:56:...|764.34|        Transfer|             5|
|2024-04-07 00:07:...|734.59|         Deposit|             6|
|2024-03-08 01:51:...|592.43|         Deposit|             7|
|2024-02-01 12:34:...| 927.1|         Deposit|             8|
|2024-03-22 16:46:...| 66.59|        Transfer|             9|
|2024-04-23 13:30:...| 246.3|      Withdrawal|            10|
|2024-01-13 01:22:...|782.32|      Withdrawal|            11|
|2024-02-25 15:16:...|818.42|      Withdrawal|            12|
|2024-01

In [22]:
# Now lets re-arrange the columns 
transaction = transaction.select('Transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type')

In [None]:
transaction.show()

+--------------+--------------------+------+----------------+
|Transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
|             5|2024-02-10 22:56:...|764.34|        Transfer|
|             6|2024-04-07 00:07:...|734.59|         Deposit|
|             7|2024-03-08 01:51:...|592.43|         Deposit|
|             8|2024-02-01 12:34:...| 927.1|         Deposit|
|             9|2024-03-22 16:46:...| 66.59|        Transfer|
|            10|2024-04-23 13:30:...| 246.3|      Withdrawal|
|            11|2024-01-13 01:22:...|782.32|      Withdrawal|
|            12|2024-02-25 15:16:...|818.42|      Withdrawal|
|       

In [24]:
# Creating the Customer table
customer = nuga_bank_df_clean.select('Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country', 'Email', 'Phone_Number')

# Creating the customer_id column
customer = customer.withColumn('Customer_id', monotonically_increasing_id())

# Now lets re-arrange the columns 
customer = customer.select('Customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country', 'Email', 'Phone_Number')

In [None]:
customer.show()

+-----------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+-------------------+
|Customer_id|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|               Email|       Phone_Number|
+-----------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+-------------------+
|          0|       James Neal|54912 Holmes Lodg...|   West Keithborough|       Florida|                Togo|             Unknown|  493.720.6609x7545|
|          1|      Thomas Long| 1133 Collin Passage|          Joshuabury|   Connecticut|Lao People's Demo...|michellelynch@exa...|      (497)554-3317|
|          2|   Ashley Shelton|5297 Johnson Port...|         North Maria|    New Jersey|              Bhutan| ljordan@example.org|      (534)769-3072|
|          3|    James Rosario|56955 Moore Glens...|  North Michellefurt|    New Mexico|      

In [26]:
# Creating the employee table
employee = nuga_bank_df_clean.select('Company', 'Job_Title', 'Gender', 'Marital_Status')

# Creating the employees_id column
employee = employee.withColumn('Employee_id', monotonically_increasing_id())

# Now lets re-arrange the columns 
employee = employee.select('Employee_id', 'Company', 'Job_Title', 'Gender', 'Marital_Status')

In [None]:
employee.show()

+-----------+--------------------+--------------------+-------+--------------+
|Employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|Benson, Johnson a...|             Unknown|  Other|      Divorced|
|          1|             Unknown|   Food technologist| Female|       Married|
|          2|       Jones-Mueller|Database administ...|  Other|       Unknown|
|          3|       Vargas-Harris|Horticultural the...|Unknown|       Unknown|
|          4|Richardson, Gonza...|   Minerals surveyor| Female|       Married|
|          5|           Smith Ltd| Seismic interpreter|  Other|       Married|
|          6|         Wade-Kelley|  Surveyor, minerals|   Male|       Unknown|
|          7|             Unknown|Medical laborator...| Female|        Single|
|          8|         Lindsey LLC|Programmer, appli...| Female|        Single|
|          9|         Carroll LLC|             Unkno

In [28]:
# Creating the facts table
facts_table = nuga_bank_df_clean.join(customer, ['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country', 'Email', 'Phone_Number'], 'left') \
                                .join(transaction, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'left') \
                                .join(employee, ['Company', 'Job_Title', 'Gender', 'Marital_Status'], 'left') \
                                .select('Customer_id', 'Transaction_id', 'Employee_id', 'Credit_Card_Number', 'IBAN', 'Currency_Code', 'Random_Number', 'Category', 'Group', 'Is_Active', 'Last_Updated', 'Description')




In [None]:
facts_table.show()

+-----------+--------------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|Customer_id|Transaction_id|Employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+-----------+--------------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|25769803788|   25769803788|25769803788|  3524136323090715|GB83HATT453421177...|          CLP|       8888.0|       C|      Y|       No|2022-02-17 13:35:...|Thing few head de...|
|         18|            18|         18|   213112163828334|GB50TJFN039979307...|          SVC|       7382.0|       B|      Z|      Yes|2020-01-19 18:19:...|Great evening so ...|
|17179869191|   17179869191|17179869191|   180029146062238|GB17MTZS784426277...|          GIP|       5688.0|  

In [30]:
# # Saving the dataset as a parquet file
# transaction.write.mode('overwrite').parquet(r'Dataset_in_parquet/transaction')
# customer.write.mode('overwrite').parquet(r'Dataset_in_parquet/customer')
# employee.write.mode('overwrite').parquet(r'Dataset_in_parquet/employee')
# facts_table.write.mode('overwrite').parquet(r'Dataset_in_parquet/facts_table')



In [31]:
# # Saving the dataset as a CSV file
# transaction.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'Dataset_in_parquet/Dataset_in_CSV/csv/transaction')
# customer.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'Dataset_in_parquet/Dataset_in_CSV/csv/customer')
# employee.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'Dataset_in_parquet/Dataset_in_CSV/csv/employee')
# facts_table.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'Dataset_in_parquet/Dataset_in_CSV/csv/facts_table')

In [32]:
# Convert spark df to pandas df

Transaction_table = transaction.toPandas()
Customer_table = customer.toPandas()
Employee_table = employee.toPandas()
#Facts_table = facts_table.toPandas()


In [33]:
# Limiting the data tobe converted by limiting the number of rows to be converted
# Sample_facts_table = facts_table.limit(5128419)  # Adjust the number as needed
# Sample_facts_table_pd = Sample_facts_table.toPandas()


#  The facts table seems to be too large for my PC, i tried limiting the rows like you can see above but it didn't work so i'll instead limit the columns by breaking the facts table into 3.
# They'll be Facts_table1, Facts_table2 and Facts_table3


# Creating the Facts_table1 and Facts_table2
# facts_table1 = facts_table.select('Customer_id', 'Transaction_id', 'Employee_id', 'Credit_Card_Number')
# facts_table2 = facts_table.select('Random_Number', 'Category', 'Group', 'Is_Active')
# facts_table3 = facts_table.select('IBAN', 'Currency_Code', 'Last_Updated', 'Description')

# # now that the facts table have been broken down, lets now re-run the conversion of pyspark to pandas for the now two facts table
# Facts_table1 = facts_table1.toPandas()
# Facts_table2 = facts_table2.toPandas()
# Facts_table3 = facts_table3.toPandas()


# breaking it down to 3 facts tables didn't still work. in the end what worked is reducing the columns to around 2 million from the almost 9 million it was in the original dataset.
sample_facts_table = facts_table.limit(2128419)  # Adjust the number as needed
Sample_facts_table = sample_facts_table.toPandas()


In [34]:
print(Sample_facts_table.count())
print(Sample_facts_table.columns)


Customer_id           2128419
Transaction_id        2128419
Employee_id           2128419
Credit_Card_Number    2128419
IBAN                  2128419
Currency_Code         2128419
Random_Number         2128419
Category              2128419
Group                 2128419
Is_Active             2128419
Last_Updated          2128419
Description           2128419
dtype: int64
Index(['Customer_id', 'Transaction_id', 'Employee_id', 'Credit_Card_Number',
       'IBAN', 'Currency_Code', 'Random_Number', 'Category', 'Group',
       'Is_Active', 'Last_Updated', 'Description'],
      dtype='object')


### Loading Layer

In [36]:
# Load environment variables from the .env file
load_dotenv()

db_params = {
    'username' : os.getenv('DB_USER'),
    'password' : os.getenv('PASSWORD'),
    'host' : os.getenv('HOST'),
    'port': int(os.getenv('PORT')),  # Ensure PORT is an integer
    'database' : os.getenv('DATABASE')
}

# define the database connection url
db_url = f"postgresql://{db_params['username']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}"


# create the database engine with the db url specified
engine = create_engine(db_url)

# connect to postgreSQL server
with engine.connect() as connection:
    # Create table and load the data
    Transaction_table.to_sql('Transaction_table', connection, index = False, if_exists = 'replace')
    Customer_table.to_sql('Customer_table', connection, index = False, if_exists = 'replace')
    Employee_table.to_sql('Employee_table', connection, index = False, if_exists = 'replace')
    Sample_facts_table.to_sql('Sample_facts_table', connection, index = False, if_exists = 'replace')

print('Database, table and data loaded successfully')


Database, table and data loaded successfully
