### Import Necessary Dependencies

In [1]:
import os
from getpass import getpass
from urllib.parse import quote_plus

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from sqlalchemy import create_engine

In [2]:
# Start (or reuse) the Spark session used by this ETL notebook
spark = (
    SparkSession.builder
    .appName('NugaBankETL')
    .getOrCreate()
)

In [3]:
# Display the session object to confirm it was created
spark

### Data Extraction

In [4]:
# Read the raw transactions CSV: the first row holds the column names and
# column types are inferred from the data.
# A forward-slash relative path is used (as in the write cells further down) so the
# notebook is not tied to the Windows path separator.
nuga_bank_df = spark.read.csv('dataset/nuga_bank_transactions.csv', header=True, inferSchema=True)

In [7]:
# Preview the first five rows of the raw data
nuga_bank_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [None]:
# Print the column names and the data types Spark inferred for them
nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [11]:
# List the column names of the raw data
nuga_bank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [13]:
# Number of rows in the raw data
num_rows = nuga_bank_df.count()
num_rows

1000000

In [14]:
# Number of columns in the raw data
num_columns = len(nuga_bank_df.columns)
num_columns

23

### Data Cleaning and Transformation

In [15]:
# Count the null values in every column with a single pass over the data
# (instead of one Spark job per column). when() yields null for non-null cells,
# and count() skips nulls, so each aggregate is that column's null count.
null_counts = nuga_bank_df.select([
    F.count(F.when(nuga_bank_df[column].isNull(), 1)).alias(column)
    for column in nuga_bank_df.columns
]).first()
for column in nuga_bank_df.columns:
    print(column, 'Nulls', null_counts[column])

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259
Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [27]:
# Replace missing values: text columns get the placeholder 'Unknown',
# numeric columns get zero. Last_Updated is not filled here.
text_columns = [
    'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
    'Customer_Country', 'Company', 'Job_Title', 'Email', 'Phone_Number',
    'IBAN', 'Currency_Code', 'Category', 'Group', 'Is_Active',
    'Description', 'Gender', 'Marital_Status',
]
fill_values = {name: 'Unknown' for name in text_columns}
fill_values['Credit_Card_Number'] = 0
fill_values['Random_Number'] = 0.0

nuga_bank_df_clean = nuga_bank_df.fillna(fill_values)

In [28]:
# Confirm which columns still contain nulls after filling, using a single pass
# over the data (instead of one Spark job per column). when() yields null for
# non-null cells, and count() skips nulls, so each aggregate is a null count.
null_counts = nuga_bank_df_clean.select([
    F.count(F.when(nuga_bank_df_clean[column].isNull(), 1)).alias(column)
    for column in nuga_bank_df_clean.columns
]).first()
for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', null_counts[column])

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 100321
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [29]:
# Discard every row that has no Last_Updated value
nuga_bank_df_clean = nuga_bank_df_clean.dropna(subset=['Last_Updated'])

In [30]:
# Confirm no nulls remain (including Last_Updated), using a single pass over the
# data (instead of one Spark job per column). when() yields null for non-null
# cells, and count() skips nulls, so each aggregate is that column's null count.
null_counts = nuga_bank_df_clean.select([
    F.count(F.when(nuga_bank_df_clean[column].isNull(), 1)).alias(column)
    for column in nuga_bank_df_clean.columns
]).first()
for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', null_counts[column])

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [31]:
# Re-count the rows after dropping those with a null Last_Updated
num_rows = nuga_bank_df_clean.count()
num_rows

899679

In [32]:
# Show summary statistics for the cleaned data
nuga_bank_df_clean.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+-------+---------+----

### Data Transformation

In [33]:
# List the columns available for building the dimension and fact tables
nuga_bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [45]:
# Transaction table: the unique (date, amount, type) combinations
transaction_columns = ['Transaction_Date', 'Amount', 'Transaction_Type']
transaction = nuga_bank_df_clean.select(*transaction_columns).distinct()
transaction.show()

