In [7]:
#importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import monotonically_increasing_id
import os
import psycopg2

In [8]:
#set Java home
os.environ['JAVA_HOME'] = r'c:\Program Files\Java\jdk-17'

In [9]:
#Initialize Spark session
spark = SparkSession.builder \
    .appName("Nuga Bank ETL Pipeline") \
        .config("spark.jars", r"c:\Users\David Ibanga\Data Engineering practicals\nuga_bank_etl_case_study\postgresql-42.7.7.jar") \
        .getOrCreate()

In [10]:
spark

In [11]:
#Extract this historical data into a Spark DataFrame
df = spark.read.csv(r'..\dataset\rawdata\nuga_bank_transactions.csv', header=True, inferSchema=True)

In [12]:
df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [13]:
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [14]:
#Data Cleaning and Transformation
for column in df.columns:
    print(column, 'Nulls: ', df.filter(df[column].isNull()).count())

Transaction_Date Nulls:  0
Amount Nulls:  0
Transaction_Type Nulls:  0
Customer_Name Nulls:  100425
Customer_Address Nulls:  100087
Customer_City Nulls:  100034
Customer_State Nulls:  100009
Customer_Country Nulls:  100672
Company Nulls:  100295
Job_Title Nulls:  99924
Email Nulls:  100043
Phone_Number Nulls:  100524
Credit_Card_Number Nulls:  100085
IBAN Nulls:  100300
Currency_Code Nulls:  99342
Random_Number Nulls:  99913
Category Nulls:  100332
Group Nulls:  100209
Is_Active Nulls:  100259
Last_Updated Nulls:  100321
Description Nulls:  100403
Gender Nulls:  99767
Marital_Status Nulls:  99904


In [15]:
df.describe().show()

+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+--------------------+------+--------------+
|summary|           Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+---------------

In [16]:
#fill up the missing values
df_clean = df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address': 'Unknown',
    'Customer_City': 'Unknown',
    'Customer_State': 'Unknown',
    'Customer_Country': 'Unknown',
    'Company': 'Unknown',
    'Job_Title': 'Unknown',
    'Email': 'Unknown',
    'Phone_Number': 'Unknown',
    'Credit_Card_Number': 0,
    'IBAN': 'Unknown',
    'Currency_Code':'Unknown',
    'Random_Number': '0.0',
    'Category': 'Unknown',
    'Group': 'Unknown',
    'Is_Active': 'Unknown',
    'Last_Updated': 'Unknown',
    'Description': 'Unknown',
    'Gender': 'Unknown',
    'Marital_Status': 'Unknown'
})


In [17]:
#Drop missing values in the last_updated column
df_clean=df_clean.dropna(subset=['Last_Updated'])

In [18]:
#checking if data have been cleaned
for column in df_clean.columns:
    print(column, 'Nulls: ', df_clean.filter(df_clean[column].isNull()).count())

Transaction_Date Nulls:  0
Amount Nulls:  0
Transaction_Type Nulls:  0
Customer_Name Nulls:  0
Customer_Address Nulls:  0
Customer_City Nulls:  0
Customer_State Nulls:  0
Customer_Country Nulls:  0
Company Nulls:  0
Job_Title Nulls:  0
Email Nulls:  0
Phone_Number Nulls:  0
Credit_Card_Number Nulls:  0
IBAN Nulls:  0
Currency_Code Nulls:  0
Random_Number Nulls:  0
Category Nulls:  0
Group Nulls:  0
Is_Active Nulls:  0
Last_Updated Nulls:  0
Description Nulls:  0
Gender Nulls:  0
Marital_Status Nulls:  0


In [19]:
#Data Transformation to 2NF
#Transaction Table
transaction = df_clean.select(
    'Transaction_Date','Amount','Transaction_Type') \
    .withColumn('transaction_id', monotonically_increasing_id()) \
    .select('transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type')


In [20]:
transaction.show(5)

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [21]:
#customer Table
customer = df_clean.select('Customer_Name', 'Customer_Address','Customer_City', 'Customer_State', 'Customer_Country') \
    .withColumn('customer_id', monotonically_increasing_id()) \
    .select('customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country')

In [22]:
customer.show(5)

+-----------+--------------+--------------------+------------------+--------------+--------------------+
|customer_id| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|
+-----------+--------------+--------------------+------------------+--------------+--------------------+
|          0|    James Neal|54912 Holmes Lodg...| West Keithborough|       Florida|                Togo|
|          1|   Thomas Long| 1133 Collin Passage|        Joshuabury|   Connecticut|Lao People's Demo...|
|          2|Ashley Shelton|5297 Johnson Port...|       North Maria|    New Jersey|              Bhutan|
|          3| James Rosario|56955 Moore Glens...|North Michellefurt|    New Mexico|             Iceland|
|          4|Miguel Leonard|262 Beck Expressw...|           Unknown| West Virginia|             Eritrea|
+-----------+--------------+--------------------+------------------+--------------+--------------------+
only showing top 5 rows



In [23]:
#Emploee Table
employee = df_clean.select('Company', 'Job_Title', 'Email', 'Phone_Number', 'Gender', 'Marital_Status') \
    .withColumn('employee_id', monotonically_increasing_id()) \
    .select('employee_id', 'Company', 'Job_Title', 'Email', 'Phone_Number', 'Gender', 'Marital_Status')

In [24]:
employee.show(5)

+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|employee_id|             Company|           Job_Title|               Email|       Phone_Number| Gender|Marital_Status|
+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|          0|Benson, Johnson a...|             Unknown|             Unknown|  493.720.6609x7545|  Other|      Divorced|
|          1|             Unknown|   Food technologist|michellelynch@exa...|      (497)554-3317| Female|       Married|
|          2|       Jones-Mueller|Database administ...| ljordan@example.org|      (534)769-3072|  Other|       Unknown|
|          3|       Vargas-Harris|Horticultural the...|parkerjames@examp...|+1-447-900-1320x257|Unknown|       Unknown|
|          4|Richardson, Gonza...|   Minerals surveyor| zweaver@example.net|            Unknown| Female|       Married|
+-----------+--------------------+------

