### Import Necessary dependencies


In [44]:
from pyspark.sql import SparkSession
import pandas as pd
from sqlalchemy import create_engine
from pyspark.sql.functions import monotonically_increasing_id


In [2]:
# initialize Spark Session
spark = SparkSession.builder.appName('NugaBankETL').getOrCreate()

In [3]:
spark 

## Data Extraction

In [20]:
nuga_bank_df = spark.read.csv(r'C:\Users\David Ibanga\Data Engineering practicals\Nuga_bank\nuga_bank_spark_etl\dataset\nuga_bank_transactions.csv', header = True, inferSchema=True)

In [15]:
nuga_bank_df.show()

+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------

In [16]:
nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

## Data Cleaning And Transformation

In [21]:
nuga_bank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [None]:
# Number of rows 
num_rows = nuga_bank_df.count()
print(f"Number of rows: {num_rows}")

Number of rows: 1000000


In [23]:
#number of columns
num_columns = len(nuga_bank_df.columns)
print(f"Number of columns: {num_columns}")

Number of columns: 23


In [28]:
#checking for null values
for column in nuga_bank_df.columns:
    print(column, 'Nulls', nuga_bank_df.filter(nuga_bank_df[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [34]:
# How to fill up missing values
nuga_bank_df_clean = nuga_bank_df.fillna({
    'Customer_Name':'Unknown',
    'Customer_Address':'Unknown',
    'Customer_City':'Unknown',
    'Customer_State':'Unknown',
    'Customer_Country':'Unknown',
    'Company':'Unknown',
    'Job_Title':'Unknown',
    'Email':'Unknown',
    'Phone_Number':'Unknown',
    'Credit_Card_Number':'0',
    'IBAN':'Unknown',
    'Currency_Code':'Unknown',
    'Random_Number':'0.0',
    'Category':'Unknown',
    'Group':'Unknown',
    'Is_Active':'Unknown',
    'Description':'Unknown',
    'Gender':'Unknown',
    'Marital_Status':'Unknown',
})

In [37]:
#Drop rows where last_updated is null
nuga_bank_df_clean = nuga_bank_df_clean.dropna(subset=['Last_Updated'])

In [38]:
#checking for null values
for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', nuga_bank_df_clean.filter(nuga_bank_df_clean[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [39]:
#no of rows
num_rows = nuga_bank_df_clean.count()
print(f"Number of rows after cleaning: {num_rows}")

Number of rows after cleaning: 899679


In [40]:
# To view the summary statistics of the data
nuga_bank_df_clean.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+----

In [41]:
nuga_bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [42]:
# Transaction Table
transaction =nuga_bank_df_clean.select(
    'Transaction_Date', 'Amount','Transaction_Type'
)

In [45]:
# Adding the transactions ID column
transaction = transaction.withColumn('transaction_id', monotonically_increasing_id())

In [50]:
#Reordering the columns
transaction = transaction.select('transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type')

In [51]:
transaction.show()

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
|             5|2024-02-10 22:56:...|764.34|        Transfer|
|             6|2024-04-07 00:07:...|734.59|         Deposit|
|             7|2024-03-08 01:51:...|592.43|         Deposit|
|             8|2024-02-01 12:34:...| 927.1|         Deposit|
|             9|2024-03-22 16:46:...| 66.59|        Transfer|
|            10|2024-04-23 13:30:...| 246.3|      Withdrawal|
|            11|2024-01-13 01:22:...|782.32|      Withdrawal|
|            12|2024-02-25 15:16:...|818.42|      Withdrawal|
|       

In [52]:
# Customer Table
customer = nuga_bank_df_clean.select(
'Customer_Name', 'Customer_Address', 'Customer_City',
    'Customer_State', 'Customer_Country', 'Email',
    'Phone_Number'
)

In [57]:
#Add Id column
customer = customer.withColumn('customer_id', monotonically_increasing_id()).distinct()

#reorder the table
customer = customer.select('customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City',
    'Customer_State', 'Customer_Country', 'Email', 'Phone_Number')

In [59]:
customer.count()

899679

In [58]:
customer.show()

+-----------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|customer_id|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|               Email|        Phone_Number|
+-----------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|        176|Jonathan Williams|     429 Allen Shore|           Jamesside|  South Dakota|             Unknown|martinbrown@examp...|             Unknown|
|        310|     Jessica Dunn|70041 Tapia Vista...|          New Amanda|       Wyoming|              Angola|   hmora@example.com|001-713-202-8704x219|
|        639|     Jose Terrell| 152 Collins Landing|   North Charlesport|       Unknown|            Maldives|victoriamoore@exa...|    001-679-739-9215|
|       1009|     Holly Fisher|          872 Le Rue|      Lake Alexander|      Nebraska|

In [60]:
#Employee Table
employee = nuga_bank_df_clean.select(
'Company','Job_Title', 'Gender', 'Marital_Status'
)

#Add Id column
employee = employee.withColumn('employee_id', monotonically_increasing_id()).distinct()

#reorder the table
employee = employee.select('employee_id', 'Company','Job_Title', 'Gender', 'Marital_Status')

In [61]:
employee.show()

+-----------+--------------------+--------------------+-------+--------------+
|employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|        260|Thomas, Graham an...|Civil Service fas...|   Male|       Married|
|        313|Booth, Cook and B...|             Unknown|Unknown|        Single|
|        553|             Unknown|Primary school te...|   Male|        Single|
|        665|Novak, Baker and Lee|         Media buyer|  Other|       Unknown|
|        839|       Henderson LLC|Civil engineer, c...| Female|      Divorced|
|       1009|Schultz, Contrera...|   Market researcher| Female|       Married|
|       1146|             Unknown|    Financial trader|   Male|        Single|
|       1184|             Unknown|  Furniture designer| Female|        Single|
|       1356|      Collins-Powell|       Social worker| Female|      Divorced|
|       1430|        Ramsey Group|Scientific labora.

In [62]:
# Fact Table
fact_table =nuga_bank_df_clean.join(customer, ['Customer_Name', 'Customer_Address', 'Customer_City',
    'Customer_State', 'Customer_Country', 'Email', 'Phone_Number'], 'left') \
        .join(transaction, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'left') \
        .join(employee, ['Company','Job_Title', 'Gender', 'Marital_Status'], 'left') \
            .select('transaction_id', 'customer_id', 'employee_id', 'Transaction_Date', \
 'Credit_Card_Number', 'IBAN', 'Currency_Code', 'Random_Number', 'Category', 'Group', \
 'Is_Active', 'Last_Updated', 'Description',)


In [63]:
fact_table.show()

+--------------+-----------+-----------+--------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id|    Transaction_Date|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+--------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|    8589934597| 8589934597| 8589934597|2024-03-12 14:08:...|      630428157006|GB86GLHT381589496...|      Unknown|       5000.0|       A|    Z|       No|2023-10-14 00:47:...|Everything decade...|
|   25769803791|25769803791|25769803791|2024-01-08 00:20:...|    38082745081301|             Unknown|          DJF|          0.0|       A|    Z|      Yes|2021-05-24 05:28:...|Into because end....|
|   17179869188