### Import necessary dependencies

In [24]:
#!pip install setuptools

#!pip install --upgrade pandas


In [25]:

import os
from datetime import datetime, timedelta

import pandas as pd
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from sqlalchemy import create_engine

In [26]:
# Pin JAVA_HOME to the JDK 8 install so PySpark does not launch with a
# different Java version that happens to be configured on this machine.
os.environ.update(JAVA_HOME=r'C:\Program Files\Java\jdk1.8.0_202')

In [27]:
# NOTE(review): judging by its name, this flag relaxes PySpark's Py4J gateway
# security check (it does not enable firewall blocking). Confirm it is still
# required for the installed PySpark version before keeping it.
os.environ.update({"PYSPARK_ALLOW_INSECURE_GATEWAY": "1"})

In [None]:
# Start (or reuse) the notebook's Spark session. The PostgreSQL JDBC driver jar
# (relative path) goes on the classpath so DataFrames can be written over JDBC
# at the end of the notebook.
from pyspark.sql import SparkSession  # type: ignore

spark = (
    SparkSession.builder
    .appName("GWIHR_PROJECT")
    .config("spark.jars", r"postgresql-42.7.4.jar")
    .getOrCreate()
)
spark

### Data Extraction

In [29]:
# First look at the raw extract. Without inferSchema every column is read as a
# string. os.path.join keeps the path working on non-Windows machines too (a
# literal '\' is only a separator on Windows).
Century_bank_df = spark.read.csv(
    os.path.join('Raw_data', 'Century_bank_transactions.csv'),
    header=True,
)
Century_bank_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [None]:
## Re-read the file with inferSchema=True so Spark detects numeric and
## timestamp columns instead of treating everything as a string.
## os.path.join keeps the path portable beyond Windows.
Century_bank_df = spark.read.csv(
    os.path.join('Raw_data', 'Century_bank_transactions.csv'),
    header=True,
    inferSchema=True,
)

Century_bank_df.printSchema()

### Data Cleaning and Transformation

In [None]:
# List the column names of the raw DataFrame
Century_bank_df.schema.names

In [32]:
# Total number of records in the raw extract (count() runs a full Spark job)
num_rows = Century_bank_df.count()
num_rows

1000000

In [33]:
# Width of the raw DataFrame: one schema field per column
num_columns = len(Century_bank_df.schema.fields)
num_columns

23

In [None]:
# Count nulls in every column with ONE aggregation pass over the data.
# Filtering and counting column by column launched a separate Spark job (a
# full scan of the CSV) for each of the columns.
null_counts = Century_bank_df.select([
    F.count(F.when(Century_bank_df[c].isNull(), 1)).alias(c)
    for c in Century_bank_df.columns
]).first().asDict()

for column_name, n_nulls in null_counts.items():
    print(column_name, 'nulls', n_nulls)

In [35]:
# Start the "clean" lineage from the raw DataFrame. Spark DataFrames are
# immutable, so this is an alias, not a copy: transformations always return a
# new DataFrame and can never alter Century_bank_df. (The next cell rebinds
# Century_bank_clean from Century_bank_df directly.)
Century_bank_clean = Century_bank_df

In [36]:
# Replace nulls with placeholder defaults: the string 'unknown' for descriptive
# columns, and 0 / 0.0 for Credit_Card_Number / Random_Number.
# Last_Updated is deliberately absent: rows missing it are dropped next.
_unknown_default_columns = [
    'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
    'Customer_Country', 'Company', 'Job_Title', 'Email', 'Phone_Number',
    'IBAN', 'Currency_Code', 'Category', 'Group', 'Is_Active',
    'Description', 'Gender', 'Marital_Status',
]
fill_values = dict.fromkeys(_unknown_default_columns, 'unknown')
fill_values.update(Credit_Card_Number=0, Random_Number=0.0)

Century_bank_clean = Century_bank_df.fillna(fill_values)

In [37]:
# Discard every record that has no Last_Updated value.
#
# Why this matters for data quality:
# - Last_Updated presumably records when the row was last modified.
# - A record without it may be incomplete, outdated or unreliable, and keeping
#   it could distort the analysis.
Century_bank_clean = Century_bank_clean.dropna(subset=['Last_Updated'])