+--------------------+------+----------------+
|    Transaction_Date|Amount|Transaction_Type|
+--------------------+------+----------------+
|2024-01-10 22:13:...|169.64|         Deposit|
|2024-01-06 12:05:...|444.53|         Deposit|
|2024-01-09 02:32:...|976.36|        Transfer|
|2024-02-18 21:04:...|521.62|         Deposit|
|2024-04-11 13:35:...|416.11|         Deposit|
|2024-03-20 11:34:...|438.03|         Deposit|
|2024-04-29 10:42:...| 28.27|        Transfer|
|2024-02-12 15:48:...|657.39|         Deposit|
|2024-01-16 03:08:...|489.04|      Withdrawal|
|2024-04-27 01:11:...| 32.36|      Withdrawal|
|2024-04-13 04:39:...| 152.8|         Deposit|
|2024-02-07 20:31:...|736.03|      Withdrawal|
|2024-03-09 11:50:...|516.88|        Transfer|
|2024-01-22 16:12:...|615.23|         Deposit|
|2024-02-18 19:51:...|119.83|        Transfer|
|2024-04-15 10:58:...|630.29|         Deposit|
|2024-02-22 06:42:...|923.79|         Deposit|
|2024-04-22 06:27:...|832.71|      Withdrawal|
|2024-02-14 1

In [46]:
from pyspark.sql.functions import monotonically_increasing_id


In [47]:
# Add the surrogate transaction_id column.
# monotonically_increasing_id() depends on how rows are laid out across partitions,
# and this DataFrame is lazy: every later action (show, the fact-table join, the
# writes) would otherwise recompute the ids and could assign them differently.
# Caching keeps the ids from the first evaluation so the transaction table and the
# fact table refer to the same keys.
transaction = transaction.withColumn('transaction_id', monotonically_increasing_id()).cache()

In [40]:
# Preview the transaction table with its generated transaction_id
transaction.show()

+--------------------+------+----------------+--------------+
|    Transaction_Date|Amount|Transaction_Type|transaction_id|
+--------------------+------+----------------+--------------+
|2024-03-23 15:38:...| 34.76|      Withdrawal|             0|
|2024-04-22 19:15:...|163.92|      Withdrawal|             1|
|2024-04-12 19:46:...|386.32|      Withdrawal|             2|
|2024-04-17 15:29:...|407.15|         Deposit|             3|
|2024-02-10 01:51:...|161.31|         Deposit|             4|
|2024-02-10 22:56:...|764.34|        Transfer|             5|
|2024-04-07 00:07:...|734.59|         Deposit|             6|
|2024-03-08 01:51:...|592.43|         Deposit|             7|
|2024-02-01 12:34:...| 927.1|         Deposit|             8|
|2024-03-22 16:46:...| 66.59|        Transfer|             9|
|2024-04-23 13:30:...| 246.3|      Withdrawal|            10|
|2024-01-13 01:22:...|782.32|      Withdrawal|            11|
|2024-02-25 15:16:...|818.42|      Withdrawal|            12|
|2024-01

In [48]:
# Move transaction_id to the front of the table
ordered_columns = ['transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type']
transaction = transaction.select(*ordered_columns)
transaction.show()

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-01-10 22:13:...|169.64|         Deposit|
|             1|2024-01-06 12:05:...|444.53|         Deposit|
|             2|2024-01-09 02:32:...|976.36|        Transfer|
|             3|2024-02-18 21:04:...|521.62|         Deposit|
|             4|2024-04-11 13:35:...|416.11|         Deposit|
|             5|2024-03-20 11:34:...|438.03|         Deposit|
|             6|2024-04-29 10:42:...| 28.27|        Transfer|
|             7|2024-02-12 15:48:...|657.39|         Deposit|
|             8|2024-01-16 03:08:...|489.04|      Withdrawal|
|             9|2024-04-27 01:11:...| 32.36|      Withdrawal|
|            10|2024-04-13 04:39:...| 152.8|         Deposit|
|            11|2024-02-07 20:31:...|736.03|      Withdrawal|
|            12|2024-03-09 11:50:...|516.88|        Transfer|
|       

In [49]:
# Customer table: one row per distinct combination of customer attributes
customer_columns = ['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
                    'Customer_Country', 'Email', 'Phone_Number']
customer = nuga_bank_df_clean.select(*customer_columns).distinct()

# Add the surrogate key and place it first.
# monotonically_increasing_id() depends on partition layout and the DataFrame is lazy,
# so without caching each later action (the fact-table join, the writes) could assign
# different ids. cache() keeps the ids from the first evaluation.
customer = (
    customer
    .withColumn('customer_id', monotonically_increasing_id())
    .select('customer_id', *customer_columns)
    .cache()
)
customer.show()

