In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
#! pip install pandas
import pandas as pd
#! pip install sqlalchemy
from sqlalchemy import create_engine

In [3]:
#initialize spark session
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [5]:
df = spark.read.csv(r'E:\10alytics Data Engineering\week 9\material\nuga_bank_transactions.csv',header=True, inferSchema=True)
df.show()

+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------

In [7]:
print(f'Number of rows = {df.count()}')

Number of rows = 1000000


In [9]:
df.describe().show()

+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+--------------------+------+--------------+
|summary|           Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+-----------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+---------------

In [11]:
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

Data Cleaning

In [13]:
for i in df.columns:
    no_of_null = df.filter(df[i].isNull()).count()
    print(f'Number of nulls in {i} = {no_of_null}')

Number of nulls in Transaction_Date = 0
Number of nulls in Amount = 0
Number of nulls in Transaction_Type = 0
Number of nulls in Customer_Name = 100425
Number of nulls in Customer_Address = 100087
Number of nulls in Customer_City = 100034
Number of nulls in Customer_State = 100009
Number of nulls in Customer_Country = 100672
Number of nulls in Company = 100295
Number of nulls in Job_Title = 99924
Number of nulls in Email = 100043
Number of nulls in Phone_Number = 100524
Number of nulls in Credit_Card_Number = 100085
Number of nulls in IBAN = 100300
Number of nulls in Currency_Code = 99342
Number of nulls in Random_Number = 99913
Number of nulls in Category = 100332
Number of nulls in Group = 100209
Number of nulls in Is_Active = 100259
Number of nulls in Last_Updated = 100321
Number of nulls in Description = 100403
Number of nulls in Gender = 99767
Number of nulls in Marital_Status = 99904


In [15]:
#check for columns who has null values only
null_cols = []
for i in df.columns:
    nulls = df.filter(df[i].isNull()).count()
    if nulls > 0 :
        null_cols.append(i)
print(null_cols)        

['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country', 'Company', 'Job_Title', 'Email', 'Phone_Number', 'Credit_Card_Number', 'IBAN', 'Currency_Code', 'Random_Number', 'Category', 'Group', 'Is_Active', 'Last_Updated', 'Description', 'Gender', 'Marital_Status']


In [17]:
df.select(null_cols).printSchema()

root
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)



In [19]:
dict(df.dtypes)['Customer_Address']

'string'

In [21]:
# fill null values
for i in df.columns:
    if dict(df.dtypes)[i] == 'string':
        df = df.fillna('unknown',subset=i)
    else:
          df = df.fillna(0,subset=i)  

In [23]:
# check for null values
for i in df.columns:
    nulls = df.filter(df[i].isNull()).count()
    print(f'Null values in {i} = {nulls}')

Null values in Transaction_Date = 0
Null values in Amount = 0
Null values in Transaction_Type = 0
Null values in Customer_Name = 0
Null values in Customer_Address = 0
Null values in Customer_City = 0
Null values in Customer_State = 0
Null values in Customer_Country = 0
Null values in Company = 0
Null values in Job_Title = 0
Null values in Email = 0
Null values in Phone_Number = 0
Null values in Credit_Card_Number = 0
Null values in IBAN = 0
Null values in Currency_Code = 0
Null values in Random_Number = 0
Null values in Category = 0
Null values in Group = 0
Null values in Is_Active = 0
Null values in Last_Updated = 100321
Null values in Description = 0
Null values in Gender = 0
Null values in Marital_Status = 0


###### We notice that all columns are clean except column `Last_Udated` so we will drop rows with null values

In [25]:
df = df.na.drop(subset='Last_Updated')

In [27]:
#Check for cleaning data
for i in df.columns:
    nulls= df.filter(df[i].isNull()).count()
    print(f'Null values in {i} = {nulls}')

Null values in Transaction_Date = 0
Null values in Amount = 0
Null values in Transaction_Type = 0
Null values in Customer_Name = 0
Null values in Customer_Address = 0
Null values in Customer_City = 0
Null values in Customer_State = 0
Null values in Customer_Country = 0
Null values in Company = 0
Null values in Job_Title = 0
Null values in Email = 0
Null values in Phone_Number = 0
Null values in Credit_Card_Number = 0
Null values in IBAN = 0
Null values in Currency_Code = 0
Null values in Random_Number = 0
Null values in Category = 0
Null values in Group = 0
Null values in Is_Active = 0
Null values in Last_Updated = 0
Null values in Description = 0
Null values in Gender = 0
Null values in Marital_Status = 0


In [29]:
num_of_rows = df.count()
num_of_cols = len(df.columns)
print(f'Rows = {num_of_rows}')
print(f'columns = {num_of_cols}')

Rows = 899679
columns = 23


Data Transformation

In [31]:
df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

Transform Data

In [35]:
#create table transaction
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window 

transaction = df.select('Transaction_Date','Amount','Transaction_Type')

#create column transaction_id with unique IDs
w = Window.orderBy('Amount') #To create sequential IDs ordered by columns in DataFrame
transaction = transaction.withColumn('transaction_id', row_number().over(w) )

#reordering columns
transaction = transaction.select('transaction_id','Transaction_Date','Amount','Transaction_Type')
transaction.show(3)

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             1|2024-03-03 03:50:...|  10.0|      Withdrawal|
|             2|2024-02-07 17:28:...|  10.0|      Withdrawal|
|             3|2024-02-07 01:47:...|  10.0|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 3 rows



In [37]:
# create table customer
customer = df.select('Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country','Email',
 'Phone_Number')

w = Window.orderBy('Customer_Name')

#create column customer_id with unique IDS
customer = customer.withColumn('customer_id', row_number().over(w))

