#### Installing neccessary libraries

In [1]:
# ! pip install pyspark==3.4.3
# ! pip install pyscopg2

### Import neccessary libraries

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import monotonically_increasing_id
import psycopg2
import os

In [3]:
# Check Java version to know if the JDBC is compatible
! java -version

java version "11.0.24" 2024-07-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.24+7-LTS-271)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.24+7-LTS-271, mixed mode)


#### Set java home 

In [4]:
# Set java home 
os.environ['JAVA_HOME'] = 'C:/java8'


### Initialize the spark session

In [5]:
# Method 1
#spark = SparkSession.builder \
 #   .appName("Nuga Bank ETL") \
#    .config("spark.jars", "postgresql-42.7.4.jar") \
 #   .getOrCreate()


In [6]:
from pyspark.sql import SparkSession

# Create a SparkSession builder and set the application name to "Nuga Bank ETL" for identification in the Spark UI.
builder = SparkSession.builder.appName("Nuga Bank ETL")

# Configure Spark to include the PostgreSQL JDBC driver (postgresql-42.7.4.jar), allowing it to connect to a PostgreSQL database.
builder.config('spark.jars', 'postgresql-42.7.4.jar')

# Configure driver and executor memory settings to help prevent OutOfMemoryError.
builder.config("spark.driver.memory", "4g")         # Set driver memory to 4 GB.
builder.config("spark.executor.memory", "4g")       # Set executor memory to 4 GB.
builder.config("spark.executor.memoryOverhead", "1g")  # Set additional executor memory overhead.

# Initialize the SparkSession (or retrieve an existing one if it already exists).
spark = builder.getOrCreate()


In [7]:
# Create a SparkSession builder and set the application name to "Nuga Bank ETL" for identification in the Spark UI.
#builder = SparkSession.builder.appName("Nuga Bank ETL")

# Configure Spark to include the PostgreSQL JDBC driver (postgresql-42.7.4.jar), allowing it to connect to a PostgreSQL database.
#builder.config('spark.jars', 'postgresql-42.7.4.jar')


# Initialize the SparkSession (or retrieve an existing one if it already exists).
#spark = builder.getOrCreate()


In [8]:
spark

In [9]:
df = spark.read.csv(r'dataset\raw_data\nuga_bank_transactions.csv', header=True, inferSchema=True)

In [10]:
df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [11]:
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

### Data cleaning and transformation 

In [12]:
# Data cleaning and transformation 

for column in df.columns:
    print(column , 'column nulls values count: ',df.filter(df[column].isNull()).count())

Transaction_Date column nulls values count:  0
Amount column nulls values count:  0
Transaction_Type column nulls values count:  0
Customer_Name column nulls values count:  100425
Customer_Address column nulls values count:  100087
Customer_City column nulls values count:  100034
Customer_State column nulls values count:  100009
Customer_Country column nulls values count:  100672
Company column nulls values count:  100295
Job_Title column nulls values count:  99924
Email column nulls values count:  100043
Phone_Number column nulls values count:  100524
Credit_Card_Number column nulls values count:  100085
IBAN column nulls values count:  100300
Currency_Code column nulls values count:  99342
Random_Number column nulls values count:  99913
Category column nulls values count:  100332
Group column nulls values count:  100209
Is_Active column nulls values count:  100259
Last_Updated column nulls values count:  100321
Description column nulls values count:  100403
Gender column nulls values

In [13]:
df.describe().show()

+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+-----------------+--------+------+---------+--------------------+------+--------------+
|summary|           Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+-----------------+--------+------+---------+------------------

In [14]:
#fill up the missing values

df_clean = df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address':'Unknown',
    'Customer_City':'Unknown',
    'Customer_State':'Unknown',
    'Customer_Country':'Unknown',
    'Company':'Unknown',
    'Job_Title':'Unknown',
    'Phone_Number':'Unknown',
    'Credit_Card_Number':0,
    'IBAN':'Unknown',
    'Currency_Code':'Unknown',
    'Email':'Unknown',
    'Random_Number':0.0,
    'Category':'Unknown',
    'Group':'Unknown',
    'Is_Active':'Unknown',
    'Marital_Status':'Unknown',
    'Description':'Unknown',
    'Gender':'Unknown',
    })

