### remember github cannot commit files larger than 100mb so always create and  use the git ignore  format.

### Install or Import necessary libraries

In [1]:
# Install or Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id 
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import sqlite3
import psycopg2
from dotenv import load_dotenv
import os

In [2]:
## set jave home to avoid java running with the previous version
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk-1.8'

In [3]:
# initialize my spark seesion
spark = SparkSession.builder\
        .appName("Globus bank Etl ")\
        .config("spark.jars", "postgresql-42.7.4.jar")\
        .getOrCreate()

In [4]:
spark

In [5]:
# Extract this historical data into spark dataframe
df = spark.read.csv(r'dataset\rawdata\Globus_bank_transactions.csv', header=True, inferSchema=True)

In [6]:
df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [7]:
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

### Data cleaning and Transformation

In [8]:
# Data cleaning and Transformation(to check missing values and nulls in the column)
for column in df.columns:
    print(column,'Nulls:', df.filter(df[column].isNull()).count())


Transaction_Date Nulls: 0
Amount Nulls: 0
Transaction_Type Nulls: 0
Customer_Name Nulls: 100425
Customer_Address Nulls: 100087
Customer_City Nulls: 100034
Customer_State Nulls: 100009
Customer_Country Nulls: 100672
Company Nulls: 100295
Job_Title Nulls: 99924
Email Nulls: 100043
Phone_Number Nulls: 100524
Credit_Card_Number Nulls: 100085
IBAN Nulls: 100300
Currency_Code Nulls: 99342
Random_Number Nulls: 99913
Category Nulls: 100332
Group Nulls: 100209
Is_Active Nulls: 100259
Last_Updated Nulls: 100321
Description Nulls: 100403
Gender Nulls: 99767
Marital_Status Nulls: 99904


In [9]:
df_clean = df.fillna({
    'Customer_Name': 'unknown',
    'Customer_Address': 'unknown',
    'Customer_City': 'unknown',
    'Customer_State':'unknown',
    'Customer_Country': 'unknown',
    'Company': 'unknown',
    'Job_Title': 'unknown',
    'Email': 'unknown',
    'Phone_Number': 'unknown',
    'Credit_Card_Number': 0,
    'IBAN': 'unknown',
    'Currency_Code': 'unknown',
    'Random_Number': 0.0,
    'Category': 'unknown',
    'Group': 'unknown',
    'Is_Active': 'unknown',
    'Description': 'unknown',
    'Gender': 'unknown',
    'Marital_Status': 'unknown'
})

In [10]:
df_clean = df_clean.na.drop(subset=['Last_Updated'])

In [11]:
# confirm that you have sorted missing values
for column in df_clean.columns:
    print(column,'Nulls:', df_clean.filter(df_clean[column].isNull()).count())

Transaction_Date Nulls: 0
Amount Nulls: 0
Transaction_Type Nulls: 0
Customer_Name Nulls: 0
Customer_Address Nulls: 0
Customer_City Nulls: 0
Customer_State Nulls: 0
Customer_Country Nulls: 0
Company Nulls: 0
Job_Title Nulls: 0
Email Nulls: 0
Phone_Number Nulls: 0
Credit_Card_Number Nulls: 0
IBAN Nulls: 0
Currency_Code Nulls: 0
Random_Number Nulls: 0
Category Nulls: 0
Group Nulls: 0
Is_Active Nulls: 0
Last_Updated Nulls: 0
Description Nulls: 0
Gender Nulls: 0
Marital_Status Nulls: 0


### Transformation to 2NF

In [12]:
# Transformation to 2NF
# Transactaction table
Transaction = df_clean.select('Transaction_Date', 'Amount', 'Transaction_Type')\
                      .withColumn('Transaction_ID', monotonically_increasing_id())\
                      .select('Transaction_ID','Transaction_Date', 'Amount', 'Transaction_Type')

Transaction.show(5)

+--------------+--------------------+------+----------------+
|Transaction_ID|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [13]:
# Customer Table
Customer = df_clean.select('Customer_Name','Customer_Address','Customer_City',
                                        'Customer_State','Customer_Country').distinct()\
                   .withColumn('Customer_ID', monotonically_increasing_id())\
                   .select('Customer_ID','Customer_Name','Customer_Address','Customer_City',
                                        'Customer_State','Customer_Country')

Customer.show(5)

+-----------+----------------+--------------------+-------------+--------------+--------------------+
|Customer_ID|   Customer_Name|    Customer_Address|Customer_City|Customer_State|    Customer_Country|
+-----------+----------------+--------------------+-------------+--------------+--------------------+
|          0|     Jamie Dixon|0146 Veronica Mou...|    Jonesland|      Delaware|        Saint Martin|
|          1|     Vicki James|7987 Carr Bridge ...|      unknown| West Virginia|         Puerto Rico|
|          2|Jennifer Kennedy|     388 Susan Forks|   East Jason|          Ohio|United States of ...|
|          3|     Susan Green|40609 Amber Junct...|     Markfort|      Colorado|       Liechtenstein|
|          4| Elizabeth Hicks|             unknown|      unknown|         Maine|          Micronesia|
+-----------+----------------+--------------------+-------------+--------------+--------------------+
only showing top 5 rows



