In [None]:
# git init
# git config --global user.name "Ahmad-MAA"
# git config --global user.email "musa.ahmad4abubakar@gmail.com"
# git status
# git commit -m "first commit"    
# Install pyspark (pip install pyspark, in the terminal)
# Install psycopg2 (in the terminal)
# Install Java 8
# PostgreSQL JDBC driver (download and keep in the workspace directory)
# Set JAVA_HOME
# Initialise the SparkSession
# Create a Spark DataFrame
# Clean data
# Data transformation to 2NF
# Customer table
# Employee table
# Build the fact table
# Data loading
# Connect to the SQL database
# Load to the database



In [68]:
# Import necessary libraries
import os

import psycopg2
from pyspark.sql import DataFrameWriter
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when
from pyspark.sql.functions import monotonically_increasing_id


In [69]:
# Set JAVA_HOME to the local Java 8 directory before the Spark session is started.
os.environ.update(JAVA_HOME=r'C:\java8')

In [70]:
# Initialise the SparkSession and put the PostgreSQL JDBC driver jar on its classpath.
spark = (
    SparkSession.builder
    .appName("Nuga Bank ETL")
    .config("spark.jars", "postgresql-42.7.4.jar")
    .getOrCreate()
)
#

In [71]:
# Echo the session object as the cell output.
spark

In [72]:
# Extract the historical data into a Spark DataFrame.
# The path is assembled with os.path.join so the separator matches the host OS;
# the previous backslash-separated literal only resolved correctly on Windows.
raw_csv_path = os.path.join('dataset', 'rawdata', 'nuga_bank_transactions.csv')
df = spark.read.csv(raw_csv_path, header=True, inferSchema=True)


In [11]:
# Preview the first 5 rows of the raw DataFrame.
df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [12]:
# Print the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [16]:
# Data quality check: count the null values in every column of the raw data.
# All counts are computed in a single aggregation, so the source is scanned once
# instead of launching one filter/count job per column.
null_counts = df.select(
    [count(when(col(name).isNull(), 1)).alias(name) for name in df.columns]
).first()
for column in df.columns:
    print(column, 'Nulls:', null_counts[column])

Transaction_Date Nulls: 0
Amount Nulls: 0
Transaction_Type Nulls: 0
Customer_Name Nulls: 100425
Customer_Address Nulls: 100087
Customer_City Nulls: 100034
Customer_State Nulls: 100009
Customer_Country Nulls: 100672
Company Nulls: 100295
Job_Title Nulls: 99924
Email Nulls: 100043
Phone_Number Nulls: 100524
Credit_Card_Number Nulls: 100085
IBAN Nulls: 100300
Currency_Code Nulls: 99342
Random_Number Nulls: 99913
Category Nulls: 100332
Group Nulls: 100209
Is_Active Nulls: 100259
Last_Updated Nulls: 100321
Description Nulls: 100403
Gender Nulls: 99767
Marital_Status Nulls: 99904


In [17]:
# Show summary statistics for the raw DataFrame's columns.
df.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+--------------------+------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+---------

In [73]:
# Fill missing values: text columns get the 'Unknown' placeholder and the two
# numeric columns get zero. Last_Updated is not part of the mapping.
df_clean = df.fillna({
    **dict.fromkeys(
        [
            'Customer_Name', 'Customer_Address', 'Customer_City',
            'Customer_State', 'Customer_Country',
            'Company', 'Job_Title',
            'Email', 'Phone_Number',
            'IBAN', 'Currency_Code',
            'Category', 'Group', 'Is_Active',
            'Description', 'Gender', 'Marital_Status',
        ],
        'Unknown',
    ),
    'Credit_Card_Number': 0,
    'Random_Number': 0.0,
})

In [74]:
# Remove the rows whose Last_Updated value is missing.
df_clean = df_clean.dropna(subset=['Last_Updated'])

