In [79]:
# Import the necessary libraries
# (PySpark must already be installed on the local machine; this cell only imports it.)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col
import os
import psycopg2


In [47]:
# Initialize Spark session.
# The PostgreSQL JDBC driver jar must be on Spark's classpath for the DataFrame JDBC
# writes at the end of the notebook. Its location can be overridden with the
# POSTGRES_JDBC_JAR environment variable; the default is the original local path.
POSTGRES_JDBC_JAR = os.getenv(
    "POSTGRES_JDBC_JAR", r"C:\Users\user\Desktop\G_pay_bank\postgresql-42.7.7.jar"
)
spark = (
    SparkSession.builder
    .appName("BanksETL")
    .master("local[*]")
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

In [48]:
# Display the active SparkSession as the cell's output (sanity check that it started).
spark

In [49]:
# Extract the historical transaction data into a Spark DataFrame.
# os.path.join builds the relative path with the platform's separator; the previous
# backslash-separated literal only resolved on Windows.
RAW_TRANSACTIONS_CSV = os.path.join("dataset", "rawdata", "G_pay_bank_transactions.csv")
df = spark.read.csv(RAW_TRANSACTIONS_CSV, header=True, inferSchema=True)


In [50]:
# Preview the first five rows of the raw data.
df.show(n=5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [51]:
# Inspect the column types Spark inferred from the CSV (inferSchema=True in the read above).
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [52]:
# Profile the raw data: count the nulls in every column.
# A single aggregation computes all counts in one Spark job instead of one
# filter().count() job per column. The loop variable is deliberately not named
# `col`, which would rebind pyspark.sql.functions.col (imported in the first cell)
# to a string and break later cells that call col(...).
null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
).first().asDict()
for column_name in df.columns:
    print(f"Column: {column_name}, Nulls: {null_counts[column_name]}")


Column: Transaction_Date, Nulls: 0
Column: Amount, Nulls: 0
Column: Transaction_Type, Nulls: 0
Column: Customer_Name, Nulls: 100425
Column: Customer_Address, Nulls: 100087
Column: Customer_City, Nulls: 100034
Column: Customer_State, Nulls: 100009
Column: Customer_Country, Nulls: 100672
Column: Company, Nulls: 100295
Column: Job_Title, Nulls: 99924
Column: Email, Nulls: 100043
Column: Phone_Number, Nulls: 100524
Column: Credit_Card_Number, Nulls: 100085
Column: IBAN, Nulls: 100300
Column: Currency_Code, Nulls: 99342
Column: Random_Number, Nulls: 99913
Column: Category, Nulls: 100332
Column: Group, Nulls: 100209
Column: Is_Active, Nulls: 100259
Column: Last_Updated, Nulls: 100321
Column: Description, Nulls: 100403
Column: Gender, Nulls: 99767
Column: Marital_Status, Nulls: 99904


In [53]:
# Summary statistics for every column (the `summary` column names each statistic).
df.describe().show(n=20, truncate=True)

+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+--------------------+------+--------------+
|summary|           Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+---------------

In [54]:
# Fill the missing values with explicit placeholders.
# Every text attribute below gets "Unknown"; Description gets its own sentinel and
# the two numeric columns get zero. Last_Updated is intentionally absent: rows
# missing it are dropped in the next cell instead.
_unknown_text_columns = [
    "Customer_Name", "Customer_Address", "Customer_City", "Customer_State",
    "Customer_Country", "Company", "Job_Title", "Email", "Phone_Number",
    "IBAN", "Currency_Code", "Category", "Group", "Is_Active",
    "Gender", "Marital_Status",
]
fill_values = {name: "Unknown" for name in _unknown_text_columns}
fill_values.update({
    "Credit_Card_Number": 0,
    "Random_Number": 0.0,
    "Description": "No Description",
})
df_cleaned = df.fillna(fill_values)
    

In [55]:
# Rows without a Last_Updated timestamp are removed rather than imputed.
df_cleaned = df_cleaned.na.drop(how="any", subset=["Last_Updated"])

In [56]:
# Verify the cleaning: every column should now report zero nulls.
# One aggregation job covers all columns (instead of one count() per column).
# The loop variable is not named `col`, so pyspark.sql.functions.col stays usable
# in later cells. The message format is fixed: the previous f-string printed the
# literal text `'Nulls: ',` including the quotes.
remaining_nulls = df_cleaned.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df_cleaned.columns]
).first().asDict()
for column_name in df_cleaned.columns:
    print(f"Column: {column_name}, Nulls: {remaining_nulls[column_name]}")


