### Importing and installing necessary dependencies

In [2]:
#pip install pyspark

In [3]:
from pyspark.sql import SparkSession
import pandas as pd
from sqlalchemy import create_engine

In [4]:
#pip install py4j


In [5]:

import os
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Point PySpark at the local Java and Spark installations.
# NOTE(review): these are machine-specific paths — adjust them per environment.
os.environ['JAVA_HOME'] = 'C:/java/jdk'
os.environ['SPARK_HOME'] = 'C:/spark/spark-3.5.1-bin-hadoop3'

# Optional: Set HADOOP_HOME if using Hadoop
# Raw string: '\h' is not a valid escape sequence, and Python warns about
# invalid escapes in ordinary string literals. The resulting value is unchanged.
os.environ['HADOOP_HOME'] = r'C:\hadoop\hadoop-3.4.0'

# Add Hadoop's bin directory to PATH only if it is not already there, so that
# re-running this cell does not keep appending the same entry.
hadoop_bin = os.path.join(os.environ['HADOOP_HOME'], 'bin')
if hadoop_bin not in os.environ['PATH'].split(os.pathsep):
    os.environ['PATH'] += os.pathsep + hadoop_bin

# Configure Spark session
sparkConf = SparkConf()
sparkConf.setAppName("NugaBankETL")
sparkConf.setMaster("local[*]")  # Run Spark locally with all available cores

# Add any additional configurations as needed
# NOTE(review): with a local[*] master, confirm these executor settings have
# the intended effect on this machine.
sparkConf.set("spark.executor.memory", "16g")
sparkConf.set("spark.executor.cores", "8")

# Initialize SparkSession
spark = (
    SparkSession.builder
    .config(conf=sparkConf)
    .getOrCreate()
)

# Verify Spark session
print("PySpark is working correctly with Spark version:", spark.version)

# Stop Spark session: this cell only verifies the installation
spark.stop()

PySpark is working correctly with Spark version: 3.5.1


### Data Extraction

In [6]:
# Start (or reuse) the Spark session used by the ETL steps
spark = (
    SparkSession.builder
    .appName("NugaBankETL")
    .getOrCreate()
)

In [7]:
# Echo the session object as the cell output; used here to find the Spark UI
spark

In [8]:
#Extracting the data
# Forward slashes keep the relative path usable on every operating system
# (a backslash separator only resolves on Windows).
# header=True takes column names from the first row; inferSchema=True lets
# Spark infer each column's type from the data.
nuga_df = spark.read.csv(
    'dataset/nuga_bank_transactions.csv',
    header=True,
    inferSchema=True,
)
nuga_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [9]:
#Checking the schema
# Prints each column's name, inferred type and nullability
nuga_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [10]:
# List the column names of the extracted DataFrame
nuga_df.columns #Columns display

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [11]:
#Number of rows check
# count() triggers a Spark job over the extracted data
numRows = nuga_df.count() 
numRows

1000000

In [12]:
#Number of columns check
# Derived from the column-name list, so no Spark job is needed
numCols = len(nuga_df.columns) 
numCols

23

#### Data Cleaning and Transformation

In [13]:
#Checking for null values
# All columns are counted in a single aggregation job instead of running one
# filter().count() job (a full scan) per column.
from pyspark.sql import functions as F

# when(...) yields 1 for a null cell and NULL otherwise; count() skips NULLs,
# so each aggregate is the number of null cells in that column.
null_counts = nuga_df.select([
    F.count(F.when(nuga_df[column].isNull(), 1)).alias(column)
    for column in nuga_df.columns
]).first()

for column in nuga_df.columns:
    print(column, 'Nulls', null_counts[column])

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [14]:
#Filling up missing values
# Text columns receive the placeholder 'unknown'; the two numeric columns
# receive zero. Last_Updated is not listed, so this step leaves its nulls as-is.
text_columns = [
    'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
    'Customer_Country', 'Company', 'Job_Title', 'Email', 'Phone_Number',
    'IBAN', 'Currency_Code', 'Category', 'Group', 'Is_Active',
    'Description', 'Gender', 'Marital_Status',
]
fill_values = dict.fromkeys(text_columns, 'unknown')
fill_values['Credit_Card_Number'] = 0
fill_values['Random_Number'] = 0.0

nuga_dff = nuga_df.fillna(fill_values)

In [15]:
#Checking for null values again after filling using the new dataframe
# All columns are counted in a single aggregation job instead of running one
# filter().count() job (a full scan) per column.
from pyspark.sql import functions as F

# when(...) yields 1 for a null cell and NULL otherwise; count() skips NULLs,
# so each aggregate is the number of null cells in that column.
null_counts = nuga_dff.select([
    F.count(F.when(nuga_dff[column].isNull(), 1)).alias(column)
    for column in nuga_dff.columns
]).first()

for column in nuga_dff.columns:
    print(column, 'Nulls', null_counts[column])

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 100321
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [16]:
# Remove the rows that have no Last_Updated value: it is a timestamp column,
# so the 'unknown' text placeholder used for the other columns cannot be applied.
# This discards the 100321 rows reported as null for Last_Updated (about 10% of the data).
nuga_dff = nuga_dff.dropna(subset=['Last_Updated'])