In [38]:
# Row count after cleaning, to confirm the Last_Updated drop took effect
num_rows = Century_bank_clean.count()
num_rows

899679

In [39]:
# Re-check nulls after cleaning, computing all per-column counts in a single
# aggregation job instead of one filter + count job per column.
null_counts_clean = Century_bank_clean.select([
    F.count(F.when(Century_bank_clean[c].isNull(), 1)).alias(c)
    for c in Century_bank_clean.columns
]).first().asDict()

for column_name, n_nulls in null_counts_clean.items():
    print(column_name, 'nulls', n_nulls)

Transaction_Date nulls 0
Amount nulls 0
Transaction_Type nulls 0
Customer_Name nulls 0
Customer_Address nulls 0
Customer_City nulls 0
Customer_State nulls 0
Customer_Country nulls 0
Company nulls 0
Job_Title nulls 0
Email nulls 0
Phone_Number nulls 0
Credit_Card_Number nulls 0
IBAN nulls 0
Currency_Code nulls 0
Random_Number nulls 0
Category nulls 0
Group nulls 0
Is_Active nulls 0
Last_Updated nulls 0
Description nulls 0
Gender nulls 0
Marital_Status nulls 0


In [40]:
# Preview the first 10 cleaned rows. This is a row sample, not summary
# statistics (those would come from .describe()).
Century_bank_clean.show(n=10)

+--------------------+------+----------------+--------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+-------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description| Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------

In [41]:
# create a data model using the appropriate tool ( lucid or draw io)


In [42]:
#Century_bank_clean.columns

### Table creation: build the dimension and fact DataFrames

In [43]:
# Transaction dimension: the transaction attributes of every cleaned row,
# keyed by a surrogate Transaction_ID placed as the first column.
_transaction_columns = ['Transaction_Date', 'Amount', 'Transaction_Type']
Transaction = (
    Century_bank_clean
    .select(*_transaction_columns)
    .withColumn('Transaction_ID', monotonically_increasing_id())
    .select('Transaction_ID', *_transaction_columns)
)

In [44]:
# Customer dimension. .distinct() collapses repeated customer details (one
# customer with many transactions) to a single row before a surrogate
# Customer_ID is assigned and moved to the front.
_customer_columns = ['Customer_Name', 'Customer_Address', 'Customer_City',
                     'Customer_State', 'Customer_Country', 'Email', 'Phone_Number']
Customer = (
    Century_bank_clean
    .select(*_customer_columns)
    .distinct()
    .withColumn('Customer_ID', monotonically_increasing_id())
    .select('Customer_ID', *_customer_columns)
)

In [45]:
# Employee dimension: one row per distinct (Company, Job_Title, Gender,
# Marital_Status) combination, keyed by a surrogate Employee_ID placed first.
_employee_columns = ['Company', 'Job_Title', 'Gender', 'Marital_Status']
Employee = (
    Century_bank_clean
    .select(*_employee_columns)
    .distinct()
    .withColumn('Employee_ID', monotonically_increasing_id())
    .select('Employee_ID', *_employee_columns)
)

In [46]:
# Fact table
# monotonically_increasing_id() is non-deterministic: its values depend on the
# partition layout, and Customer/Employee get their IDs after a shuffle
# (.distinct()). Spark evaluates lazily, so without materialising the
# dimensions, the IDs joined in here can differ from the IDs produced again
# when each dimension is written to Postgres, breaking the fact table's
# foreign keys. localCheckpoint() computes each dimension once and pins its IDs
# for every later use (this join and the JDBC writes).
Transaction = Transaction.localCheckpoint()
Customer = Customer.localCheckpoint()
Employee = Employee.localCheckpoint()