In [75]:
# Verify the cleaning step: count the nulls remaining in every column of df_clean.
# All counts are computed in a single aggregation, so the data is scanned once
# instead of launching one filter/count job per column.
clean_null_counts = df_clean.select(
    [count(when(col(name).isNull(), 1)).alias(name) for name in df_clean.columns]
).first()
for column in df_clean.columns:
    print(column, 'Nulls:', clean_null_counts[column])

Transaction_Date Nulls: 0
Amount Nulls: 0
Transaction_Type Nulls: 0
Customer_Name Nulls: 0
Customer_Address Nulls: 0
Customer_City Nulls: 0
Customer_State Nulls: 0
Customer_Country Nulls: 0
Company Nulls: 0
Job_Title Nulls: 0
Email Nulls: 0
Phone_Number Nulls: 0
Credit_Card_Number Nulls: 0
IBAN Nulls: 0
Currency_Code Nulls: 0
Random_Number Nulls: 0
Category Nulls: 0
Group Nulls: 0
Is_Active Nulls: 0
Last_Updated Nulls: 0
Description Nulls: 0
Gender Nulls: 0
Marital_Status Nulls: 0


In [76]:
# Data Transformation to 2NF

# Transaction table: the three transaction columns of every cleaned row,
# prefixed with a generated transaction_id key.
transaction = (
    df_clean
    .select('Transaction_Date', 'Amount', 'Transaction_Type')
    .withColumn('transaction_id', monotonically_increasing_id())
    .select('transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type')
)


In [23]:
# Preview the first 5 rows of the transaction table.
transaction.show(5)

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



# Customer table (step-by-step build).
# Reads from df_clean: the previously referenced nuga_bank_df_clean is not defined
# in any cell visible in this notebook and raised NameError on a fresh run.
customer = df_clean.select('Customer_Name', 'Customer_Address', 'Customer_City',
                           'Customer_State', 'Customer_Country', 'Email', 'Phone_Number').distinct()
# Adding the customer_id key column
customer = customer.withColumn('customer_id', monotonically_increasing_id())

# Re-ordering columns so the key comes first
customer = customer.select('customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City',
                           'Customer_State', 'Customer_Country', 'Email', 'Phone_Number')


In [77]:
# Customer table: one row per distinct combination of the customer columns,
# prefixed with a generated customer_id key.
# monotonically_increasing_id() derives its values from the partition layout, and
# this DataFrame is re-evaluated by every later action (show, fact-table join,
# JDBC write). The result is cached so those actions reuse the ids assigned here
# rather than generating them again. Caching is best-effort: partitions that are
# evicted get recomputed.
customer = (
    df_clean
    .select('Customer_Name', 'Customer_Address', 'Customer_City',
            'Customer_State', 'Customer_Country', 'Email', 'Phone_Number')
    .distinct()
    .withColumn('customer_id', monotonically_increasing_id())
    .select('customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City',
            'Customer_State', 'Customer_Country', 'Email', 'Phone_Number')
    .cache()
)


In [26]:
# Preview the first 5 rows of the customer table.
customer.show(5)

+-----------+--------------+--------------------+-------------+--------------+----------------+--------------------+--------------------+
|customer_id| Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|               Email|        Phone_Number|
+-----------+--------------+--------------------+-------------+--------------+----------------+--------------------+--------------------+
|          0|Miguel Leonard|262 Beck Expressw...|      Unknown| West Virginia|         Eritrea| zweaver@example.net|             Unknown|
|          1|       Unknown|             Unknown|  Evanchester|        Oregon|         Uruguay|             Unknown| (384)778-9942x91236|
|          2|Michael Murphy|894 Williams Ridg...|Dominguezview|      New York|          Sweden|kristinstanley@ex...|+1-693-739-2204x8851|
|          3|Tina Gutierrez|    415 Taylor Knoll|    Donnastad|South Carolina|         Unknown|sarabrooks@exampl...|  623-933-0431x87174|
|          4|  Kylie Adkins|    43

