In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import monotonically_increasing_id
import os
import psycopg2

In [3]:
#Initialize spark session

spark = SparkSession.builder \
    .appName("NugaBankETL") \
    .config("spark.jars", "postgresql-42.7.4.jar") \
    .getOrCreate()

In [4]:
spark

In [5]:
#Extracting the data into a spark frame

nugabank_df = spark.read.csv(r'dataset\rawdata\nuga_bank_transactions.csv', header=True, inferSchema=True)

In [6]:
nugabank_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [7]:
nugabank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [8]:
#Data transformation
nugabank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [9]:
#check for missing values

for column in nugabank_df.columns:
    print(column, 'Nulls', nugabank_df.filter(nugabank_df[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [10]:
#Fill up the missing values

nugabank_df_clean = nugabank_df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address': 'Unknown',
    'Customer_City': 'Unknown',
    'Customer_State': 'Unknown',
    'Customer_Country': 'Unknown',
    'Company': 'Unknown',
    'Job_Title': 'Unknown',
    'Email': 'Unknown',
    'Phone_Number': 'Unknown',
    'Credit_Card_Number': 0,
    'IBAN': 'Unknown',
    'Currency_Code': 'Unknown',
    'Random_Number': 0,
    'Category': 'Unknown',
    'Group': 'Unknown',
    'Is_Active': 'Unknown',
    'Description': 'Unknown',
    'Gender': 'Unknown',
    'Marital_Status': 'Unknown'
    
})

In [11]:
#check for missing values after cleaning

for column in nugabank_df.columns:
    print(column, 'Nulls', nugabank_df.filter(nugabank_df[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [12]:
#Drop rows where last updated is nul

nugabank_df_clean = nugabank_df_clean.na.drop(subset=['Last_Updated'])

In [13]:
#check for missing values after cleaning again

for column in nugabank_df.columns:
    print(column, 'Nulls', nugabank_df_clean.filter(nugabank_df_clean[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [14]:
nugabank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [15]:
#Transaction table

transaction = nugabank_df_clean.select('Transaction_Date', 'Amount', 'Transaction_Type')

##Adding the transaction_id column
transaction = transaction.withColumn('Transaction_id', monotonically_increasing_id())

#Re-ordering the columns
transaction = transaction.select('Transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type')

In [16]:
transaction.show(5)

+--------------+--------------------+------+----------------+
|Transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [17]:
# Customer table

customer = nugabank_df_clean.select( 'Customer_Name','Customer_Address','Customer_City',\
                                    'Customer_State','Customer_Country','Email','Phone_Number').distinct()

#add the id column
customer = customer.withColumn('Customer_id', monotonically_increasing_id())

#reorder the table
customer = customer.select('Customer_id','Customer_Name','Customer_Address','Customer_City',\
                                    'Customer_State','Customer_Country','Email','Phone_Number' )

In [18]:
customer.show(5)

+-----------+--------------+--------------------+-------------+--------------+----------------+--------------------+--------------------+
|Customer_id| Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|               Email|        Phone_Number|
+-----------+--------------+--------------------+-------------+--------------+----------------+--------------------+--------------------+
|          0|Miguel Leonard|262 Beck Expressw...|      Unknown| West Virginia|         Eritrea| zweaver@example.net|             Unknown|
|          1|       Unknown|             Unknown|  Evanchester|        Oregon|         Uruguay|             Unknown| (384)778-9942x91236|
|          2|Michael Murphy|894 Williams Ridg...|Dominguezview|      New York|          Sweden|kristinstanley@ex...|+1-693-739-2204x8851|
|          3|Tina Gutierrez|    415 Taylor Knoll|    Donnastad|South Carolina|         Unknown|sarabrooks@exampl...|  623-933-0431x87174|
|          4|  Kylie Adkins|    43

In [19]:
#Employee table

Employee = nugabank_df_clean.select('Company','Job_Title','Gender','Marital_Status').distinct()

#add id column
Employee = Employee.withColumn('Employee_id', monotonically_increasing_id())

#re-order the column
Employee = Employee.select('Employee_id','Company','Job_Title','Gender','Marital_Status')

In [20]:
Employee.show(5)

+-----------+--------------------+--------------------+-------+--------------+
|Employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|         Price Group|             Unknown|   Male|        Single|
|          1|Rhodes, King and ...| Trade mark attorney|   Male|       Unknown|
|          2|Schmidt, Morgan a...|     Engineer, water| Female|        Single|
|          3|       Johnson Group|  Forensic scientist|   Male|       Unknown|
|          4|     Phillips-Prince|Production assist...|Unknown|        Single|
+-----------+--------------------+--------------------+-------+--------------+
only showing top 5 rows



In [21]:
#Building the fact table using join

fact_table = nugabank_df_clean.join(transaction, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'inner')\
                              .join(customer, ['Customer_Name','Customer_Address','Customer_City',\
                                    'Customer_State','Customer_Country','Email','Phone_Number'], 'inner')\
                              .join(Employee, ['Company','Job_Title','Gender','Marital_Status'], 'inner')\
                              .select('Transaction_id', 'Customer_id', 'Employee_id', 'Credit_Card_Number','IBAN',\
                                     'Currency_Code','Random_Number','Category','Group','Is_Active','Last_Updated',\
                                      'Description' )

In [22]:
fact_table.show(5)

+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|Transaction_id|Customer_id|Employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|    8590072254| 8589986479|      62959|  3518317232148905|GB65HEYL937620470...|          RSD|       5419.0|       D|    X|       No|2023-11-23 03:03:...|Enjoy interview c...|
|        175861|17179901944|        818|      501840097544|GB38LHXK416865631...|          KZT|       5100.0| Unknown|    X|      Yes|2023-05-25 23:02:...|             Unknown|
|   17180060193|      92099|     129561|  4147377235704853|             Unknown|          FKP|       8434.0|       B|   

In [23]:
#Data Loading

def get_db_connection():
    connection = psycopg2.connect(
        host = 'localhost',
        database = 'nuga_bank',
        user = 'postgres',
        password = '#Tolexy5038'
    )
    return connection



In [24]:
#create a function to create tables
def create_table():
    conn = get_db_connection()
    cursor = conn.cursor()
    create_table_query = ''' 
                        DROP TABLE IF EXISTS customer;
                        DROP TABLE IF EXISTS transaction;
                        DROP TABLE IF EXISTS employee;
                        DROP TABLE IF EXISTS fact_table;

                        CREATE TABLE customer(
                            Customer_id BIGINT,
                            Customer_Name VARCHAR(100000),
                            Customer_Address VARCHAR(100000),
                            Customer_City VARCHAR(100000),
                            Customer_State VARCHAR(100000),
                            Customer_Country VARCHAR(100000),
                            Email VARCHAR(100000),
                            Phone_Number VARCHAR(100000)
                        );

                        CREATE TABLE transaction(
                            Transaction_id BIGINT,
                            Transaction_Date DATE,
                            Amount FLOAT,
                            Transaction_Type VARCHAR(100000)
                        );

                        CREATE TABLE employee(
                            Employee_id BIGINT,
                            Company VARCHAR(100000),
                            Job_Title VARCHAR(100000),
                            Gender VARCHAR(100000),
                            Marital_Status VARCHAR(100000)
                        );

                        CREATE TABLE fact_table(
                            Transaction_id BIGINT,
                            Customer_id BIGINT,
                            Employee_id BIGINT,
                            Credit_Card_Number VARCHAR(100000),
                            IBAN VARCHAR(100000),
                            Currency_Code VARCHAR(100000),
                            Random_Number FLOAT,
                            Category VARCHAR(100000),
                            Group_name VARCHAR(100000),
                            Is_Active VARCHAR(100000),
                            Last_Updated DATE,
                            Description VARCHAR(100000)
                        );
    
                         '''
    
    cursor.execute(create_table_query)

    conn.commit()
    cursor.close()
    conn.close()
    

In [25]:
create_table()

In [26]:
fact_table = fact_table.withColumnRenamed("Group", "Group_Name")

In [27]:
url = "jdbc:postgresql://localhost:5432/nuga_bank"
properties = {
    "user": "postgres",
    "password": "#Tolexy5038",
    "driver" : "org.postgresql.Driver"
}

customer.write.jdbc(url=url, table="customer", mode="append", properties=properties)

In [28]:
transaction.write.jdbc(url=url, table="transaction", mode="append", properties=properties)
Employee.write.jdbc(url=url, table="Employee", mode="append", properties=properties)
fact_table.write.jdbc(url=url, table="fact_table", mode="append", properties=properties)
