Import dependencies


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, lit
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType
from pyspark.sql.functions import monotonically_increasing_id
!pip install psycopg2-binary
from sqlalchemy import create_engine



In [5]:
# Create the Spark session for this ETL job (or reuse one that already exists)
spark = (
    SparkSession.builder
    .appName("NugaBanksETL")
    .getOrCreate()
)

In [6]:
# Display the active SparkSession object
spark

Data Extraction

In [7]:
import os

# Location of the raw transactions CSV. The original absolute path stays as the
# default so existing runs are unchanged; set the NUGA_BANK_CSV environment
# variable to point the notebook at the file on another machine.
nuga_bank_csv_path = os.environ.get(
    'NUGA_BANK_CSV',
    r'C:\Users\user\Desktop\nuga_bank_case_study\dataset\nuga_bank_transactions.csv',
)

# Read the CSV using its first line as the header and let Spark infer column types
nuga_banks_df = spark.read.csv(nuga_bank_csv_path, header=True, inferSchema=True)

In [8]:
# Preview the first five rows of the raw data
nuga_banks_df.show(n=5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [9]:
# Print the column names and the types inferred while reading the CSV
nuga_banks_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [10]:
# Data transformation: start by listing the column names of the raw DataFrame

nuga_banks_df.columns


['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [11]:
# Shape of the raw DataFrame as (row count, column count)
num_cols = len(nuga_banks_df.columns)
num_rows = nuga_banks_df.count()

(num_rows, num_cols)

(1000000, 23)

In [12]:
# Count the null values in every column of the raw DataFrame.
# The loop variable is deliberately not named `col`: that name is imported from
# pyspark.sql.functions at the top of the notebook, and rebinding it to a string
# here would break any later call to col(...).
for column_name in nuga_banks_df.columns:
    null_count = nuga_banks_df.filter(nuga_banks_df[column_name].isNull()).count()
    print(column_name, 'Nulls', null_count)

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [13]:
# Replace nulls with placeholders: 'unknown' for the string columns listed below
# and zero for the two numeric columns. Last_Updated is not filled here.
nuga_banks_df_cleaned = nuga_banks_df.fillna({
    **dict.fromkeys(
        [
            'Customer_Name',
            'Customer_Address',
            'Customer_State',
            'Customer_City',
            'Customer_Country',
            'Company',
            'Job_Title',
            'Email',
            'Phone_Number',
            'IBAN',
            'Currency_Code',
            'Category',
            'Group',
            'Is_Active',
            'Description',
            'Gender',
            'Marital_Status',
        ],
        'unknown',
    ),
    'Credit_Card_Number': 0,
    'Random_Number': 0.0,
})

In [14]:
# Re-count the nulls per column after the fill step.
# The loop variable is deliberately not named `col`: that name is imported from
# pyspark.sql.functions at the top of the notebook, and rebinding it to a string
# here would break any later call to col(...).
for column_name in nuga_banks_df_cleaned.columns:
    null_count = nuga_banks_df_cleaned.filter(nuga_banks_df_cleaned[column_name].isNull()).count()
    print(column_name, 'Nulls', null_count)

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 100321
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [15]:
# Discard the rows whose Last_Updated is null (the only column not filled above)
nuga_banks_df_cleaned = nuga_banks_df_cleaned.dropna(
    how='any',
    subset=['Last_Updated'],
)

In [16]:
# Final null count per column after dropping rows with a null Last_Updated.
# The loop variable is deliberately not named `col`: that name is imported from
# pyspark.sql.functions at the top of the notebook, and rebinding it to a string
# here would break any later call to col(...).
for column_name in nuga_banks_df_cleaned.columns:
    null_count = nuga_banks_df_cleaned.filter(nuga_banks_df_cleaned[column_name].isNull()).count()
    print(column_name, 'Nulls', null_count)

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [17]:
# Shape of the cleaned DataFrame as (row count, column count)
num_rows, num_cols = (
    nuga_banks_df_cleaned.count(),
    len(nuga_banks_df_cleaned.columns),
)

(num_rows, num_cols)

(899679, 23)

In [18]:
# Show summary statistics for the cleaned DataFrame via describe()
nuga_banks_df_cleaned.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|     Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+-------

In [19]:
# List the column names of the cleaned DataFrame
nuga_banks_df_cleaned.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [20]:
# Transaction table: keep only the three transaction columns of the cleaned data
transaction = nuga_banks_df_cleaned.select(
    'Transaction_Date',
    'Transaction_Type',
    'Amount',
)
transaction.show()

+--------------------+----------------+------+
|    Transaction_Date|Transaction_Type|Amount|
+--------------------+----------------+------+
|2024-03-23 15:38:...|      Withdrawal| 34.76|
|2024-04-22 19:15:...|      Withdrawal|163.92|
|2024-04-12 19:46:...|      Withdrawal|386.32|
|2024-04-17 15:29:...|         Deposit|407.15|
|2024-02-10 01:51:...|         Deposit|161.31|
|2024-02-10 22:56:...|        Transfer|764.34|
|2024-04-07 00:07:...|         Deposit|734.59|
|2024-03-08 01:51:...|         Deposit|592.43|
|2024-02-01 12:34:...|         Deposit| 927.1|
|2024-03-22 16:46:...|        Transfer| 66.59|
|2024-04-23 13:30:...|      Withdrawal| 246.3|
|2024-01-13 01:22:...|      Withdrawal|782.32|
|2024-02-25 15:16:...|      Withdrawal|818.42|
|2024-01-01 20:55:...|      Withdrawal|352.23|
|2024-01-19 00:01:...|      Withdrawal|316.19|
|2024-04-09 14:40:...|      Withdrawal|662.26|
|2024-04-15 04:58:...|         Deposit|893.73|
|2024-04-12 14:32:...|      Withdrawal|746.22|
|2024-02-26 1

In [21]:
# Add the 'Transaction_id' surrogate-key column to the transaction table.
# NOTE(review): monotonically_increasing_id() yields unique but not consecutive
# values and is documented as non-deterministic — confirm the ids stay stable
# when this DataFrame is re-evaluated by the later join and toPandas() cells.
transaction = transaction.withColumn("Transaction_id", monotonically_increasing_id())

In [22]:
# Preview the transaction table, now including the Transaction_id column
transaction.show()

+--------------------+----------------+------+--------------+
|    Transaction_Date|Transaction_Type|Amount|Transaction_id|
+--------------------+----------------+------+--------------+
|2024-03-23 15:38:...|      Withdrawal| 34.76|             0|
|2024-04-22 19:15:...|      Withdrawal|163.92|             1|
|2024-04-12 19:46:...|      Withdrawal|386.32|             2|
|2024-04-17 15:29:...|         Deposit|407.15|             3|
|2024-02-10 01:51:...|         Deposit|161.31|             4|
|2024-02-10 22:56:...|        Transfer|764.34|             5|
|2024-04-07 00:07:...|         Deposit|734.59|             6|
|2024-03-08 01:51:...|         Deposit|592.43|             7|
|2024-02-01 12:34:...|         Deposit| 927.1|             8|
|2024-03-22 16:46:...|        Transfer| 66.59|             9|
|2024-04-23 13:30:...|      Withdrawal| 246.3|            10|
|2024-01-13 01:22:...|      Withdrawal|782.32|            11|
|2024-02-25 15:16:...|      Withdrawal|818.42|            12|
|2024-01

In [23]:
# Put the id first, followed by the date, amount and type columns
transaction = transaction.select(
    "Transaction_id",
    "Transaction_Date",
    "Amount",
    "Transaction_Type",
)
transaction.show()

+--------------+--------------------+------+----------------+
|Transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
|             5|2024-02-10 22:56:...|764.34|        Transfer|
|             6|2024-04-07 00:07:...|734.59|         Deposit|
|             7|2024-03-08 01:51:...|592.43|         Deposit|
|             8|2024-02-01 12:34:...| 927.1|         Deposit|
|             9|2024-03-22 16:46:...| 66.59|        Transfer|
|            10|2024-04-23 13:30:...| 246.3|      Withdrawal|
|            11|2024-01-13 01:22:...|782.32|      Withdrawal|
|            12|2024-02-25 15:16:...|818.42|      Withdrawal|
|       

In [24]:
# Customer table: one row per distinct combination of the customer columns.
customer = nuga_banks_df_cleaned.select('Customer_Name', 'Customer_Address', 'Customer_State', 'Customer_City', 'Customer_Country', 'Email', 'Phone_Number').distinct()

# Assign the surrogate key and cache the result. monotonically_increasing_id()
# is documented as non-deterministic (its value depends on partition layout), and
# this DataFrame is evaluated again by later actions in the notebook (show, the
# fact-table join, toPandas). Caching makes those evaluations reuse the ids
# computed once instead of regenerating them, so the ids written for this table
# stay consistent with the ones referenced from the fact table.
customer = customer.withColumn("Customer_id", monotonically_increasing_id()).cache()

# Put the id first, then the descriptive columns
customer = customer.select("Customer_id", "Customer_Name", "Customer_Address", "Customer_State", "Customer_City", "Customer_Country", "Email", "Phone_Number")
customer.show()

+-----------+------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|Customer_id|     Customer_Name|    Customer_Address|Customer_State|       Customer_City|    Customer_Country|               Email|        Phone_Number|
+-----------+------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|          0|    Allen Castillo|   5750 Vanessa Neck|North Carolina|     New Vickiemouth|              Zambia|             unknown|  732.974.7438x89666|
|          1|        Tina Jones|28150 Kelsey Stat...|          Iowa|             unknown|               Qatar|gabriellemoore@ex...|    736.645.3977x275|
|          2|    Michael Murphy|894 Williams Ridg...|      New York|       Dominguezview|              Sweden|kristinstanley@ex...|+1-693-739-2204x8851|
|          3|       Brian Glenn|505 Mcdowell Gard...|  South Dakota|South Christin

In [25]:
# Employee table: one row per distinct combination of the employment columns.
employee = nuga_banks_df_cleaned.select('Company', 'Job_Title', 'Gender', 'Marital_Status').distinct()

# Assign the surrogate key and cache the result. monotonically_increasing_id()
# is documented as non-deterministic (its value depends on partition layout), and
# this DataFrame is evaluated again by later actions in the notebook (show, the
# fact-table join, toPandas). Caching makes those evaluations reuse the ids
# computed once instead of regenerating them, so the ids written for this table
# stay consistent with the ones referenced from the fact table.
employee = employee.withColumn("Employee_id", monotonically_increasing_id()).cache()

# Put the id first, then the descriptive columns
employee = employee.select("Employee_id", "Company", "Job_Title", "Gender", "Marital_Status")
employee.show()


+-----------+--------------------+--------------------+-------+--------------+
|Employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|Hill, Sharp and F...|             unknown| Female|      Divorced|
|          1|Schmidt, Morgan a...|     Engineer, water| Female|        Single|
|          2|Medina, Flores an...|             Curator| Female|       unknown|
|          3|         Bell-Murphy|Geographical info...|   Male|       unknown|
|          4|      Henry and Sons|Engineer, civil (...| Female|       Married|
|          5|             unknown|   Company secretary|unknown|      Divorced|
|          6|             unknown|Clinical embryolo...|   Male|        Single|
|          7|Carrillo, Schwart...| Solicitor, Scotland| Female|        Single|
|          8|         Olson-Lucas| Magazine journalist|   Male|      Divorced|
|          9|      Suarez-Terrell|            Best b

In [26]:
# Fact table
# Start from the cleaned source rows and attach each table's id by left-joining
# on that table's own columns, then keep the three ids plus the remaining
# source columns named in the final select.
# NOTE(review): the transaction join key (date, amount, type) is not guaranteed
# unique by anything in this notebook; duplicate keys would multiply fact rows —
# confirm, or carry Transaction_id on the source rows before splitting.
fact_table = (
    nuga_banks_df_cleaned
    .join(
        transaction,
        on=["Transaction_Date", "Amount", "Transaction_Type"],
        how='left',
    )
    .join(
        customer,
        on=["Customer_Name", "Customer_Address", "Customer_State", "Customer_City", "Customer_Country", "Email", "Phone_Number"],
        how='left',
    )
    .join(
        employee,
        on=["Company", "Job_Title", "Gender", "Marital_Status"],
        how='left',
    )
    .select(
        "Transaction_id", "Customer_id", "Employee_id",
        "Credit_Card_Number", "IBAN", "Currency_Code", "Random_Number",
        "Category", "Group", "Is_Active", "Last_Updated", "Description",
    )
)
fact_table.show()

+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|Transaction_id|Customer_id|Employee_id| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|    8589934609|34359760117|      16975|4191948926153525351|GB33SCRV040476482...|          MRO|       4711.0|       C|unknown|       No|2023-03-30 22:09:...|Program dinner me...|
|   34359738375|34359808817|      58544|    180029146062238|GB17MTZS784426277...|          GIP|       5688.0|       A|unknown|  unknown|2021-02-23 05:57:...|Network quite sit...|
|   42949672961|      79314|      66570|   4596307291033331|GB59TAVO484072449...|          ERN|       713

In [27]:
# Optional: write the transformed tables to Parquet (currently disabled; uncomment the lines below to enable)
#transaction.write.mode('overwrite').parquet(r'dataset/transaction')
#customer.write.mode('overwrite').parquet(r'dataset/customer')
#employee.write.mode('overwrite').parquet(r'dataset/employee')
#fact_table.write.mode('overwrite').parquet(r'dataset/fact_table')

In [28]:
# Optional: write the transformed tables as single-file CSVs with headers (currently disabled; uncomment the lines below to enable)
#transaction.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/transaction')
#customer.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/customer')
#employee.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/employee')
#fact_table.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset/transformeddata/csv/fact_table')

In [29]:
# Convert each Spark DataFrame to a pandas DataFrame, in the same order as the
# tables are later written to the database
transaction_pd_df, customer_pd_df, employee_pd_df, fact_table_pd_df = [
    spark_df.toPandas()
    for spark_df in (transaction, customer, employee, fact_table)
]

In [30]:
!pip install python-dotenv




In [35]:
from dotenv import load_dotenv
import os

# Load environment variables from the .env file
load_dotenv()

# Connection setting -> name of the environment variable that supplies it
_DB_ENV_VARS = {
    'username': 'DB_USERNAME',
    'password': 'DB_PASSWORD',
    'host': 'DB_HOST',
    'port': 'DB_PORT',
    'database': 'DB_NAME',
}

# Assign values from the environment to the db_params dictionary
db_params = {key: os.getenv(env_name) for key, env_name in _DB_ENV_VARS.items()}

# Fail fast with a clear message: os.getenv returns None for an unset variable,
# which would otherwise end up as the literal text "None" in the connection URL
# and surface later as a confusing connection error. Only the variable names are
# reported, never their values.
_missing_env_vars = [
    env_name for key, env_name in _DB_ENV_VARS.items() if not db_params[key]
]
if _missing_env_vars:
    raise RuntimeError(
        "Missing database settings; define these in the environment or .env file: "
        + ", ".join(_missing_env_vars)
    )




In [34]:

# Build the connection URL from its components instead of interpolating them into
# a string: URL.create escapes characters such as '@', ':' or '/' in the username
# or password, which would otherwise corrupt a hand-formatted URL.
from sqlalchemy.engine import URL

db_url = URL.create(
    drivername="postgresql",
    username=db_params['username'],
    password=db_params['password'],
    host=db_params['host'],
    # Environment variables are strings; URL.create expects an integer port
    port=int(db_params['port']) if db_params['port'] else None,
    database=db_params['database'],
)

# Create the database engine with the db url
engine = create_engine(db_url)

# engine.begin() opens a transaction that is committed when the block exits
# normally and rolled back if any load raises, so the four tables are replaced
# together or not at all, and the writes do not depend on implicit autocommit.
with engine.begin() as connection:
    # Create (or replace) each table and load its data
    for table_name, table_pd_df in (
        ('transaction', transaction_pd_df),
        ('customer', customer_pd_df),
        ('employee', employee_pd_df),
        ('fact_table', fact_table_pd_df),
    ):
        table_pd_df.to_sql(table_name, connection, index=False, if_exists='replace')

print('Database, tables and data loaded successfully ')

Database, tables and data loaded successfully 