In [78]:
# Employee table: one row per distinct combination of the employment columns,
# prefixed with a generated employee_id key.
# monotonically_increasing_id() derives its values from the partition layout, and
# this DataFrame is re-evaluated by every later action (show, fact-table join,
# JDBC write). The result is cached so those actions reuse the ids assigned here
# rather than generating them again. Caching is best-effort: partitions that are
# evicted get recomputed.
employee = (
    df_clean
    .select('Company', 'Job_Title', 'Gender', 'Marital_Status')
    .distinct()
    .withColumn('employee_id', monotonically_increasing_id())
    .select('employee_id', 'Company', 'Job_Title', 'Gender', 'Marital_Status')
    .cache()
)


In [28]:
# Preview the first 5 rows of the employee table.
employee.show(5)

+-----------+--------------------+--------------------+-------+--------------+
|employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|         Price Group|             Unknown|   Male|        Single|
|          1|Rhodes, King and ...| Trade mark attorney|   Male|       Unknown|
|          2|Schmidt, Morgan a...|     Engineer, water| Female|        Single|
|          3|       Johnson Group|  Forensic scientist|   Male|       Unknown|
|          4|     Phillips-Prince|Production assist...|Unknown|        Single|
+-----------+--------------------+--------------------+-------+--------------+
only showing top 5 rows



In [79]:
# Building the fact table: join the cleaned rows back to the customer, transaction
# and employee tables on their descriptive columns to pick up the three generated
# keys, then keep only the keys and the remaining measure/attribute columns.
# NOTE(review): the joins match on descriptive columns rather than a row key, so
# two transaction rows sharing the same date, amount and type would each match
# both source rows — confirm such duplicates cannot occur in the data.
fact_table = (
    df_clean
    .join(
        customer,
        on=['Customer_Name', 'Customer_Address', 'Customer_City',
            'Customer_State', 'Customer_Country', 'Email', 'Phone_Number'],
        how='inner',
    )
    .join(transaction, on=['Transaction_Date', 'Amount', 'Transaction_Type'], how='inner')
    .join(employee, on=['Company', 'Job_Title', 'Gender', 'Marital_Status'], how='inner')
    .select(
        'transaction_id', 'customer_id', 'employee_id',
        'Credit_Card_Number', 'IBAN', 'Currency_Code', 'Random_Number',
        'Category', 'Group', 'Is_Active', 'Last_Updated', 'Description',
    )
)

In [31]:
# Preview the first 5 rows of the fact table.
fact_table.show(5)

+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|    8590072254| 8589986570|      62959|  3518317232148905|GB65HEYL937620470...|          RSD|       5419.0|       D|    X|       No|2023-11-23 03:03:...|Enjoy interview c...|
|        175861|17179904194|        818|      501840097544|GB38LHXK416865631...|          KZT|       5100.0| Unknown|    X|      Yes|2023-05-25 23:02:...|             Unknown|
|   17180060193|      89758|     129561|  4147377235704853|             Unknown|          FKP|       8434.0|       B|   

In [83]:
# Data loading
def get_db_connection():
    """Open a new psycopg2 connection to the local ``nuga_bank`` database.

    The user name and password are read from the standard PostgreSQL
    environment variables ``PGUSER`` and ``PGPASSWORD`` when they are set, so
    real credentials do not have to be written into the notebook. When they
    are not set, the previous local-development values are used.

    Returns:
        The connection object returned by ``psycopg2.connect``. The caller is
        responsible for closing it.

    Raises:
        Whatever ``psycopg2.connect`` raises when the connection cannot be
        established (not caught here).
    """
    return psycopg2.connect(
        host='localhost',
        database='nuga_bank',
        user=os.environ.get('PGUSER', 'postgres'),
        password=os.environ.get('PGPASSWORD', 'password'),
    )
