Import Necessary libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import DataFrameWriter
import os
import psycopg2

In [2]:
# Set Java Home
os.environ['JAVA_HOME'] = r'C:\java8'

In [3]:
# Initialise Spark Session
spark = SparkSession.builder\
         .appName("Rexhall Bank ETL")\
         .config("spark.jars", r"C:\Users\DELL\OneDrive\DATA ENGINEERING FOLDER\Rexhall Bank")\
         .getOrCreate()

In [4]:
spark

In [5]:
# Extract the data into a spark dataframe
rexhall_bank_df = spark.read.csv(r"data_set\raw_data\rexhall_bank_transactions.csv", header=True, inferSchema=True)

In [6]:
rexhall_bank_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [7]:
rexhall_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

Data cleaning and Transformation


Data Cleaning

In [8]:
for column in rexhall_bank_df.columns:
    print(column, "Nulls:", rexhall_bank_df.filter(rexhall_bank_df[column].isNull()).count())

Transaction_Date Nulls: 0
Amount Nulls: 0
Transaction_Type Nulls: 0
Customer_Name Nulls: 100425
Customer_Address Nulls: 100087
Customer_City Nulls: 100034
Customer_State Nulls: 100009
Customer_Country Nulls: 100672
Company Nulls: 100295
Job_Title Nulls: 99924
Email Nulls: 100043
Phone_Number Nulls: 100524
Credit_Card_Number Nulls: 100085
IBAN Nulls: 100300
Currency_Code Nulls: 99342
Random_Number Nulls: 99913
Category Nulls: 100332
Group Nulls: 100209
Is_Active Nulls: 100259
Last_Updated Nulls: 100321
Description Nulls: 100403
Gender Nulls: 99767
Marital_Status Nulls: 99904


In [9]:
# To known the summary statistics of the data
rexhall_bank_df.describe().show()

+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+--------------------+------+--------------+
|summary|           Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+---------------

In [10]:
# Filling the missing values in the data
clean_rexhall_df = rexhall_bank_df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address': 'Unknown',
    'Customer_State': 'Unknown',
    'Customer_City': 'Unknown',
    'Customer_Country': 'Unknown',
    'Company' : 'Unknown',
    'Job_Title': 'Unknown',
    'Email': 'Unknown',
    'Phone_Number': 'Unknown',
    'Credit_Card_Number': 0,
    'IBAN': 'Unknown',
    'Currency_Code': 'Unknown',
    'Random_Number': 0.0,
    'Category': 'Unknown',
    'Group': 'Unknown',
    'Is_Active': 'Unknown',
    'Description': 'Unknown',
    'Gender': 'Unknown',
    'Marital_Status': 'Unknown'
})

In [11]:
# drop missing values in Last_Updated column
clean_rexhall_df = clean_rexhall_df.na.drop(subset=['Last_Updated'])

In [12]:
# Confirmation
for column in clean_rexhall_df.columns:
    print(column, "Nulls:", clean_rexhall_df.filter(clean_rexhall_df[column].isNull()).count())

Transaction_Date Nulls: 0
Amount Nulls: 0
Transaction_Type Nulls: 0
Customer_Name Nulls: 0
Customer_Address Nulls: 0
Customer_City Nulls: 0
Customer_State Nulls: 0
Customer_Country Nulls: 0
Company Nulls: 0
Job_Title Nulls: 0
Email Nulls: 0
Phone_Number Nulls: 0
Credit_Card_Number Nulls: 0
IBAN Nulls: 0
Currency_Code Nulls: 0
Random_Number Nulls: 0
Category Nulls: 0
Group Nulls: 0
Is_Active Nulls: 0
Last_Updated Nulls: 0
Description Nulls: 0
Gender Nulls: 0
Marital_Status Nulls: 0


Data Transformation

In [13]:
# Creating Transaction table
transaction = clean_rexhall_df.select('Transaction_Date', 'Amount', 'Transaction_Type') \
                .withColumn('Transaction_ID', monotonically_increasing_id())\
                .select('Transaction_ID', 'Transaction_Date', 'Amount', 'Transaction_Type')
                

In [14]:
transaction.show(5)

+--------------+--------------------+------+----------------+
|Transaction_ID|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [None]:
# Customers table
customer = clean_rexhall_df.select('Customer_Name', 'Customer_Address', 'Customer_State', 'Customer_City', 'Customer_Country')\
            .withColumn('Customer_ID', monotonically_increasing_id())\
            .select('Customer_ID', 'Customer_Name', 'Customer_Address', 'Customer_State', 'Customer_City', 'Customer_Country')

In [16]:
customer.show(5)