In [14]:
# Employee table
Employee = df_clean.select('Company','Job_Title','Email','Phone_Number','Gender','Marital_Status').distinct()\
                   .withColumn('Employee_ID', monotonically_increasing_id())\
                   .select('Employee_ID','Company','Job_Title','Email','Phone_Number','Gender','Marital_Status')

Employee.show(5)

+-----------+--------------+--------------------+--------------------+--------------------+-------+--------------+
|Employee_ID|       Company|           Job_Title|               Email|        Phone_Number| Gender|Marital_Status|
+-----------+--------------+--------------------+--------------------+--------------------+-------+--------------+
|          0|Guerra-Elliott|Loss adjuster, ch...|andrewthompson@ex...| +1-549-979-2325x588|  Other|        Single|
|          1|  Kaiser-White|Scientist, clinic...|stephanierobinson...|001-741-216-5412x...|  Other|      Divorced|
|          2|       unknown|    Industrial buyer|sandersjessica@ex...|        577-515-7326|unknown|        Single|
|          3|Salas-Gonzalez|Scientist, biomed...|             unknown|        623-694-0978|unknown|       unknown|
|          4|       unknown|Housing manager/o...|wilkinsontravis@e...|    293.809.0487x561|   Male|      Divorced|
+-----------+--------------+--------------------+--------------------+----------

In [15]:
Fact_table = df.join(Customer, ['Customer_Name','Customer_Address','Customer_City',\
                                        'Customer_State','Customer_Country'], 'inner')\
                         .join(Transaction, ['Transaction_Date', 'Amount', 'Transaction_Type'],'inner')\
                         .join(Employee, ['Company','Job_Title','Email','Phone_Number','Gender','Marital_Status'], 'inner')\
                         .select('Transaction_ID','Customer_ID','Employee_ID','Credit_Card_Number','IBAN',\
                                 'Currency_Code','Random_Number','Category','Group','Is_Active','Last_Updated','Description')   


Fact_table.show(5)


+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|Transaction_ID|Customer_ID|Employee_ID|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|    8590082843| 8589994529|     110291|   213171061574994|GB18FLXT166730972...|          TWD|         NULL|       D|    Z|      Yes|2022-02-02 15:27:...|House national us...|
|         23786|25769842739|      37977|  6582679897553149|GB50WGDZ755811852...|          KPW|       9130.0|       A|    X|      Yes|2023-11-08 21:48:...|Everyone recogniz...|
|    8589993294|      81237|      89620|  2680275351048591|                NULL|          ZAR|       8607.0|       C|   

### DATA LOADING

In [16]:
# DATA LOADING
load_dotenv()

def get_db_connection():
    connection = psycopg2.connect(
        host=os.getenv("DB_HOST"),
        database=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD")
    )
    return connection

# Establish connection
conn = get_db_connection()

In [17]:
#conn = sqlite3.connect('Globus_bank')

In [18]:
# create table schema in postgresql

def create_table():
    cursor = conn.cursor()

    queries = '''
        DROP TABLE IF EXISTS Customer;
        DROP TABLE IF EXISTS Transaction;
        DROP TABLE IF EXISTS Employee;
        DROP TABLE IF EXISTS Fact_table;

        CREATE TABLE Customer (
            Customer_ID BIGINT,
            Customer_Name VARCHAR(1000),
            Customer_Address VARCHAR(1000),
            Customer_City VARCHAR(1000),
            Customer_State VARCHAR(1000),
            Customer_Country VARCHAR(1000)
        );

        CREATE TABLE Transaction (
            Transaction_ID BIGINT,
            Transaction_Type VARCHAR(1000),
            Amount FLOAT,
            Transaction_Date DATE
        );

        CREATE TABLE Employee (
            Employee_ID BIGINT,
            Company VARCHAR(1000),
            Job_Title VARCHAR(1000),
            Email VARCHAR(1000),
            Phone_Number VARCHAR(1000),
            Gender VARCHAR(1000),
            Marital_Status VARCHAR(1000)
        );

        CREATE TABLE Fact_table (
            Customer_ID BIGINT,
            Transaction_ID BIGINT,
            Employee_ID BIGINT,
            Credit_Card_Number BIGINT,
            IBAN VARCHAR(1000),
            Currency_Code VARCHAR(1000),
            Random_Number FLOAT,
            Category VARCHAR(1000),
            "Group" VARCHAR(1000),
            Is_Active VARCHAR(1000),
            Last_Updated DATE,
            Description VARCHAR(1000)
        );
    '''

    # Split the queries by semicolon and execute each one
    for query in queries.split(';'):
        query = query.strip()
        if query:  # avoid executing empty strings
            cursor.execute(query)

    conn.commit()
    cursor.close()
    conn.close()

In [19]:
create_table()

In [20]:
# Load environment variables from .env file
load_dotenv()

# Retrieve credentials from environment variables
url = os.getenv("DB_URL")
properties = { 
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "driver": os.getenv("DB_DRIVER")
}

# Writing data to tables
Customer.write.jdbc(url=url, table="Customer", mode="append", properties=properties)
Transaction.write.jdbc(url=url, table="Transaction", mode="append", properties=properties)
Employee.write.jdbc(url=url, table="Employee", mode="append", properties=properties)
Fact_table.write.jdbc(url=url, table="Fact_table", mode="append", properties=properties)

print('Database, Table, and data loaded successfully')

Database, Table, and data loaded successfully