+-----------+------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|customer_id|     Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|               Email|        Phone_Number|
+-----------+------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|          0|    Miguel Leonard|262 Beck Expressw...|             Unknown| West Virginia|             Eritrea| zweaver@example.net|             Unknown|
|          1|           Unknown|             Unknown|         Evanchester|        Oregon|             Uruguay|             Unknown| (384)778-9942x91236|
|          2|    Michael Murphy|894 Williams Ridg...|       Dominguezview|      New York|              Sweden|kristinstanley@ex...|+1-693-739-2204x8851|
|          3|    Tina Gutierrez|    415 Taylor Knoll|           Donnastad|South Ca

In [50]:
# Employee table: one row per distinct combination of employment attributes
employee_columns = ['Company', 'Job_Title', 'Gender', 'Marital_Status']
employee = nuga_bank_df_clean.select(*employee_columns).distinct()

# Add the surrogate key and place it first.
# monotonically_increasing_id() depends on partition layout and the DataFrame is lazy,
# so without caching each later action (the fact-table join, the writes) could assign
# different ids. cache() keeps the ids from the first evaluation.
employee = (
    employee
    .withColumn('employee_id', monotonically_increasing_id())
    .select('employee_id', *employee_columns)
    .cache()
)
employee.show()

+-----------+--------------------+--------------------+-------+--------------+
|employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|         Price Group|             Unknown|   Male|        Single|
|          1|Rhodes, King and ...| Trade mark attorney|   Male|       Unknown|
|          2|Schmidt, Morgan a...|     Engineer, water| Female|        Single|
|          3|       Johnson Group|  Forensic scientist|   Male|       Unknown|
|          4|     Phillips-Prince|Production assist...|Unknown|        Single|
|          5|      Henry and Sons|Engineer, civil (...| Female|       Married|
|          6|Thompson, Johnson...|Exercise physiolo...|  Other|       Unknown|
|          7|Hernandez, Johnso...|Forensic psycholo...|Unknown|      Divorced|
|          8|Carrillo, Schwart...| Solicitor, Scotland| Female|        Single|
|          9|         Olson-Lucas| Magazine journali

In [57]:
# Fact table: look up the surrogate key of each dimension by joining on that
# dimension's descriptive columns, then keep only the keys and the remaining columns.
customer_keys = ['Customer_Name', 'Customer_Address', 'Customer_City',
                 'Customer_State', 'Customer_Country', 'Email', 'Phone_Number']
transaction_keys = ['Transaction_Date', 'Amount', 'Transaction_Type']
employee_keys = ['Company', 'Job_Title', 'Gender', 'Marital_Status']
remaining_columns = ['Credit_Card_Number', 'IBAN', 'Currency_Code', 'Random_Number',
                     'Category', 'Group', 'Is_Active', 'Last_Updated', 'Description']

fact_table = (
    nuga_bank_df_clean
    .join(customer, customer_keys, 'left')
    .join(transaction, transaction_keys, 'left')
    .join(employee, employee_keys, 'left')
    .select('transaction_id', 'customer_id', 'employee_id', *remaining_columns)
)

In [58]:
# Preview the fact table
fact_table.show()

+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|   34359759045|42949691602|      14483|      630428157006|GB86GLHT381589496...|      Unknown|       5000.0|       A|      Z|       No|2023-10-14 00:47:...|Everything decade...|
|   60129602657|60129583452|      45443|    38082745081301|             Unknown|          DJF|          0.0|       A|      Z|      Yes|2021-05-24 05:28:...|Into because end....|
|    8589968928|60129572500|      33729|     4021800082481|GB78UBAH195883770...|          LBP|       8097.0|  

In [82]:
# Write each transformed table to its own parquet directory, replacing earlier output.
# NOTE(review): the recorded UnsatisfiedLinkError in NativeIO$Windows.access0 looks like
# missing Hadoop native libraries on Windows rather than a problem in this cell - confirm.
parquet_targets = {
    'dataset/transaction': transaction,
    'dataset/customer': customer,
    'dataset/employee': employee,
    'dataset/fact_table': fact_table,
}
for target_path, frame in parquet_targets.items():
    frame.write.mode('overwrite').parquet(target_path)

Py4JJavaError: An error occurred while calling o902.parquet.
: java.util.concurrent.ExecutionException: Boxed Exception
	at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolve(Promise.scala:99)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:278)
	at scala.concurrent.Promise.complete(Promise.scala:57)
	at scala.concurrent.Promise.complete$(Promise.scala:56)
	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:104)
	at scala.concurrent.Promise.failure(Promise.scala:109)
	at scala.concurrent.Promise.failure$(Promise.scala:109)
	at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:104)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$2(QueryStageExec.scala:333)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:118)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:369)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:842)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolve(Promise.scala:99)
		at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:278)
		at scala.concurrent.Promise.complete(Promise.scala:57)
		at scala.concurrent.Promise.complete$(Promise.scala:56)
		at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:104)
		at scala.concurrent.Promise.failure(Promise.scala:109)
		at scala.concurrent.Promise.failure$(Promise.scala:109)
		at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:104)
		at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$2(QueryStageExec.scala:333)
		at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
		at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
		at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
		at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		... 1 more
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:817)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1415)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1620)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:739)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2078)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2122)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:961)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2078)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2122)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:194)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:481)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [None]:
# Write each transformed table as CSV with a header row; repartition(1) puts
# all rows of a table in one partition before writing.
csv_targets = {
    r'dataset/transformeddata/csv/transaction': transaction,
    r'dataset/transformeddata/csv/customer': customer,
    r'dataset/transformeddata/csv/employee': employee,
    r'dataset/transformeddata/csv/fact_table': fact_table,
}
for target_path, frame in csv_targets.items():
    frame.repartition(1).write.mode('overwrite').option('header','true').csv(target_path)