# Attach each dimension's surrogate key by joining back on its natural-key columns.
# NOTE(review): Transaction is not de-duplicated. Source rows sharing the same
# (Transaction_Date, Amount, Transaction_Type) would fan out in this join and
# repeat a Transaction_ID, which is the fact table's primary key in Postgres.
Fact_table = (
    Century_bank_clean
    .join(Customer, ['Customer_Name', 'Customer_Address', 'Customer_City',
                     'Customer_State', 'Customer_Country', 'Email', 'Phone_Number'], 'left')
    .join(Transaction, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'left')
    .join(Employee, ['Company', 'Job_Title', 'Gender', 'Marital_Status'], 'left')
    .select('Transaction_ID', 'Customer_ID', 'Employee_ID', 'Credit_Card_Number', 'IBAN',
            'Currency_Code', 'Random_Number', 'Category', 'Group', 'Is_Active',
            'Last_Updated', 'Description')
)

Fact_table.show(5)


+--------------+------------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|Transaction_ID| Customer_ID|Employee_ID| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+------------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|  171798691842|163208795274|      21298|4119128817900391838|GB57TGOR097660836...|          AUD|       7544.0|       C|      X|       No|2022-04-27 00:48:...|Team continue own...|
|   42949672961|146028899026|       9891|     30071160638727|GB67QXVS590768285...|          JEP|       5757.0|       D|unknown|       No|2022-08-04 20:45:...|             unknown|
|  137438953473|103079245940|      28228|   4905811524319082|GB03EEKE685121511...|          KRW|    

In [47]:
# output or save transformed data as csv file
#Transaction.repartition(1).write.mode('overwrite').option('header', 'True').csv(r'C:/Users/back2/Desktop/DE-SORTED FILEZ/Century_bank project/transformed/Transaction')
#Employee.repartition(1).write.mode('overwrite').option('header', 'True').csv(r'C:\Users\back2\Desktop\(PYSPARK case_study)/Employee')
#Customer.repartition(1).write.mode('overwrite').option('header', 'True').csv(r'C:\Users\back2\Desktop\(PYSPARK case_study)/Customer')
#Fact_table.repartition(1).write.mode('overwrite').option('header', 'True').csv(r'C:\Users\back2\Desktop\(PYSPARK case_study)/Fact_table')







In [48]:
import os
import psycopg2
from dotenv import load_dotenv

# Load environment variables from a .env file
load_dotenv()


def get_db_connection():
    """Open and return a new psycopg2 connection to the project database.

    Settings are read from the environment (populated from .env above):
    DB_HOST, DB_NAME, DB_USER, DB_PASSWORD and, optionally, DB_PORT.
    psycopg2 drops keyword arguments whose value is None, so any unset
    variable falls back to libpq's own default. An unset DB_PORT therefore
    behaves exactly as before.

    The caller owns the returned connection and is responsible for closing it.

    Raises:
        psycopg2.OperationalError: if the server cannot be reached or
            authentication fails.
    """
    return psycopg2.connect(
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        database=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
    )

# Smoke-test the database credentials. psycopg2.connect() raises on failure
# instead of returning a falsy object, so the failure has to be caught as an
# exception; an `if conn:` check can never reach its failure branch.
try:
    conn = get_db_connection()
except psycopg2.Error as exc:
    conn = None
    print("Failed to establish database connection:", exc)
else:
    print("Database connection established.")
    # Only a connectivity check: create_table() opens its own connection, so
    # close this one instead of leaving it open for the rest of the session.
    conn.close()

Database connection established.


### Create Database Tables (Dimension and Fact)