+-----------+--------------+--------------------+--------------+------------------+--------------------+
|Customer_ID| Customer_Name|    Customer_Address|Customer_State|     Customer_City|    Customer_Country|
+-----------+--------------+--------------------+--------------+------------------+--------------------+
|          0|    James Neal|54912 Holmes Lodg...|       Florida| West Keithborough|                Togo|
|          1|   Thomas Long| 1133 Collin Passage|   Connecticut|        Joshuabury|Lao People's Demo...|
|          2|Ashley Shelton|5297 Johnson Port...|    New Jersey|       North Maria|              Bhutan|
|          3| James Rosario|56955 Moore Glens...|    New Mexico|North Michellefurt|             Iceland|
|          4|Miguel Leonard|262 Beck Expressw...| West Virginia|           Unknown|             Eritrea|
+-----------+--------------+--------------------+--------------+------------------+--------------------+
only showing top 5 rows



In [17]:
# Employee table
employee = clean_rexhall_df.select('Company', 'Job_Title', 'Email', 'Phone_Number','Gender', 'Marital_Status')\
            .withColumn('Employee_ID', monotonically_increasing_id())\
            .select('Employee_ID','Company', 'Job_Title', 'Email', 'Phone_Number','Gender', 'Marital_Status')

In [18]:
employee.show(5)

+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|Employee_ID|             Company|           Job_Title|               Email|       Phone_Number| Gender|Marital_Status|
+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|          0|Benson, Johnson a...|             Unknown|             Unknown|  493.720.6609x7545|  Other|      Divorced|
|          1|             Unknown|   Food technologist|michellelynch@exa...|      (497)554-3317| Female|       Married|
|          2|       Jones-Mueller|Database administ...| ljordan@example.org|      (534)769-3072|  Other|       Unknown|
|          3|       Vargas-Harris|Horticultural the...|parkerjames@examp...|+1-447-900-1320x257|Unknown|       Unknown|
|          4|Richardson, Gonza...|   Minerals surveyor| zweaver@example.net|            Unknown| Female|       Married|
+-----------+--------------------+------

In [19]:
# Rexford Bank  fact table
fact_table = clean_rexhall_df \
    .join(transaction, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'inner') \
    .join(customer, ['Customer_Name', 'Customer_Address', 'Customer_State', 'Customer_City', 'Customer_Country'], 'inner') \
    .join(employee, ['Company', 'Job_Title', 'Email', 'Phone_Number', 'Gender', 'Marital_Status'], 'inner') \
    .select('Transaction_ID', 'Customer_ID', 'Employee_ID', 'Credit_Card_Number', 'IBAN', 
            'Currency_Code', 'Random_Number', 'Category', 'Group', 'Is_Active', 
            'Description', 'Last_Updated')

In [20]:
fact_table.show(5)

+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|Transaction_ID|Customer_ID|Employee_ID| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|         Description|        Last_Updated|
+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|   17179921196|17179921196|17179921196|4131053293826613966|GB78CKOS832138470...|          GNF|       7475.0|       B|    Z|      Yes|Health sure story...|2022-02-06 23:35:...|
|         29992|      29992|      29992|   2452924189738024|GB50ZGVC717046003...|          TRY|       9153.0|       D|    Z|       No|Wait represent ge...|2021-11-26 08:47:...|
|   25769840960|25769840960|25769840960|   4625846628264239|GB97NDWP370396356...|          BSD|       6117.0| Unkno

Converting from spark dataframe to pandas dataframe

In [None]:

customer_pd = customer.toPandas()
transaction_pd = transaction.toPandas()
employee_pd = employee.toPandas()
fact_table_pd = fact_table.toPandas()

Saving the data to a csv file

In [54]:
# Writing the data to a csv file
customer_pd.to_csv('data_set/cleaned_data/customer.csv', index=False)
transaction_pd.to_csv('data_set/cleaned_data/transaction.csv', index=False)
employee_pd.to_csv('data_set/cleaned_data/employee.csv', index=False)
fact_table_pd.to_csv('data_set/cleaned_data/fact_table.csv', index=False)

Loading the data into a postgres database

In [None]:
# Importing the necessary libraries
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
import os

In [None]:
# Loading the environment variables
load_dotenv(override=True)
db_user = os.getenv('DB_USER')
db_name = os.getenv('DB_NAME')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_password = os.getenv('DB_PASSWORD')

In [None]:
# Creating the database url
database_url = f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
engine = create_engine(database_url)

In [59]:
# Loading the data into a postgres database
customer_pd.to_sql('customer', engine, if_exists='replace', index=False)
transaction_pd.to_sql('transaction', engine, if_exists='replace', index=False)
employee_pd.to_sql('employee', engine, if_exists='replace', index=False)
fact_table_pd.to_sql('fact_table', engine, if_exists='replace', index=False)
print("Data loaded successfully")

Data loaded successfully