In [17]:
#Checking for null values again after dropping the rows with a null Last_Updated
# All columns are counted in a single aggregation job instead of running one
# filter().count() job (a full scan) per column.
from pyspark.sql import functions as F

# when(...) yields 1 for a null cell and NULL otherwise; count() skips NULLs,
# so each aggregate is the number of null cells in that column.
null_counts = nuga_dff.select([
    F.count(F.when(nuga_dff[column].isNull(), 1)).alias(column)
    for column in nuga_dff.columns
]).first()

for column in nuga_dff.columns:
    print(column, 'Nulls', null_counts[column])

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [18]:
#Number of rows check again after dropping nulls in Last_updated col
# Reassigns numRows, replacing the row count taken from the raw extract
numRows = nuga_dff.count() 
numRows

899679

In [19]:
# Show summary statistics for the cleaned DataFrame
nuga_dff.describe().show()


+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|     Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+----------

In [20]:
# List the available columns before splitting them into separate tables
# (the data model itself was drawn in Draw.io)
nuga_dff.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [21]:
#transaction table
# Keep only the columns that describe a transaction
transaction_columns = ['Transaction_Date', 'Amount', 'Transaction_Type']
transactions = nuga_dff.select(transaction_columns)


In [22]:
#Adding transactions_id to the table above, import a library
from pyspark.sql.functions import monotonically_increasing_id


In [23]:
# Add a transaction_id column generated by monotonically_increasing_id()
transactions = transactions.withColumn('transaction_id', monotonically_increasing_id())

In [24]:
# Reorder the columns so that transaction_id comes first
transactions = transactions.select('transaction_id','Transaction_Date','Amount','Transaction_Type')

In [25]:
# Preview the first five rows of the transactions table
transactions.show(5)

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [26]:
#customer table, using distinct to select only once
customer_cols = ['Customer_Name', 'Customer_Address', 'Customer_City',
                 'Customer_State', 'Customer_Country', 'Email', 'Phone_Number']

# monotonically_increasing_id() depends on how rows are laid out across
# partitions, and distinct() introduces a shuffle. Because the DataFrame is lazy,
# each re-evaluation (show, toPandas, a join) could otherwise assign different IDs
# to the same customer. cache() materializes the IDs once so every later use of
# `customers` sees the same values.
# NOTE(review): for a stricter guarantee, write the table to storage and read it back.
customers = (
    nuga_dff.select(customer_cols)
    .distinct()
    .withColumn('Customer_ID', monotonically_increasing_id())
    .select('Customer_ID', *customer_cols)
    .cache()
)
customers.show(5)


+-----------+--------------+--------------------+--------------------+--------------+----------------+--------------------+--------------------+
|Customer_ID| Customer_Name|    Customer_Address|       Customer_City|Customer_State|Customer_Country|               Email|        Phone_Number|
+-----------+--------------+--------------------+--------------------+--------------+----------------+--------------------+--------------------+
|          0|Allen Castillo|   5750 Vanessa Neck|     New Vickiemouth|North Carolina|          Zambia|             unknown|  732.974.7438x89666|
|          1|    Tina Jones|28150 Kelsey Stat...|             unknown|          Iowa|           Qatar|gabriellemoore@ex...|    736.645.3977x275|
|          2|Michael Murphy|894 Williams Ridg...|       Dominguezview|      New York|          Sweden|kristinstanley@ex...|+1-693-739-2204x8851|
|          3|   Brian Glenn|505 Mcdowell Gard...|South Christinech...|  South Dakota|         Lesotho|bcabrera@example.net|001-962

In [27]:
#employee table
employee_cols = ['Company', 'Job_Title', 'Gender', 'Marital_Status']

# monotonically_increasing_id() depends on how rows are laid out across
# partitions, and distinct() introduces a shuffle. Because the DataFrame is lazy,
# each re-evaluation (show, toPandas, a join) could otherwise assign different IDs
# to the same row. cache() materializes the IDs once so every later use of
# `employees` sees the same values.
# NOTE(review): for a stricter guarantee, write the table to storage and read it back.
employees = (
    nuga_dff.select(employee_cols)
    .distinct()
    .withColumn('Employee_ID', monotonically_increasing_id())
    .select('Employee_ID', *employee_cols)
    .cache()
)
employees.show(5)