# Connect to the SQL database.
# NOTE(review): this module-level connection is not used or closed by any later
# cell visible here (create_tables opens its own) — confirm it is still needed.
conn = get_db_connection()

In [90]:
def create_tables():
    """Drop and recreate the four target tables in PostgreSQL.

    Opens its own connection via ``get_db_connection()``, runs one DDL script
    that drops ``customer``, ``transaction``, ``employee`` and ``fact_table``
    if they exist and creates them again, then commits.

    ``Transaction_Date`` and ``Last_Updated`` are declared ``TIMESTAMP`` to
    match the timestamp type of the DataFrame columns loaded into them; a
    ``DATE`` column would discard the time of day.

    Returns:
        None. On success prints "Tables created successfully."; on any failure
        prints "Error: <message>" and returns without raising.
    """
    create_table_query = '''
                            DROP TABLE IF EXISTS customer;
                            DROP TABLE IF EXISTS transaction;
                            DROP TABLE IF EXISTS employee;
                            DROP TABLE IF EXISTS fact_table;

                            CREATE TABLE customer (
                                customer_id BIGINT,
                                Customer_Name VARCHAR(1000),
                                Customer_Address VARCHAR(1000),
                                Customer_City VARCHAR(1000),
                                Customer_State VARCHAR(1000),
                                Customer_Country VARCHAR(1000),
                                Email VARCHAR(1000),
                                Phone_Number VARCHAR(1000)
                            );

                            CREATE TABLE transaction (
                                transaction_id BIGINT,
                                Amount FLOAT,
                                Transaction_Date TIMESTAMP,
                                Transaction_Type VARCHAR(1000)
                            );

                            CREATE TABLE employee(
                                employee_id BIGINT,
                                Company VARCHAR(1000),
                                Job_Title VARCHAR(1000),
                                Gender VARCHAR(1000),
                                Marital_Status VARCHAR(1000)
                            );

                            CREATE TABLE fact_table(
                                transaction_id BIGINT,
                                customer_id BIGINT,
                                employee_id BIGINT,
                                Credit_Card_Number VARCHAR(1000),
                                IBAN VARCHAR(1000),
                                Currency_Code VARCHAR(1000),
                                Random_Number FLOAT,
                                Category VARCHAR(1000),
                                "Group" VARCHAR(1000),
                                Is_Active VARCHAR(1000),
                                Last_Updated TIMESTAMP,
                                Description VARCHAR(1000)
                            );
                            '''
    # Bound before the try block so the cleanup below never touches an
    # unassigned name when the connection attempt itself fails.
    conn = None
    try:
        conn = get_db_connection()
        # The cursor is closed when the with-block exits, even on error.
        with conn.cursor() as cursor:
            cursor.execute(create_table_query)
        conn.commit()
        print("Tables created successfully.")
    except Exception as e:
        # Same contract as before: report the failure and return, do not raise.
        print(f"Error: {e}")
    finally:
        # Closing without a commit discards any uncommitted work.
        if conn is not None:
            conn.close()


In [91]:
# Drop and recreate the four target tables before loading.
create_tables()

Tables created successfully.


In [96]:
# JDBC connection settings used by the Spark writers in this cell and the ones below.
# The user name and password are read from the standard PostgreSQL environment
# variables PGUSER / PGPASSWORD when they are set, so real credentials do not
# have to be written into the notebook; otherwise the previous local-development
# values are used.
url = "jdbc:postgresql://localhost:5432/nuga_bank"
properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "password"),
    "driver": "org.postgresql.Driver",
}
# Load the customer table; "append" inserts into the existing table.
customer.write.jdbc(url=url, table="customer", mode="append", properties=properties)

In [97]:
# Load the remaining tables in the same order: employee, transaction, fact_table.
for frame, table_name in (
    (employee, "employee"),
    (transaction, "transaction"),
    (fact_table, "fact_table"),
):
    frame.write.jdbc(url=url, table=table_name, mode="append", properties=properties)