Column: Transaction_Date, 'Nulls: ', 0
Column: Amount, 'Nulls: ', 0
Column: Transaction_Type, 'Nulls: ', 0
Column: Customer_Name, 'Nulls: ', 0
Column: Customer_Address, 'Nulls: ', 0
Column: Customer_City, 'Nulls: ', 0
Column: Customer_State, 'Nulls: ', 0
Column: Customer_Country, 'Nulls: ', 0
Column: Company, 'Nulls: ', 0
Column: Job_Title, 'Nulls: ', 0
Column: Email, 'Nulls: ', 0
Column: Phone_Number, 'Nulls: ', 0
Column: Credit_Card_Number, 'Nulls: ', 0
Column: IBAN, 'Nulls: ', 0
Column: Currency_Code, 'Nulls: ', 0
Column: Random_Number, 'Nulls: ', 0
Column: Category, 'Nulls: ', 0
Column: Group, 'Nulls: ', 0
Column: Is_Active, 'Nulls: ', 0
Column: Last_Updated, 'Nulls: ', 0
Column: Description, 'Nulls: ', 0
Column: Gender, 'Nulls: ', 0
Column: Marital_Status, 'Nulls: ', 0


In [57]:
# Data transformation
# Transactions dimension: one row per cleaned source row, keyed by a generated
# surrogate Transaction_ID placed as the first column.
transactions = df_cleaned.select(
    monotonically_increasing_id().alias("Transaction_ID"),
    "Transaction_Date",
    "Amount",
    "Transaction_Type",
)


In [58]:
# Preview the transactions dimension.
transactions.show(n=5)

+--------------+--------------------+------+----------------+
|Transaction_ID|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows


In [59]:
# Customers dimension: one row per cleaned source row, keyed by a generated
# surrogate Customer_ID placed as the first column.
customers = df_cleaned.select(
    monotonically_increasing_id().alias("Customer_ID"),
    *["Customer_Name", "Customer_Address", "Customer_City",
      "Customer_State", "Customer_Country"],
)
                     

In [60]:
# Preview the customers dimension.
customers.show(n=5)

+-----------+--------------+--------------------+------------------+--------------+--------------------+
|Customer_ID| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|
+-----------+--------------+--------------------+------------------+--------------+--------------------+
|          0|    James Neal|54912 Holmes Lodg...| West Keithborough|       Florida|                Togo|
|          1|   Thomas Long| 1133 Collin Passage|        Joshuabury|   Connecticut|Lao People's Demo...|
|          2|Ashley Shelton|5297 Johnson Port...|       North Maria|    New Jersey|              Bhutan|
|          3| James Rosario|56955 Moore Glens...|North Michellefurt|    New Mexico|             Iceland|
|          4|Miguel Leonard|262 Beck Expressw...|           Unknown| West Virginia|             Eritrea|
+-----------+--------------+--------------------+------------------+--------------+--------------------+
only showing top 5 rows


In [61]:
# Employees dimension: one row per cleaned source row, keyed by a generated
# surrogate Employee_ID placed as the first column.
# Columns are referenced with the source casing (Company, Job_Title, Email,
# Phone_Number) so the select also resolves when spark.sql.caseSensitive is
# enabled, and the names match the employees DDL.
employees = df_cleaned.select(
    monotonically_increasing_id().alias("Employee_ID"),
    "Company",
    "Job_Title",
    "Email",
    "Phone_Number",
    "Gender",
    "Marital_Status",
)

In [62]:
# Preview the employees dimension.
employees.show(n=5)

+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|Employee_ID|             company|           job_title|               email|       phone_number| Gender|Marital_Status|
+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|          0|Benson, Johnson a...|             Unknown|             Unknown|  493.720.6609x7545|  Other|      Divorced|
|          1|             Unknown|   Food technologist|michellelynch@exa...|      (497)554-3317| Female|       Married|
|          2|       Jones-Mueller|Database administ...| ljordan@example.org|      (534)769-3072|  Other|       Unknown|
|          3|       Vargas-Harris|Horticultural the...|parkerjames@examp...|+1-447-900-1320x257|Unknown|       Unknown|
|          4|Richardson, Gonza...|   Minerals surveyor| zweaver@example.net|            Unknown| Female|       Married|
+-----------+--------------------+------

In [63]:
# Fact table: one row per cleaned source row, linked to the three dimensions.
# The dimensions are joined back on their natural attributes, which are not unique
# (e.g. several customers/employees whose fields were all filled with "Unknown").
# Joining on them directly fans out: a source row matches every dimension row with
# the same attributes, duplicating facts and attaching arbitrary IDs. Each dimension
# is therefore first collapsed to a single (smallest) ID per natural key, which
# guarantees exactly one match per source row and an ID that exists in the dimension.
transaction_keys = ["Transaction_Date", "Amount", "Transaction_Type"]
customer_keys = ["Customer_Name", "Customer_Address", "Customer_City",
                 "Customer_State", "Customer_Country"]