+-----------+--------------------+--------------------+------+--------------+
|Employee_ID|             Company|           Job_Title|Gender|Marital_Status|
+-----------+--------------------+--------------------+------+--------------+
|          0|Hill, Sharp and F...|             unknown|Female|      Divorced|
|          1|Schmidt, Morgan a...|     Engineer, water|Female|        Single|
|          2|Medina, Flores an...|             Curator|Female|       unknown|
|          3|         Bell-Murphy|Geographical info...|  Male|       unknown|
|          4|      Henry and Sons|Engineer, civil (...|Female|       Married|
+-----------+--------------------+--------------------+------+--------------+
only showing top 5 rows



In [28]:
#fact_table
# Natural keys used to look up each surrogate ID
customer_keys = ['Customer_Name', 'Customer_Address', 'Customer_City',
                 'Customer_State', 'Customer_Country', 'Email', 'Phone_Number']
transaction_keys = ['Transaction_Date', 'Amount', 'Transaction_Type']
employee_keys = ['Company', 'Job_Title', 'Gender', 'Marital_Status']

# Columns kept in the fact table: the three IDs plus the remaining attributes
fact_columns = ['Customer_ID', 'transaction_id', 'Employee_ID', 'Credit_Card_Number',
                'IBAN', 'Currency_Code', 'Random_Number', 'Category', 'Group',
                'Is_Active', 'Last_Updated', 'Description']

# NOTE(review): `transactions` is not de-duplicated on transaction_keys, so source
# rows that share the same date, amount and type would match more than once in the
# second join — confirm the fact table row count equals the source row count.
fact_table = (
    nuga_dff
    .join(customers, customer_keys, 'left')
    .join(transactions, transaction_keys, 'left')
    .join(employees, employee_keys, 'left')
    .select(fact_columns)
)

In [29]:
# Preview the first five rows of the fact table
fact_table.show(5)

+------------+--------------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
| Customer_ID|transaction_id|Employee_ID|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+------------+--------------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
| 34359808549|   94489280512|      63464|     4511208969578|GB04TVBU672029194...|          NAD|       6450.0|       B|    X|      Yes|2023-02-21 21:19:...|Support use detai...|
| 94489298803|   25769803776|      19099|   213133896337542|GB07IUUE487965913...|          ISK|          0.0|       D|    Y|       No|2023-08-16 10:32:...|Themselves make ago.|
| 85899402159|   77309411330|      51435|  5388658592185363|GB09MHOG513401686...|          NIS|       4609.0|      

In [30]:
# Save the transformed tables as Parquet files
# mode('overwrite') is used so the cell can be re-run without failing on existing output
# Left commented out: the write might not execute on this machine due to system capacity
#transactions.write.mode('overwrite').parquet('dataset/transactions')
#customers.write.mode('overwrite').parquet('dataset/customers')
#employees.write.mode('overwrite').parquet('dataset/employees')
#fact_table.write.mode('overwrite').parquet('dataset/fact_table')

In [31]:
# Enable Arrow for conversions between Spark and pandas DataFrames
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# NOTE(review): DEBUG is a very verbose log level — confirm it is intended and not leftover debugging
spark.sparkContext.setLogLevel("DEBUG")


In [None]:
#Saving as CSV alternatively
#transactions.repartition(1).write.mode('overwrite').option('header','true').csv('dataset//mdata//csv//transactions')
#customers.repartition(1).write.mode('overwrite').option('header','true').csv('dataset//mdata//csv//customers')
#employees.repartition(1).write.mode('overwrite').option('header','true').csv('dataset//mdata//csv//employees')
#fact_table.repartition(1).write.mode('overwrite').option('header','true').csv('dataset//mdata//csv//fact_table')
#print("CSV write successful")

In [33]:
# Collect each Spark DataFrame to the driver as a pandas DataFrame,
# in the order: transactions, customers, employees, fact table
transactions_pd, customers_pd, employees_pd, fact_table_pd = (
    spark_df.toPandas()
    for spark_df in (transactions, customers, employees, fact_table)
)

In [34]:
#pip install psycopg2

In [35]:
#Loading the dataset into a PostgreSQL database
#Defining parameters/connections
import getpass
import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

# Connection settings are read from the standard PG* environment variables.
# Non-secret settings fall back to the local defaults; the password is never
# stored in the notebook — it comes from PGPASSWORD or an interactive prompt.
db_params = {
    'username' : os.environ.get('PGUSER', 'postgres'),
    'password' : os.environ.get('PGPASSWORD') or getpass.getpass('PostgreSQL password: '),
    'host' : os.environ.get('PGHOST', 'localhost'),
    'port' : os.environ.get('PGPORT', '5432'),
    'database' : os.environ.get('PGDATABASE', 'nuga_bank')
}

#connections
# quote_plus escapes characters such as '@' or ':' in the credentials that
# would otherwise corrupt the URL.
db_url = (
    f"postgresql://{quote_plus(db_params['username'])}:{quote_plus(db_params['password'])}"
    f"@{db_params['host']}:{db_params['port']}/{db_params['database']}"
)

#Creating database engine
engine  = create_engine(db_url)

# Tables to create, in load order
tables_to_load = {
    'Transactions': transactions_pd,
    'Customers': customers_pd,
    'Employees': employees_pd,
    'Fact_Table': fact_table_pd,
}

#connecting to the postgres server
# engine.begin() opens a transaction that is committed when the block finishes
# and rolled back if any load raises, so the tables are written all-or-nothing.
with engine.begin() as connection:
    #Create tables and load the data
    for table_name, frame in tables_to_load.items():
        frame.to_sql(table_name, connection, index= False, if_exists= 'replace')

print('Database, tables and data loaded successfully')

Database, tables and data loaded successfully