In [15]:
for column in df_clean.columns:
    print(column , 'column nulls values count: ',df_clean.filter(df_clean[column].isNull()).count())

Transaction_Date column nulls values count:  0
Amount column nulls values count:  0
Transaction_Type column nulls values count:  0
Customer_Name column nulls values count:  0
Customer_Address column nulls values count:  0
Customer_City column nulls values count:  0
Customer_State column nulls values count:  0
Customer_Country column nulls values count:  0
Company column nulls values count:  0
Job_Title column nulls values count:  0
Email column nulls values count:  0
Phone_Number column nulls values count:  0
Credit_Card_Number column nulls values count:  0
IBAN column nulls values count:  0
Currency_Code column nulls values count:  0
Random_Number column nulls values count:  0
Category column nulls values count:  0
Group column nulls values count:  0
Is_Active column nulls values count:  0
Last_Updated column nulls values count:  100321
Description column nulls values count:  0
Gender column nulls values count:  0
Marital_Status column nulls values count:  0


In [16]:
df_clean = df.na.drop(subset='Last_Updated')

In [17]:
df_clean.show(10)

+--------------------+------+----------------+--------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------------

### Data Transformation

#### Transcation Table

In [18]:
# Data Transformation to 2NF
# Transcation Table

transaction = df_clean.select('Transaction_Date','Amount','Transaction_Type',) \
      .withColumn('transaction_id',monotonically_increasing_id())\
            .select('transaction_id','Transaction_Date','Amount','Transaction_Type')


In [19]:
transaction.show(3)

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
+--------------+--------------------+------+----------------+
only showing top 3 rows



#### Customer Table

In [20]:

customer = df_clean.select('Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country')\
      .withColumn('Customer_id',monotonically_increasing_id())\
            .select('Customer_id','Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country')

#### Employee Table

In [21]:
employee = df_clean.select('Company','Job_Title','Email','Phone_Number','Gender','Marital_Status')\
      .withColumn('Employee_ID',monotonically_increasing_id())\
            .select('Employee_ID','Company','Job_Title','Email','Phone_Number','Gender','Marital_Status')

In [22]:
employee.show(3)

+-----------+--------------------+--------------------+--------------------+-----------------+------+--------------+
|Employee_ID|             Company|           Job_Title|               Email|     Phone_Number|Gender|Marital_Status|
+-----------+--------------------+--------------------+--------------------+-----------------+------+--------------+
|          0|Benson, Johnson a...|                null|                null|493.720.6609x7545| Other|      Divorced|
|          1|                null|   Food technologist|michellelynch@exa...|    (497)554-3317|Female|       Married|
|          2|       Jones-Mueller|Database administ...| ljordan@example.org|    (534)769-3072| Other|          null|
+-----------+--------------------+--------------------+--------------------+-----------------+------+--------------+
only showing top 3 rows



In [23]:
nuga_fact_table = df_clean.join(transaction,['Transaction_Date','Amount','Transaction_Type'], 'inner')\
      .join(customer,['Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country'],'inner')\
            .join(employee,['Company','Job_Title','Email','Phone_Number','Gender','Marital_Status'], 'inner')\
                  .select('transaction_id','Customer_id','Employee_ID','Credit_Card_Number','IBAN','Currency_Code','Random_Number','Category','Group','Is_Active','Description')

In [24]:
nuga_fact_table.show(3)

+--------------+------------+------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+
|transaction_id| Customer_id| Employee_ID| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|         Description|
+--------------+------------+------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+
|   85899354390| 85899354390| 85899354390|     30565776801553|GB48OGJL208986217...|          LAK|       7531.0|       C|    X|       No|Drop safe reality...|
|  146028907673|146028907673|146028907673|4011861817549921250|GB67MVIJ278612925...|          AED|       7376.0|       B|    X|       No| Boy life what east.|
|   25769821026| 25769821026| 25769821026|   3502762719193767|GB87UKWM952997799...|          GHS|       3365.0|       A|    Z|      Yes|                null|
+--------------+------------+------------+----------