In [1]:
import os

import pandas as pd
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from sqlalchemy import create_engine
 

In [2]:
# Create the Spark session for this ETL run, or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName('NugaBankETL')
    .getOrCreate()
)

In [3]:
# Bare expression: renders the SparkSession summary as this cell's output.
spark

In [4]:
# Extract: load the raw transactions file, using the first row as the header
# and letting Spark infer column types.
# Forward slashes are used so the relative path resolves on Linux/macOS as
# well as Windows (a backslash separator only works on Windows).
nuga_bank_df = spark.read.csv(
    'dataset/nuga_bank_transactions.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Inspect the column names and the types chosen by inferSchema.
nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [6]:
# Row count of the raw extract.
nuga_bank_df.count()

1000000

In [7]:
# Number of columns in the raw extract.
len(nuga_bank_df.columns)

23

In [11]:
# Count nulls per column in a single aggregation job rather than launching
# one filter+count job (a full scan of the data) per column.
# when(is_null, 1) yields 1 for null cells and NULL otherwise; count() only
# counts the non-NULL results, i.e. the number of null cells in that column.
raw_null_counts = nuga_bank_df.select([
    F.count(F.when(nuga_bank_df[column].isNull(), 1)).alias(column)
    for column in nuga_bank_df.columns
]).first()

for column in nuga_bank_df.columns:
    print(column, 'Nulls', raw_null_counts[column])

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [12]:
# Replace missing values with explicit placeholders.
# Every text column shares the same 'Unknown' sentinel (Gender previously used
# the misspelling 'Unkown', which created a second, inconsistent placeholder).
text_columns = [
    'Customer_Name',
    'Customer_Address',
    'Customer_City',
    'Customer_State',
    'Customer_Country',
    'Company',
    'Job_Title',
    'Email',
    'Phone_Number',
    'IBAN',
    'Currency_Code',
    'Category',
    'Group',
    'Is_Active',
    'Description',
    'Gender',
    'Marital_Status',
]
fill_values = {name: 'Unknown' for name in text_columns}

# Numeric columns get a zero of the matching type (long / double).
fill_values['Credit_Card_Number'] = 0
fill_values['Random_Number'] = 0.0

# Last_Updated is deliberately not filled here; its nulls remain in the frame.
nuga_bank_df_clean = nuga_bank_df.fillna(fill_values)

In [14]:
# Re-check nulls after filling, again with one aggregation job for all columns
# instead of one filter+count job per column.
# when(is_null, 1) is NULL for populated cells, so count() returns the number
# of null cells in each column.
clean_null_counts = nuga_bank_df_clean.select([
    F.count(F.when(nuga_bank_df_clean[column].isNull(), 1)).alias(column)
    for column in nuga_bank_df_clean.columns
]).first()

for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', clean_null_counts[column])
    

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 100321
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [16]:
# Discard rows that have no Last_Updated timestamp.
nuga_bank_df_clean = nuga_bank_df_clean.dropna(subset=['Last_Updated'])

In [17]:
# Rows remaining in the cleaned frame.
nuga_bank_df_clean.count()

899679

In [18]:
# Summary statistics for the cleaned frame.
nuga_bank_df_clean.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+--------------------+------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category|  Group|Is_Active|         Description|Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+------

### Transformation

In [19]:
# List the available columns before splitting them into separate tables.
nuga_bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

### Creating Data Models

In [34]:
# Transaction table: one row per distinct (date, amount, type) combination.
transaction = (
    nuga_bank_df_clean
    .select('Transaction_Date', 'Amount', 'Transaction_Type')
    .distinct()
)

# monotonically_increasing_id() derives its value from the partition layout and
# row position, which Spark may produce differently each time the lineage is
# recomputed. Caching keeps the generated ids the same across later actions
# (joins, writes, toPandas) so every consumer sees the same transaction_id.
# Best-effort: for a hard guarantee, write the table out and read it back.
transaction = transaction.withColumn('transaction_id', monotonically_increasing_id()).cache()

In [35]:
# Reorder the columns so the surrogate key comes first.
transaction = transaction.select(
    'transaction_id',
    'Transaction_Date',
    'Amount',
    'Transaction_Type',
)

In [28]:
# Preview the first five transaction rows.
transaction.show(5)

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [33]:
# Customer table: one row per distinct combination of the customer attributes.
customer = nuga_bank_df_clean.select(
    'Customer_Name',
    'Customer_Address',
    'Customer_City',
    'Customer_State',
    'Customer_Country',
    'Email',
    'Phone_Number',
).distinct()

# Attach a surrogate key and put it first.
customer = customer.withColumn('customer_id', monotonically_increasing_id())
customer = customer.select(
    'customer_id',
    'Customer_Name',
    'Customer_Address',
    'Customer_City',
    'Customer_State',
    'Customer_Country',
    'Email',
    'Phone_Number',
)

# monotonically_increasing_id() depends on partition layout and row position,
# which can differ whenever Spark recomputes this lineage. Caching keeps the
# ids the same across later actions (joins, writes, toPandas).
# Best-effort: for a hard guarantee, write the table out and read it back.
customer = customer.cache()

In [30]:
# Preview the first five customer rows.
customer.show(5)

+-----------+--------------+--------------------+------------------+--------------+--------------------+--------------------+-------------------+
|customer_id| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|               Email|       Phone_Number|
+-----------+--------------+--------------------+------------------+--------------+--------------------+--------------------+-------------------+
|          0|    James Neal|54912 Holmes Lodg...| West Keithborough|       Florida|                Togo|             Unknown|  493.720.6609x7545|
|          1|   Thomas Long| 1133 Collin Passage|        Joshuabury|   Connecticut|Lao People's Demo...|michellelynch@exa...|      (497)554-3317|
|          2|Ashley Shelton|5297 Johnson Port...|       North Maria|    New Jersey|              Bhutan| ljordan@example.org|      (534)769-3072|
|          3| James Rosario|56955 Moore Glens...|North Michellefurt|    New Mexico|             Iceland|parkerjames@examp...

In [32]:
# Employee table: one row per distinct (company, job title, gender, marital status).
employee = nuga_bank_df_clean.select('Company', 'Job_Title', 'Gender', 'Marital_Status').distinct()

# Attach a surrogate key and put it first.
employee = employee.withColumn('employee_id', monotonically_increasing_id())
employee = employee.select('employee_id', 'Company', 'Job_Title', 'Gender', 'Marital_Status')

# monotonically_increasing_id() depends on partition layout and row position,
# which can differ whenever Spark recomputes this lineage. Caching keeps the
# ids the same across later actions (joins, writes, toPandas).
# Best-effort: for a hard guarantee, write the table out and read it back.
employee = employee.cache()

employee.show(5)

+-----------+--------------------+--------------------+------+--------------+
|employee_id|             Company|           Job_Title|Gender|Marital_Status|
+-----------+--------------------+--------------------+------+--------------+
|          0|         Price Group|             Unknown|  Male|        Single|
|          1|Rhodes, King and ...| Trade mark attorney|  Male|       Unknown|
|          2|Schmidt, Morgan a...|     Engineer, water|Female|        Single|
|          3|       Johnson Group|  Forensic scientist|  Male|       Unknown|
|          4|      Henry and Sons|Engineer, civil (...|Female|       Married|
+-----------+--------------------+--------------------+------+--------------+
only showing top 5 rows



In [36]:
# Fact table: swap the descriptive attributes of each cleaned row for the
# surrogate keys of the customer, transaction and employee tables.
customer_keys = [
    'Customer_Name',
    'Customer_Address',
    'Customer_City',
    'Customer_State',
    'Customer_Country',
    'Email',
    'Phone_Number',
]
transaction_keys = ['Transaction_Date', 'Amount', 'Transaction_Type']
employee_keys = ['Company', 'Job_Title', 'Gender', 'Marital_Status']

# All three lookups are left joins so a source row is never dropped by a
# lookup; the employee join previously fell back to the default inner join,
# inconsistent with the other two.
fact_table = (
    nuga_bank_df_clean
    .join(customer, customer_keys, 'left')
    .join(transaction, transaction_keys, 'left')
    .join(employee, employee_keys, 'left')
    .select(
        'transaction_id',
        'customer_id',
        'employee_id',
        'Credit_Card_Number',
        'IBAN',
        'Currency_Code',
        'Random_Number',
        'Category',
        'Group',
        'Is_Active',
        'Last_Updated',
        'Description',
    )
)

In [41]:
# Preview the joined fact table.
fact_table.show()

+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|    8589976472| 8589987406|      44772|  3518317232148905|GB65HEYL937620470...|          RSD|       5419.0|       D|      X|       No|2023-11-23 03:03:...|Enjoy interview c...|
|   51539626282|25769830373|      13811|      501840097544|GB38LHXK416865631...|          KZT|       5100.0| Unknown|      X|      Yes|2023-05-25 23:02:...|             Unknown|
|   17179953513| 8590030383|      83082|  4399041019463442|GB10ALQO471429631...|          ZWD|       1743.0| U

In [None]:
# Save each table as Parquet.
# The output goes under dataset/parquet/ so the CSV export, which writes to
# dataset/<table> in overwrite mode, cannot replace these files. This also
# fixes the misspelled 'tfact_table' target.
transaction.write.mode('overwrite').parquet('dataset/parquet/transaction')
customer.write.mode('overwrite').parquet('dataset/parquet/customer')
employee.write.mode('overwrite').parquet('dataset/parquet/employee')
fact_table.write.mode('overwrite').parquet('dataset/parquet/fact_table')

In [None]:
# Export every table as CSV with a header row, collapsing each one to a single
# partition first.
csv_exports = (
    (transaction, r'dataset/transaction'),
    (customer, r'dataset/customer'),
    (employee, r'dataset/employee'),
    (fact_table, r'dataset/fact_table'),
)
for table_df, target_dir in csv_exports:
    single_partition = table_df.repartition(1)
    writer = single_partition.write.mode('overwrite').option('header', 'true')
    writer.csv(target_dir)


### Loading Data into Postgres

In [None]:
# Convert each Spark table to a pandas DataFrame so it can be written with
# DataFrame.to_sql. toPandas() collects the full table onto the driver.
transaction_pd_df = transaction.toPandas()
customer_pd_df = customer.toPandas()
employee_pd_df = employee.toPandas()
fact_table_pd_df = fact_table.toPandas()

In [8]:
# Load the four tables into pandas from CSV files on disk.
# NOTE(review): this reassigns the same names the toPandas() cell sets, and it
# reads dataset/<table>.csv files that no visible cell writes (the Spark CSV
# export targets dataset/<table>, without the .csv suffix). Confirm where these
# files come from and that they match the current Spark tables.
transaction_pd_df = pd.read_csv('dataset/transaction.csv')
customer_pd_df = pd.read_csv('dataset/customer.csv')
employee_pd_df = pd.read_csv('dataset/employee.csv')
fact_table_pd_df = pd.read_csv('dataset/fact_table.csv')

In [5]:
# Preview the transaction frame.
# NOTE(review): the recorded output for this cell lists customer columns plus
# an 'Unnamed: 0' column — confirm dataset/transaction.csv really holds the
# transaction table and was saved without the pandas index.
transaction_pd_df.head()

Unnamed: 0,customer_id,Customer_Name,Customer_Address,Customer_City,Customer_State,Customer_Country,Email,Phone_Number
0,0,Miguel Leonard,262 Beck Expressway Suite 504,Unknown,West Virginia,Eritrea,zweaver@example.net,Unknown
1,1,Unknown,Unknown,Evanchester,Oregon,Uruguay,Unknown,(384)778-9942x91236
2,2,Michael Murphy,894 Williams Ridges Apt. 635,Dominguezview,New York,Sweden,kristinstanley@example.com,+1-693-739-2204x8851
3,3,Tina Gutierrez,415 Taylor Knoll,Donnastad,South Carolina,Unknown,sarabrooks@example.com,623-933-0431x87174
4,4,Kylie Adkins,435 Nicole Curve,Unknown,Louisiana,Unknown,davisronald@example.net,(404)814-4457x1451


In [None]:
# Load settings from a .env file; override=True lets values from the file
# replace variables already present in the process environment.
load_dotenv(override=True)

In [9]:
# Connection settings are read from environment variables (e.g. supplied by
# the .env file loaded with load_dotenv).
db_params = {
    'username': os.getenv('username'),
    'password': os.getenv('password'),
    'host': os.getenv('host'),
    'port': os.getenv('port'),
    'database': os.getenv('database'),
}

# Fail early with a clear message instead of building a URL that contains the
# literal text 'None' for any variable that is not set.
missing_settings = [key for key, value in db_params.items() if value is None]
if missing_settings:
    raise RuntimeError(
        f"Missing database settings in environment: {', '.join(missing_settings)}"
    )

db_url = f"postgresql://{db_params['username']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['database']}"

engine = create_engine(db_url)

# Each target table is paired with its own DataFrame. Previously all four
# to_sql calls wrote transaction_pd_df, so customer, employee and fact_table
# were filled with transaction data.
tables_to_load = {
    'transaction': transaction_pd_df,
    'customer': customer_pd_df,
    'employee': employee_pd_df,
    'fact_table': fact_table_pd_df,
}

# engine.begin() wraps all four loads in one transaction: it commits when the
# block finishes and rolls back if any load raises.
with engine.begin() as connection:
    for table_name, table_df in tables_to_load.items():
        table_df.to_sql(table_name, connection, index=False, if_exists='replace')

print('Datasets successfully loaded')

Datasets successfully loaded
