## Importing Dependencies


In [51]:
%pip install pyspark sqlalchemy pandas pyarrow 

Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.



In [52]:
from pyspark.sql import SparkSession
from sqlalchemy import create_engine
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd


In [53]:
spark = SparkSession.builder.appName('NugaBankETL').getOrCreate()

In [54]:
#Initializing spark session
spark

### Data Extraction

In [55]:
Nuga_bank_df = spark.read.csv(r'dataset\nuga_bank_transactions.csv', header= True, inferSchema=True)

In [56]:
Nuga_bank_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [57]:
Nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

## Data Transformation

In [58]:
# To determine the number of rows
no_of_rows =Nuga_bank_df.count()

In [59]:
no_of_rows

1000000

In [60]:
#to determine the number of columns
no_of_columns = len(Nuga_bank_df.columns)

In [61]:
no_of_columns

23

In [62]:
#checking for null values
for column in Nuga_bank_df.columns:
    print(column, 'Null', Nuga_bank_df.filter(Nuga_bank_df[column].isNull()).count())

Transaction_Date Null 0
Amount Null 0
Transaction_Type Null 0
Customer_Name Null 100425
Customer_Address Null 100087
Customer_City Null 100034
Customer_State Null 100009
Customer_Country Null 100672
Company Null 100295
Job_Title Null 99924
Email Null 100043
Phone_Number Null 100524
Credit_Card_Number Null 100085
IBAN Null 100300
Currency_Code Null 99342
Random_Number Null 99913
Category Null 100332
Group Null 100209
Is_Active Null 100259
Last_Updated Null 100321
Description Null 100403
Gender Null 99767
Marital_Status Null 99904


In [63]:
# FIlling up missing values

nuga_bank = Nuga_bank_df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address':'Unknown',
    'Customer_City':'Unknown',
    'Customer_State':'Unknown',
    'Customer_Country':'Unknown',
    'Company':'Unknown',
    'Job_Title':'Unknown',
    'Email': 'Uknown',
    'Phone_Number':'Unknown',
    'Credit_Card_Number':0,
    'IBAN':'Unknown',
    'Currency_Code':'Unknown',
    'Random_Number':0.0,
    'Category':'Unknown',
    'Group':'Unknown',
    'Is_Active':'Unknown',
    'Description':'Unknown',
    'Gender':'Unknown',
    'Marital_Status':'Unknown',

})

In [64]:
for column in nuga_bank.columns:
    print(column, 'Null', nuga_bank.filter(nuga_bank[column].isNull()).count())

Transaction_Date Null 0
Amount Null 0
Transaction_Type Null 0
Customer_Name Null 0
Customer_Address Null 0
Customer_City Null 0
Customer_State Null 0
Customer_Country Null 0
Company Null 0
Job_Title Null 0
Email Null 0
Phone_Number Null 0
Credit_Card_Number Null 0
IBAN Null 0
Currency_Code Null 0
Random_Number Null 0
Category Null 0
Group Null 0
Is_Active Null 0
Last_Updated Null 100321
Description Null 0
Gender Null 0
Marital_Status Null 0


In [65]:
#dropping rows where last updated is null
nuga_bank = nuga_bank.na.drop(subset=['Last_Updated'])

In [66]:
for column in nuga_bank.columns:
    print(column, 'Nulls', nuga_bank.filter(nuga_bank[column].isNull()).count())

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [67]:
no_of_row = nuga_bank.count()
no_of_row

899679

In [68]:
#Summary stats of the data
nuga_bank.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+-------

In [69]:
spark

### Modelling the data/ Transformation

In [70]:
#Transactions schema
Transaction = nuga_bank.select('Transaction_Date', 'Amount', 'Transaction_Type')

In [71]:
#Adding a new column to the transaction schema
Transaction = Transaction.withColumn('Transaction_ID', monotonically_increasing_id())

In [72]:
#Re-orfering of the transaction schema
Transaction = Transaction.select('Transaction_ID','Transaction_Date', 'Amount', 'Transaction_Type').distinct()

In [73]:
Transaction.show(5)

+--------------+--------------------+------+----------------+
|Transaction_ID|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|           108|2024-04-30 03:13:...|302.59|         Deposit|
|           442|2024-04-27 07:39:...|608.58|         Deposit|
|          1019|2024-03-19 12:09:...|951.23|        Transfer|
|          1026|2024-02-24 00:22:...|627.81|         Deposit|
|          1206|2024-01-16 09:15:...|432.81|        Transfer|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [74]:
#Customer table
Customer = nuga_bank.select('Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_Country', 'Email', 'Phone_Number')

In [75]:
#Adding a customer_id column:
Customer = Customer.withColumn('Customer_ID', monotonically_increasing_id())

In [76]:
#Re-ordering the customer schema
Customer = Customer.select('Customer_ID','Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_Country', 'Email', 'Phone_Number').distinct()

In [77]:
Customer.show()