In [25]:
#Nuga_bank_fact_table
fact_table = df_clean.join(transaction, [ 'Transaction_Date', 'Amount', 'Transaction_Type'], 'inner') \
    .join(customer, ['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country'], 'inner') \
    .join(employee, ['Company', 'Job_Title', 'Email', 'Phone_Number', 'Gender', 'Marital_Status'], 'inner') \
    .select('transaction_id', 'customer_id', 'employee_id','Credit_Card_Number','IBAN', 'Currency_Code', 'Random_Number','Category','Group','Is_Active', 'Last_Updated','Description')
   


In [26]:
fact_table.show(5)

+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|   17179921196|17179921196|17179921196|4131053293826613966|GB78CKOS832138470...|          GNF|       7475.0|       B|    Z|      Yes|2022-02-06 23:35:...|Health sure story...|
|         29992|      29992|      29992|   2452924189738024|GB50ZGVC717046003...|          TRY|       9153.0|       D|    Z|       No|2021-11-26 08:47:...|Wait represent ge...|
|   25769840960|25769840960|25769840960|   4625846628264239|GB97NDWP370396356...|          BSD|       6117.0| Unkno

In [27]:
#Data Loading
# Save to PostgreSQL
import psycopg2
import os
from sqlalchemy import create_engine
from dotenv import load_dotenv


# Load environment variables from .env file
load_dotenv()

# PostgreSQL credentials
def get_db_connection():
    try:
        conn = psycopg2.connect(
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
            database=os.getenv('DB_NAME'),
            user=os.getenv('DB_USERNAME', 'postgres'),  # Changed from USER_NAME to DB_USERNAME
            password=os.getenv('DB_PASSWORD')
        )
        return conn
    except Exception as e:
        print(f"Error connecting to the database: {e}")
        return None

In [28]:
# Connect to sql database
conn = get_db_connection()

In [29]:
#Create a function to create tables
def create_table():
    conn = get_db_connection()
    if conn is None:
        print("Failed to connect to database. Cannot create tables.")
        return False
        
    cursor = conn.cursor()
    create_table_query = '''
                    
                      DROP TABLE IF EXISTS fact_table;
                      DROP TABLE IF EXISTS customer;
                      DROP TABLE IF EXISTS transaction;
                      DROP TABLE IF EXISTS employee;

                   

                    CREATE TABLE customer (
                        customer_id BIGINT PRIMARY KEY,
                        customer_name VARCHAR(10000),
                        customer_address VARCHAR(10000),
                        customer_city VARCHAR(10000),
                        customer_state VARCHAR(10000),
                        customer_country VARCHAR(10000)
                    );

                    CREATE TABLE transaction (
                         transaction_id BIGINT PRIMARY KEY,
                         transaction_date DATE,
                         amount FLOAT,
                         transaction_type VARCHAR(10000)
                        );

                    CREATE TABLE employee (
                        employee_id BIGINT PRIMARY KEY,
                        company VARCHAR(10000),
                        job_title VARCHAR(10000),
                        email VARCHAR(10000),
                        phone_number VARCHAR(10000),
                        gender VARCHAR(10000),
                        marital_status VARCHAR(10000)
                    );
                    CREATE TABLE fact_table (
                        transaction_id BIGINT REFERENCES transaction(transaction_id),
                        customer_id BIGINT REFERENCES customer(customer_id),
                        employee_id BIGINT REFERENCES employee(employee_id),
                        credit_card_number VARCHAR(10000),
                        iban VARCHAR(10000),
                        currency_code VARCHAR(10000),
                        random_number FLOAT,
                        category VARCHAR(10000),
                        "group" VARCHAR(10000),
                        is_active VARCHAR(10000),
                        last_updated TIMESTAMP,
                        description VARCHAR(10000)
                    );
                    '''
    cursor.execute(create_table_query)
    conn.commit()
    cursor.close()
    conn.close()
    print("Tables created successfully in PostgreSQL database.")
    return True

In [31]:
if not create_table():
    print("Stopping execution due to table creation failure.")
else:
    print("Tables created successfully, proceeding with data loading...")

In [33]:
from dotenv import load_dotenv
import os

load_dotenv()

url = f"jdbc:postgresql://localhost:5432/{os.getenv('DB_NAME')}"
properties = {
    "user": os.getenv('DB_USERNAME'),
    "password": os.getenv('DB_PASSWORD'),
    "driver": "org.postgresql.Driver"
}

In [34]:
# Write data to PostgreSQL with error handling
try:
    print("Writing customer data...")
    customer.write.jdbc(
        url=url,
        table="customer",
        mode="append",
        properties=properties
    )
    print("Customer data written successfully.")

    print("Writing transaction data...")
    transaction.write.jdbc(
        url=url,
        table='"transaction"',  # quoted because it's a reserved keyword
        mode="append",
        properties=properties
    )
    print("Transaction data written successfully.")

    print("Writing employee data...")
    employee.write.jdbc(
        url=url,
        table="employee",
        mode="append",
        properties=properties
    )
    print("Employee data written successfully.")

    print("Writing fact table data...")
    fact_table.write.jdbc(
        url=url,
        table="fact_table",
        mode="append",
        properties=properties
    )
    print("Fact table data written successfully.")
    print("All data loaded successfully into PostgreSQL tables.")

except Exception as e:
    print(f"Error writing data to PostgreSQL: {e}")
    raise