employee_keys = ["Company", "Job_Title", "Email", "Phone_Number", "Gender", "Marital_Status"]


def _one_id_per_key(dim_df, id_column, keys):
    """Return `keys` + `id_column`, keeping only the minimum id for each distinct key."""
    return dim_df.groupBy(*keys).agg(F.min(id_column).alias(id_column))


fact_table = (
    df_cleaned
    .join(_one_id_per_key(transactions, "Transaction_ID", transaction_keys),
          transaction_keys, how="inner")
    .join(_one_id_per_key(customers, "Customer_ID", customer_keys),
          customer_keys, how="inner")
    .join(_one_id_per_key(employees, "Employee_ID", employee_keys),
          employee_keys, how="inner")
    .select('Transaction_ID', 'Customer_ID', 'Employee_ID', 'Transaction_Date',
            'Credit_Card_Number', 'IBAN', 'Currency_Code', 'Random_Number',
            'Category', 'Group', 'Is_Active', 'Last_Updated', 'Description')
)
                         

In [80]:
# Convert the Is_Active text flag to a boolean for the BOOLEAN NOT NULL column.
# The sample rows show the source values are "Yes"/"No", so comparing against
# 'True' marked every row inactive. Match "yes"/"true" case-insensitively; any
# other value (including the "Unknown" placeholder) becomes False.
# F.col is used instead of the bare name `col`, which a `for col in ...` loop can
# rebind to a string. The dtype guard makes re-running this cell a no-op.
if dict(fact_table.dtypes)['Is_Active'] == 'string':
    fact_table = fact_table.withColumn(
        'Is_Active',
        F.coalesce(
            F.lower(F.trim(F.col('Is_Active'))).isin('yes', 'true'),
            F.lit(False),
        ),
    )

In [64]:
# Preview the fact table.
fact_table.show(n=5)

+--------------+-----------+-----------+--------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|Transaction_ID|Customer_ID|Employee_ID|    Transaction_Date| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+--------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|   17179921196|17179921196|17179921196|2024-01-03 05:07:...|4131053293826613966|GB78CKOS832138470...|          GNF|       7475.0|       B|    Z|      Yes|2022-02-06 23:35:...|Health sure story...|
|         29992|      29992|      29992|2024-02-23 16:43:...|   2452924189738024|GB50ZGVC717046003...|          TRY|       9153.0|       D|    Z|       No|2021-11-26 08:47:...|Wait represent ge...|
|   257698

In [65]:
pip install python-dotenv


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [66]:
from dotenv import load_dotenv
load_dotenv()  # load variables from a local .env file into the environment, if one exists

# Connection settings come from the environment (or .env) so nothing sensitive is
# hard-coded; the defaults are the local values this notebook was written against.
# (`os` is imported in the first cell.)
db_password = os.getenv("DB_PASSWORD")  # NOTE(review): None if unset — confirm auth setup
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_NAME = os.getenv("DB_NAME", "g_pay_bank")
DB_USER = os.getenv("DB_USER", "postgres")


# Data loading
def get_db_connection(host=None, database=None, user=None, password=None):
    """Open and return a new psycopg2 connection to the PostgreSQL database.

    Each argument left as None falls back to the corresponding setting read from
    the environment above (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD).
    The caller owns the returned connection and is responsible for closing it.
    """
    return psycopg2.connect(
        host=host if host is not None else DB_HOST,
        database=database if database is not None else DB_NAME,
        user=user if user is not None else DB_USER,
        password=password if password is not None else db_password,
    )


# connect to the database
conn = get_db_connection()


In [67]:
# Check the transactions column types before creating the PostgreSQL tables.
transactions.printSchema()

root
 |-- Transaction_ID: long (nullable = false)
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)



