In [3]:
# Import Necessary Libraries

from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import monotonically_increasing_id
import os
import psycopg2

In [4]:
# Point PySpark at the local Java 8 installation.
# Raw string: the backslash is a literal Windows path separator, not the start
# of an escape sequence ('\j' is an invalid escape and triggers a warning).
os.environ['JAVA_HOME'] = r'C:\java8'


In [6]:
# Build (or reuse) the Spark session for this ETL job.
# "spark.jars" adds the PostgreSQL JDBC driver jar that the later JDBC writes use;
# the relative jar path is presumably resolved against the notebook's working
# directory — TODO confirm.
spark = (
    SparkSession.builder
    .appName("Nuga Bank ETL")
    .config("spark.jars", "postgresql-42.7.5.jar")
    .getOrCreate()
)


In [7]:
# Show the session object as the cell output to confirm it was created
spark

In [11]:
# Extract: read the historical transactions CSV into a Spark DataFrame.
# The first line supplies the column names and column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(r'dataset\rawdata\nuga_bank_transactions.csv')
)

In [13]:
# Preview the first five rows of the raw extract
df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [15]:
# Inspect the inferred column types and nullability
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [17]:
# Data cleaning and transformation

# Profile missing values. All per-column null counts are computed by ONE
# aggregation over the data instead of a separate filter/count Spark job per
# column. Column names are back-quoted so names such as `Group` parse as
# identifiers in the SQL expression.
null_counts = df.selectExpr(
    *[f"count(CASE WHEN `{name}` IS NULL THEN 1 END) AS `{name}`" for name in df.columns]
).first()

for column in df.columns:
    print(column, 'Nulls: ', null_counts[column])

Transaction_Date Nulls:  0
Amount Nulls:  0
Transaction_Type Nulls:  0
Customer_Name Nulls:  100425
Customer_Address Nulls:  100087
Customer_City Nulls:  100034
Customer_State Nulls:  100009
Customer_Country Nulls:  100672
Company Nulls:  100295
Job_Title Nulls:  99924
Email Nulls:  100043
Phone_Number Nulls:  100524
Credit_Card_Number Nulls:  100085
IBAN Nulls:  100300
Currency_Code Nulls:  99342
Random_Number Nulls:  99913
Category Nulls:  100332
Group Nulls:  100209
Is_Active Nulls:  100259
Last_Updated Nulls:  100321
Description Nulls:  100403
Gender Nulls:  99767
Marital_Status Nulls:  99904


In [18]:
# Summary statistics for the raw data (the timestamp columns do not appear in the output)
df.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+-----------------+--------+------+---------+--------------------+------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+-----------------+--------+------+---------+---------------

In [19]:
# Fill missing values with a placeholder that matches each column's type:
# 'Unknown' for string columns, 0 / 0.0 for the numeric ones.
#
# Last_Updated is intentionally NOT listed: it is a timestamp column, so the
# string 'Unknown' can never be stored in it (the replacement is cast to the
# column type, which yields null, and may raise under ANSI SQL mode).
# Rows with a missing Last_Updated therefore have to be handled separately.
df_clean = df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address': 'Unknown',
    'Customer_City': 'Unknown',
    'Customer_State': 'Unknown',
    'Customer_Country': 'Unknown',
    'Company': 'Unknown',
    'Job_Title': 'Unknown',
    'Email': 'Unknown',
    'Phone_Number': 'Unknown',
    'Credit_Card_Number': 0,
    'IBAN': 'Unknown',
    'Currency_Code': 'Unknown',
    'Random_Number': 0.0,
    'Category': 'Unknown',
    'Group': 'Unknown',
    'Is_Active': 'Unknown',
    'Description': 'Unknown',
    'Gender': 'Unknown',
    'Marital_Status': 'Unknown'
})

In [20]:
# Remove every row whose Last_Updated value is still null
df_clean = df_clean.dropna(subset=['Last_Updated'])

In [22]:
# Checking for nulls after cleaning.
# One aggregation computes the null count of every column at once, rather than
# running a separate filter/count Spark job per column. Column names are
# back-quoted so names such as `Group` parse as identifiers.
null_counts_clean = df_clean.selectExpr(
    *[f"count(CASE WHEN `{name}` IS NULL THEN 1 END) AS `{name}`" for name in df_clean.columns]
).first()

for column in df_clean.columns:
    print(column, 'Nulls: ', null_counts_clean[column])

Transaction_Date Nulls:  0
Amount Nulls:  0
Transaction_Type Nulls:  0
Customer_Name Nulls:  0
Customer_Address Nulls:  0
Customer_City Nulls:  0
Customer_State Nulls:  0
Customer_Country Nulls:  0
Company Nulls:  0
Job_Title Nulls:  0
Email Nulls:  0
Phone_Number Nulls:  0
Credit_Card_Number Nulls:  0
IBAN Nulls:  0
Currency_Code Nulls:  0
Random_Number Nulls:  0
Category Nulls:  0
Group Nulls:  0
Is_Active Nulls:  0
Last_Updated Nulls:  0
Description Nulls:  0
Gender Nulls:  0
Marital_Status Nulls:  0