In [None]:
def create_table():
    """(Re)create the loan_project star schema in Postgres.

    Creates the schema if needed, drops the fact table and the three dimension
    tables if they exist, and recreates them empty. The Spark JDBC load can
    then append into a clean schema, and the cell can be re-run.
    The connection comes from get_db_connection() and is always closed,
    even if a statement fails (the uncommitted transaction is then discarded).
    """
    create_table_query = '''
        -- Create schema if not exists
        CREATE SCHEMA IF NOT EXISTS loan_project;

        -- Drop existing tables in reverse dependency order: the fact table
        -- holds foreign keys to every dimension, so it must go first, otherwise
        -- dropping a referenced dimension fails without CASCADE.
        DROP TABLE IF EXISTS loan_project.Fact_table;
        DROP TABLE IF EXISTS loan_project.Transaction_Table;
        DROP TABLE IF EXISTS loan_project.Customer_Table;
        DROP TABLE IF EXISTS loan_project.Employee_Table;

        -- NOTE(review): if Transaction_Date is inferred as a timestamp, DATE
        -- keeps only the day part; confirm that truncation is intended.
        CREATE TABLE loan_project.Transaction_Table (
            Transaction_ID BIGINT PRIMARY KEY,
            Transaction_Date DATE,
            Amount NUMERIC(12, 2),
            Transaction_Type VARCHAR(50)
        );

        CREATE TABLE loan_project.Customer_Table (
            Customer_ID BIGINT PRIMARY KEY,
            Customer_Name VARCHAR(100),
            Customer_Address TEXT,
            Customer_City VARCHAR(100),
            Customer_State VARCHAR(100),
            Customer_Country VARCHAR(100),
            Email VARCHAR(255),
            Phone_Number VARCHAR(50)
        );

        CREATE TABLE loan_project.Employee_Table (
            Employee_ID BIGINT PRIMARY KEY,
            Company VARCHAR(150),
            Job_Title VARCHAR(150),
            Gender VARCHAR(10),
            Marital_Status VARCHAR(20)
        );

        -- Is_Active is text in the cleaned data (values such as 'No', and the
        -- 'unknown' placeholder from the cleaning step), which a BOOLEAN
        -- column cannot store, so it is kept as VARCHAR.
        CREATE TABLE loan_project.Fact_table (
            Transaction_ID BIGINT,
            Customer_ID BIGINT,
            Employee_ID BIGINT,
            Credit_Card_Number VARCHAR(50),
            IBAN VARCHAR(50),
            Currency_Code VARCHAR(50),
            Random_Number INTEGER,
            Category VARCHAR(100),
            "Group" VARCHAR(100),
            Is_Active VARCHAR(10),
            Last_Updated TIMESTAMP,
            Description TEXT,
            PRIMARY KEY (Transaction_ID),
            FOREIGN KEY (Transaction_ID) REFERENCES loan_project.Transaction_Table(Transaction_ID),
            FOREIGN KEY (Customer_ID) REFERENCES loan_project.Customer_Table(Customer_ID),
            FOREIGN KEY (Employee_ID) REFERENCES loan_project.Employee_Table(Employee_ID)
        );
    '''
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(create_table_query)
        conn.commit()
    finally:
        conn.close()


# Call the function to create the tables
create_table()


### Load Data To Postgres Database

In [51]:
# Define database parameters including the database name
from pyspark.sql import SparkSession
#Retrieve credentials from environment variables
from dotenv import load_dotenv
import os

# Load from .env file
load_dotenv()

# Build the JDBC URL from the same DB_* variables get_db_connection() reads.
# That way the tables created through psycopg2 and the rows Spark writes here
# target the same server and database. The defaults reproduce the previously
# hard-coded URL (localhost:5432/CENTURY_BANK_DB).
db_host = os.getenv("DB_HOST", "localhost")
db_port = os.getenv("DB_PORT", "5432")
db_name = os.getenv("DB_NAME", "CENTURY_BANK_DB")
url = "jdbc:postgresql://{}:{}/{}".format(db_host, db_port, db_name)
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")

properties = {
    "user": user,
    "password": password,
    "driver": "org.postgresql.Driver",
}

# The SparkSession started at the top of the notebook (with the PostgreSQL JDBC
# jar on its classpath) is reused as-is. Calling the builder again with
# getOrCreate() would only return that same session.

# Append into the tables created by create_table(). Dimensions are written
# before the fact table so its foreign keys resolve. mode="append" keeps any
# existing rows, so re-run create_table() before re-loading; otherwise the
# primary keys collide.
Transaction.write.jdbc(url=url, table="loan_project.Transaction_Table", mode="append", properties=properties)
Customer.write.jdbc(url=url, table="loan_project.Customer_Table", mode="append", properties=properties)
Employee.write.jdbc(url=url, table="loan_project.Employee_Table", mode="append", properties=properties)
Fact_table.write.jdbc(url=url, table="loan_project.Fact_table", mode="append", properties=properties)
print('successfull')

successfull