In [68]:
# create the tables in the database
def create_table(connection=None):
    """(Re)create the star-schema tables in PostgreSQL.

    WARNING: destructive. fact_table, transactions, customers and employees are
    dropped first (fact_table before the dimensions it references), so any existing
    rows are lost.

    Args:
        connection: an open psycopg2-style connection. Defaults to the
            notebook-level ``conn``.

    The DDL is committed on success. If any statement fails, the transaction is
    rolled back (so the connection is not left in an aborted state) and the error
    is re-raised. The cursor is always closed.

    Transaction_Date is TIMESTAMP, not DATE: the Spark column is a timestamp with a
    time of day, which a DATE column would silently truncate.
    """
    if connection is None:
        connection = conn

    create_table_query = """
        DROP TABLE IF EXISTS fact_table;
        DROP TABLE IF EXISTS transactions;
        DROP TABLE IF EXISTS customers;
        DROP TABLE IF EXISTS employees;

        CREATE TABLE transactions (
            Transaction_ID BIGINT PRIMARY KEY,
            Transaction_Date TIMESTAMP NOT NULL,
            Amount DECIMAL(18, 4) NOT NULL,
            Transaction_Type VARCHAR(50) NOT NULL
        );

        CREATE TABLE customers (
            Customer_ID BIGINT PRIMARY KEY,
            Customer_Name VARCHAR(1000) NOT NULL,
            Customer_Address VARCHAR(2550) NOT NULL,
            Customer_City VARCHAR(1000) NOT NULL,
            Customer_State VARCHAR(1000) NOT NULL,
            Customer_Country VARCHAR(1000) NOT NULL
        );

        CREATE TABLE employees (
            Employee_ID BIGINT PRIMARY KEY,
            Company VARCHAR(1000) NOT NULL,
            Job_Title VARCHAR(1000) NOT NULL,
            Email VARCHAR(1000) NOT NULL,
            Phone_Number VARCHAR(200) NOT NULL,
            Gender VARCHAR(50) NOT NULL,
            Marital_Status VARCHAR(200) NOT NULL
        );

        CREATE TABLE fact_table (
            Transaction_ID BIGINT REFERENCES transactions(Transaction_ID),
            Customer_ID BIGINT REFERENCES customers(Customer_ID),
            Employee_ID BIGINT REFERENCES employees(Employee_ID),
            Transaction_Date TIMESTAMP NOT NULL,
            Credit_Card_Number VARCHAR(200) NOT NULL,
            IBAN VARCHAR(34) NOT NULL,
            Currency_Code VARCHAR(100) NOT NULL,
            Random_Number DOUBLE PRECISION NOT NULL,
            Category VARCHAR(1000) NOT NULL,
            "Group" VARCHAR(1000) NOT NULL,
            Is_Active BOOLEAN NOT NULL,
            Last_Updated TIMESTAMP NOT NULL,
            Description TEXT
        );
    """
    try:
        with connection.cursor() as cursor:
            cursor.execute(create_table_query)
        connection.commit()
    except Exception:
        connection.rollback()
        raise


In [81]:
# Check the fact table column types (e.g. Is_Active should now be boolean) against
# the fact_table DDL in create_table().
fact_table.printSchema()

root
 |-- Transaction_ID: long (nullable = false)
 |-- Customer_ID: long (nullable = false)
 |-- Employee_ID: long (nullable = false)
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Credit_Card_Number: long (nullable = false)
 |-- IBAN: string (nullable = false)
 |-- Currency_Code: string (nullable = false)
 |-- Random_Number: double (nullable = false)
 |-- Category: string (nullable = false)
 |-- Group: string (nullable = false)
 |-- Is_Active: boolean (nullable = false)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = false)



In [70]:
# Destructive: drops and recreates fact_table, transactions, customers and employees
# (see create_table). Must run before the append-mode JDBC writes below.
create_table()

In [71]:
# JDBC connection settings for Spark's DataFrame writer. Host, port, database and
# user can be overridden through environment variables; the defaults are the local
# setup this notebook was written against. The password is the DB_PASSWORD value
# loaded earlier into `db_password`.
url = "jdbc:postgresql://{host}:{port}/{database}".format(
    host=os.getenv("DB_HOST", "localhost"),
    port=os.getenv("DB_PORT", "5432"),
    database=os.getenv("DB_NAME", "g_pay_bank"),
)
properties = {
    "user": os.getenv("DB_USER", "postgres"),
    "password": db_password,
    "driver": "org.postgresql.Driver",
}

# Load data into the database. create_table() has just (re)created the tables,
# so "append" inserts into the freshly created, empty tables.
transactions.write.jdbc(url=url, table="transactions", mode="append", properties=properties)



In [72]:
# Append the customers dimension; it must exist before fact_table, which references it.
customers.write.mode("append").jdbc(url, "customers", properties=properties)

In [None]:
# Append the employees dimension; fact_table.Employee_ID references employees, so this
# must run before the fact_table write.
employees.write.mode("append").jdbc(url, "employees", properties=properties)

In [82]:
# Append the fact table last, after all three dimensions it references are loaded.
fact_table.write.mode("append").jdbc(url, "fact_table", properties=properties)