In [23]:
# List the column names of the raw DataFrame (`df`, not `df_clean`)
df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [None]:
# Data Transformation to 2NF
# Transaction Table
#
# Keep only the transaction attributes, then add a surrogate key.
# monotonically_increasing_id() yields unique, increasing 64-bit ids that are
# NOT guaranteed to be consecutive.
#
# Fixes: the first select() used to be empty (no columns survived for the later
# select), and a missing comma fused 'Currency_Code' with 'Transaction_Date'
# into a non-existent column name while repeating the whole column list.
transaction = df_clean.select('Transaction_Date', 'Amount', 'Transaction_Type', 'Credit_Card_Number', 'IBAN', 'Currency_Code') \
                        .withColumn('Transaction_ID', monotonically_increasing_id()) \
                        .select('Transaction_ID', 'Transaction_Date', 'Amount', 'Transaction_Type', 'Credit_Card_Number', 'IBAN', 'Currency_Code')

In [71]:
# Preview the first five rows of the transaction table
transaction.show(5)

+--------------+--------------------+------+----------------+------------------+--------------------+-------------+
|Transaction_ID|    Transaction_Date|Amount|Transaction_Type|Credit_Card_Number|                IBAN|Currency_Code|
+--------------+--------------------+------+----------------+------------------+--------------------+-------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|  3592901394693441|GB98RBPP090285271...|          MAD|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|                 0|GB03KFZR339662263...|          VEF|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|      675983949974|GB59QYRN446730519...|          COP|
|             3|2024-04-17 15:29:...|407.15|         Deposit|     4761202519057|GB74FTDO268299438...|          BWP|
|             4|2024-02-10 01:51:...|161.31|         Deposit|   213156729655186|GB94EWRN587847592...|          SOS|
+--------------+--------------------+------+----------------+-----------

In [41]:
# Customer Table
#
# Project the customer attributes out of the cleaned data and attach a
# surrogate Customer_ID as the leading column. The ids produced by
# monotonically_increasing_id() are unique and increasing, not consecutive.
customer = (
    df_clean
    .select(
        'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
        'Customer_Country', 'Company', 'Job_Title', 'Email', 'Gender', 'Phone_Number',
    )
    .withColumn('Customer_ID', monotonically_increasing_id())
    .select(
        'Customer_ID',
        'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
        'Customer_Country', 'Company', 'Job_Title', 'Email', 'Gender', 'Phone_Number',
    )
)

In [42]:
# Preview the first five rows of the customer table
customer.show(5)

+-----------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------+-------------------+
|Customer_ID| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email| Gender|       Phone_Number|
+-----------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------+-------------------+
|          0|    James Neal|54912 Holmes Lodg...| West Keithborough|       Florida|                Togo|Benson, Johnson a...|             Unknown|             Unknown|  Other|  493.720.6609x7545|
|          1|   Thomas Long| 1133 Collin Passage|        Joshuabury|   Connecticut|Lao People's Demo...|             Unknown|   Food technologist|michellelynch@exa...| Female|      (497)554-3317|
|          2|Ashley 

In [43]:
# Fact table
#
# Re-attach the surrogate keys by joining the cleaned data back to the
# transaction and customer tables on their natural attributes, then keep the
# two ids plus the remaining descriptive columns.
#
# 'Gender' is part of the customer join key because the customer table carries
# it: leaving it out lets rows that differ only in gender match each other and
# leaves two ambiguous Gender columns in the joined result.
fact_table = (
    df_clean
    .join(
        transaction,
        ['Transaction_Date', 'Amount', 'Transaction_Type', 'Credit_Card_Number', 'IBAN', 'Currency_Code'],
        'inner',
    )
    .join(
        customer,
        ['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country',
         'Company', 'Job_Title', 'Email', 'Gender', 'Phone_Number'],
        'inner',
    )
    .select('Transaction_ID', 'Customer_ID', 'Currency_Code', 'Random_Number', 'Category', 'Group', 'Last_Updated', 'Description')
)

In [30]:
# Preview the first five rows of the fact table
fact_table.show(5)

+--------------+-----------+-------------+-------------+--------+-----+--------------------+--------------------+
|Transaction_ID|Customer_ID|Currency_Code|Random_Number|Category|Group|        Last_Updated|         Description|
+--------------+-----------+-------------+-------------+--------+-----+--------------------+--------------------+
|   42949716102|42949716102|          CHF|       4820.0|       A|    Y|2020-07-02 02:26:...|             Unknown|
|   77309448187|77309448187|          GIP|          0.0|       C|    Z|2021-05-07 01:57:...|They boy approach...|
|   68719547757|68719547757|          GNF|       2411.0|       B|    Y|2022-09-11 14:32:...|Before personal d...|
|   25769855930|25769855930|          CVE|       6611.0|       A|    Y|2023-10-31 02:21:...|             Unknown|
|   34359814090|34359814090|      Unknown|       7029.0|       B|    Z|2022-04-07 12:11:...|She morning price...|
+--------------+-----------+-------------+-------------+--------+-----+-----------------