#reorder columns
customer = customer.select('customer_id','Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country','Email',
 'Phone_Number')
customer.show(3)

+-----------+-------------+--------------------+-----------------+--------------+----------------+--------------------+-----------------+
|customer_id|Customer_Name|    Customer_Address|    Customer_City|Customer_State|Customer_Country|               Email|     Phone_Number|
+-----------+-------------+--------------------+-----------------+--------------+----------------+--------------------+-----------------+
|          1| Aaron Abbott|    152 Alicia Forge|     Fletcherfort|       unknown|        Colombia|jeanlane@example.org| 001-427-977-6354|
|          2| Aaron Abbott|879 Jason Extensi...|          unknown| New Hampshire| Kyrgyz Republic|baileyzachary@exa...|       9489401862|
|          3| Aaron Acosta|576 Mary Inlet Su...|North Sandramouth|      Virginia|         Iceland| bryan91@example.com|697-358-6082x4103|
+-----------+-------------+--------------------+-----------------+--------------+----------------+--------------------+-----------------+
only showing top 3 rows



In [39]:
#create table employee
employee = df.select('Company','Job_Title','Gender','Marital_Status')

#create column employee_id with unique IDs
w = Window.orderBy('Company')
employee = employee.withColumn('employee_id', row_number().over(w))

#reorder columns
employee = employee.select('employee_id','Company','Job_Title','Gender','Marital_Status')
employee.show(3)

+-----------+------------+--------------------+------+--------------+
|employee_id|     Company|           Job_Title|Gender|Marital_Status|
+-----------+------------+--------------------+------+--------------+
|          1|Abbott Group|          Orthoptist|Female|       unknown|
|          2|Abbott Group|   Therapist, sports|Female|      Divorced|
|          3|Abbott Group|Journalist, newsp...| Other|        Single|
+-----------+------------+--------------------+------+--------------+
only showing top 3 rows



In [41]:
#create fact_table
fact_table = df.join(transaction, on=['Transaction_Date','Amount','Transaction_Type'],how='left')\
               .join(customer,on=['Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country','Email','Phone_Number'],how='left')\
               .join(employee,on=['Company','Job_Title','Gender','Marital_Status'],how='left')\
               .select('transaction_id','customer_id','employee_id','Credit_Card_Number','IBAN','Currency_Code','Random_Number','Category','Group','Is_Active','Last_Updated','Description')
fact_table.show()

+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|        109381|     160945|      17048|      630428157006|GB86GLHT381589496...|      unknown|       5000.0|       A|      Z|       No|2023-10-14 00:47:...|Everything decade...|
|        818775|     478100|     119508|  6595994198943849|             unknown|          KPW|       5778.0|       A|unknown|       No|2022-08-17 18:44:...|             unknown|
|        487638|     295225|     173035|    38082745081301|             unknown|          DJF|          0.0|  

create fact_table dataframe pandas

In [45]:
fact_df = fact_table.toPandas()

In [47]:
fact_df

Unnamed: 0,transaction_id,customer_id,employee_id,Credit_Card_Number,IBAN,Currency_Code,Random_Number,Category,Group,Is_Active,Last_Updated,Description
0,342892,846110,6,3518317232148905,GB65HEYL93762047073654,RSD,5419.0,D,X,No,2023-11-23 03:03:55.947701,Enjoy interview crime watch many billion movie.
1,117922,527659,3,501840097544,GB38LHXK41686563199390,KZT,5100.0,unknown,X,Yes,2023-05-25 23:02:19.248626,unknown
2,306316,210891,35,4399041019463442,GB10ALQO47142963184388,ZWD,1743.0,unknown,unknown,No,2020-09-30 23:58:08.573704,Air section environment.
3,226337,796830,22,4394242486211307,GB53EVQP31770580904518,CRC,8015.0,D,Y,No,2023-12-18 18:04:16.141820,Few move last behavior run smile pattern.
4,649865,402283,26,349642907887383,GB40LQGI08457726118321,CUP,0.0,unknown,unknown,No,2021-09-08 09:32:37.149013,Follow room produce off international simple own.
...,...,...,...,...,...,...,...,...,...,...,...,...
8128414,436501,32613,879757,4962604391246634752,GB20EZKH48949558380745,BIF,0.0,A,Z,No,2021-03-31 17:25:28.796448,unknown
8128415,436501,32613,880132,4962604391246634752,GB20EZKH48949558380745,BIF,0.0,A,Z,No,2021-03-31 17:25:28.796448,unknown
8128416,436501,32613,887388,4962604391246634752,GB20EZKH48949558380745,BIF,0.0,A,Z,No,2021-03-31 17:25:28.796448,unknown
8128417,436501,32613,898369,4962604391246634752,GB20EZKH48949558380745,BIF,0.0,A,Z,No,2021-03-31 17:25:28.796448,unknown


In [49]:
transaction_df = transaction.toPandas()

In [51]:
customer_df = customer.toPandas()

In [53]:
employee_df = employee.toPandas()

save data to csv

In [60]:
fact_df.to_csv(r'E:\10alytics Data Engineering\week 9\Nuga-Bank-ETL-Case-Study\transformed data\fact_table.csv',index=False)

In [62]:
transaction_df.to_csv(r'E:\10alytics Data Engineering\week 9\Nuga-Bank-ETL-Case-Study\transformed data\transaction.csv',index=False)

In [64]:
customer_df.to_csv(r'E:\10alytics Data Engineering\week 9\Nuga-Bank-ETL-Case-Study\transformed data\customer.csv',index=False)

In [66]:
employee_df.to_csv(r'E:\10alytics Data Engineering\week 9\Nuga-Bank-ETL-Case-Study\transformed data\employee.csv',index=False)