In [79]:
# Convert each Spark DataFrame to a pandas DataFrame for loading with to_sql().
# toPandas() collects the whole table onto the driver.
# NOTE(review): the recorded TaskResultLost failure may be a driver/executor memory
# limit hit while collecting these tables - confirm before relying on this cell.
transaction_pd_df = transaction.toPandas()
customer_pd_df = customer.toPandas()
employee_pd_df = employee.toPandas()
fact_table_pd_df = fact_table.toPandas()

Py4JJavaError: An error occurred while calling o748.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 444.0 failed 1 times, most recent failure: Lost task 1.0 in stage 444.0 (TID 943) (DESKTOP-IVUNJ9J executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:462)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)


### Load the Dataset into a PostgreSQL DB

In [72]:
!pip install psycopg2

Collecting psycopg2
  Using cached psycopg2-2.9.10-cp312-cp312-win_amd64.whl.metadata (5.0 kB)
Using cached psycopg2-2.9.10-cp312-cp312-win_amd64.whl (1.2 MB)
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10


In [73]:
import psycopg2

In [78]:


# Define database connection parameters.
# The password is not stored in the notebook: it is read from the PGPASSWORD
# environment variable, or prompted for when that variable is unset.
db_params = {
    'username': 'postgres',
    'password': os.environ.get('PGPASSWORD') or getpass('PostgreSQL password: '),
    'host': 'localhost',
    'port': '5432',
    'database': 'nuga_bank',
}

# Build the connection URL from the parameters. The password is percent-encoded
# so characters such as '@' or '/' cannot break the URL.
db_url = (
    f"postgresql://{db_params['username']}:{quote_plus(db_params['password'])}"
    f"@{db_params['host']}:{db_params['port']}/{db_params['database']}"
)

# Create the database engine from the URL
engine = create_engine(db_url)

# pandas DataFrames to load, keyed by destination table name
tables_to_load = {
    'transaction': transaction_pd_df,
    'customer': customer_pd_df,
    'employee': employee_pd_df,
    # the pandas copy is required here: the Spark DataFrame `fact_table` has no to_sql()
    'fact_table': fact_table_pd_df,
}

# Create the tables and load the data in one transaction:
# engine.begin() commits when the block succeeds and rolls back if any load fails.
with engine.begin() as connection:
    for table_name, table_df in tables_to_load.items():
        table_df.to_sql(table_name, connection, index=False, if_exists='replace')

print('Database, tables and data loaded successfully')

NameError: name 'transaction_pd_df' is not defined