+-----------+-------------------+--------------------+----------------+--------------------+--------------------+--------------------+
|Customer_ID|      Customer_Name|    Customer_Address|   Customer_City|    Customer_Country|               Email|        Phone_Number|
+-----------+-------------------+--------------------+----------------+--------------------+--------------------+--------------------+
|       1160|         Amy Malone|   989 Moyer Estates|        Gillland|             Eritrea| scott93@example.net|001-862-947-4500x421|
|       1193|      Andrew Benson|  1631 Hawkins Ports|     Lamberttown|United Arab Emirates|duncanbarbara@exa...| +1-539-932-5174x292|
|       1235|     Cassandra Rich|    4143 Flores Camp|      Lake Nancy|        Cook Islands|qbarnett@example.net|       (845)729-0291|
|       1303|    Colin Fernandez|   40328 Bruce Ridge|     Melissaland|    Christmas Island|griffinandrea@exa...|        637.360.7815|
|       1372|Christopher Mercado|     796 Tina Divide|L

In [78]:
#Employee schema
Employee =nuga_bank.select('Company', 'Job_Title', 'Gender','Marital_Status')

In [79]:
#Adding an employeeID column
Employee = Employee.withColumn('Employee_ID', monotonically_increasing_id())

In [80]:
#RE-arranging the columns
Employee = Employee.select('Employee_ID', 'Company', 'Job_Title', 'Gender','Marital_Status')

In [81]:
Employee.show()

+-----------+--------------------+--------------------+-------+--------------+
|Employee_ID|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|Benson, Johnson a...|             Unknown|  Other|      Divorced|
|          1|             Unknown|   Food technologist| Female|       Married|
|          2|       Jones-Mueller|Database administ...|  Other|       Unknown|
|          3|       Vargas-Harris|Horticultural the...|Unknown|       Unknown|
|          4|Richardson, Gonza...|   Minerals surveyor| Female|       Married|
|          5|           Smith Ltd| Seismic interpreter|  Other|       Married|
|          6|         Wade-Kelley|  Surveyor, minerals|   Male|       Unknown|
|          7|             Unknown|Medical laborator...| Female|        Single|
|          8|         Lindsey LLC|Programmer, appli...| Female|        Single|
|          9|         Carroll LLC|             Unkno

In [82]:
#Fact schema
Customer_Facts = nuga_bank.join(Customer, ['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_Country', 'Email', 'Phone_Number'], 'left')\
                            .join(Transaction, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'left')\
                            .join(Employee, ['Company', 'Job_Title', 'Gender','Marital_Status'], 'left')\
                            .select('Transaction_ID', 'Credit_Card_Number', 'IBAN', 'Currency_Code', 'Random_Number','Category', 'Group','Is_Active', 'Last_Updated','Description')\
                            

In [83]:
Customer_Facts.show()

+--------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|Transaction_ID|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|   17179869198|   180067592769732|GB92JVMY004197871...|          EGP|       7198.0|       A|    Z|      Yes|2023-10-12 22:25:...|Before story prof...|
|            18|   213112163828334|GB50TJFN039979307...|          SVC|       7382.0|       B|    Z|      Yes|2020-01-19 18:19:...|Great evening so ...|
|   25769803779|                 0|GB32LGFL895760023...|          PAB|       8898.0|       A|    Y|      Yes|2021-12-07 15:35:...|Face field coach ...|
|   17179869199|  4239162655922295|GB96MEEY268453596...|          BTN|       5605.0|    

In [84]:
#saving data to parquet
Transaction.write.mode('overwrite').parquet(r'dataset\Transaction')

Py4JJavaError: An error occurred while calling o653.parquet.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
#Saving data to csv
Transaction.repartition(1).write.mode('overwrite').option('header', 'true').csv(r'dataset\transformeddata\csv\transaction')

In [40]:
#convert spark dataframe to panda dataframe

transaction_df = Transaction.toPandas()
customer_df = Customer.toPandas()
employee_df = Employee.toPandas()
facts_df = Customer_Facts.toPandas()

ModuleNotFoundError: No module named 'distutils'

In [None]:
#Loading dataset into a postgresql DB
db_Params ={
    'username':'postgres',
    'password':'root',
    'host':'localhost',
    'port':'5432',
    'database': 'nuga_bank'
}

#defining the dabase connections
db_url = f"postgresql://{db_Params['username']}:{db_Params['password']}@{db_Params['host']}:{db_Params['port']}/{db_Params['database']}"

#create the database engine
engine = create_engine(db_url)

#connect to the postgresSQL server
with engine.connect()as connection:
    transaction_df.to_sql('Transaction',connection, index=False, if_exist='replace')
    customer_df.to_sql('Customer',connection, index=False, if_exist='replace')
    employee_df.to_sql('Employee',connection, index=False, if_exist='replace')
    facts_df.to_sql('Customer_Facts',connection, index=False, if_exist='replace')


In [48]:
%pip install psycopg2


Defaulting to user installation because normal site-packages is not writeable
Collecting psycopg2
  Downloading psycopg2-2.9.10-cp312-cp312-win_amd64.whl.metadata (5.0 kB)
Downloading psycopg2-2.9.10-cp312-cp312-win_amd64.whl (1.2 MB)
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   --------- ------------------------------ 0.3/1.2 MB ? eta -:--:--
   ------------------ --------------------- 0.5/1.2 MB 932.9 kB/s eta 0:00:01
   ------------------------------------ --- 1.0/1.2 MB 1.5 MB/s eta 0:00:01
   ------------------------------------ --- 1.0/1.2 MB 1.5 MB/s eta 0:00:01
   ---------------------------------------- 1.2/1.2 MB 1.1 MB/s eta 0:00:00
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10
Note: you may need to restart the kernel to use updated packages.