In [None]:
# Data loading

# Create a function to create tables
def create_table():
    """Drop and recreate the customer, transaction and fact_table tables.

    Connects to the ``Nuga_Bank`` PostgreSQL database on ``localhost`` as user
    ``postgres``. The password is taken from the ``NUGA_DB_PASSWORD``
    environment variable when it is set, otherwise the previous built-in value
    is used.

    All DDL runs in a single transaction: it is committed only when every
    statement succeeds and rolled back otherwise, so a failed run does not
    leave the tables half rebuilt. The cursor and the connection are always
    closed, including on error.

    Both timestamp columns (``Transaction_Date`` and ``Last_Updated``) are
    created as ``TIMESTAMP`` so the time of day in the source data is kept
    and the values are stored in their native type.

    Raises:
        psycopg2.Error: if the connection or any statement fails.
    """
    conn = psycopg2.connect(
        host = 'localhost',
        database = 'Nuga_Bank',
        user = 'postgres',
        # NOTE(review): the literal fallback keeps existing runs working, but a
        # password should not live in the notebook — set NUGA_DB_PASSWORD and
        # remove/rotate the literal.
        password = os.environ.get('NUGA_DB_PASSWORD', 'Grateful@1')
    )

    # Create Customer Table
    create_customer_table = '''
                            CREATE TABLE customer (
                                Customer_ID BIGINT,
                                Customer_Name VARCHAR (10000),
                                Customer_Address VARCHAR (10000),
                                Customer_City VARCHAR (10000),
                                Customer_State VARCHAR (10000),
                                Customer_Country VARCHAR (10000),
                                Company VARCHAR (10000),
                                Job_Title VARCHAR (10000),
                                Email VARCHAR (1000),
                                Gender VARCHAR (100),
                                Phone_Number VARCHAR (100)
                            );
                        '''

    # Create Transaction Table ("transaction" is quoted because it is an SQL keyword)
    create_transaction_table = '''
                                CREATE TABLE "transaction" (
                                Transaction_ID BIGINT,
                                Transaction_Date TIMESTAMP,
                                Amount FLOAT,
                                Transaction_Type VARCHAR (10000),
                                Credit_Card_Number BIGINT,
                                IBAN VARCHAR (10000),
                                Currency_Code VARCHAR (100)
                            );
                        '''

    # Create Fact_Table ("Group" is quoted because GROUP is a reserved word)
    create_fact_table = '''
                            CREATE TABLE fact_table (
                            Transaction_ID BIGINT,
                            Customer_ID BIGINT,
                            Currency_Code VARCHAR (100),
                            Random_Number FLOAT,
                            Category VARCHAR (1000),
                            "Group" VARCHAR (1000),
                            Last_Updated TIMESTAMP,
                            Description VARCHAR (10000)
                        );
                    '''

    try:
        # `with conn` commits on success and rolls back on an exception;
        # it does not close the connection, hence the outer finally.
        with conn:
            with conn.cursor() as cursor:
                # Drop tables if they exist
                cursor.execute('DROP TABLE IF EXISTS customer CASCADE;')
                cursor.execute('DROP TABLE IF EXISTS "transaction" CASCADE;')
                cursor.execute('DROP TABLE IF EXISTS fact_table CASCADE;')

                # Recreate them
                cursor.execute(create_customer_table)
                cursor.execute(create_transaction_table)
                cursor.execute(create_fact_table)
    finally:
        conn.close()
    

In [68]:
# Drop and recreate the target tables before loading data into them
create_table()

In [69]:
# JDBC connection settings used for the table loads
url = "jdbc:postgresql://localhost:5432/Nuga_Bank"
properties = {
    "user" : "postgres",
    # NOTE(review): the literal fallback keeps existing runs working, but a
    # password should not live in the notebook — set NUGA_DB_PASSWORD and
    # remove/rotate the literal.
    "password" : os.environ.get("NUGA_DB_PASSWORD", "Grateful@1"),
    "driver": "org.postgresql.Driver"
}

# Load the customer table. "append" inserts into the existing table, so
# re-running this cell without recreating the table inserts the rows again.
customer.write.jdbc(url = url, table = "customer", mode = "append", properties = properties)

In [73]:
# Append the transaction rows, then the fact rows, to their PostgreSQL tables
transaction.write.mode("append").jdbc(url=url, table="transaction", properties=properties)
fact_table.write.mode("append").jdbc(url=url, table="fact_table", properties=properties)