In [None]:
import os
from getpass import getpass
from urllib.parse import quote

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from sqlalchemy import create_engine

In [None]:
# Initialize the Spark session for this ETL job (getOrCreate reuses a running one).
spark = (
    SparkSession.builder
    .appName('TerraBankETL')
    .getOrCreate()
)

In [None]:
# Show the active SparkSession as the cell output to confirm the session started.
spark

### Data Extraction

In [None]:
# Load the raw transactions CSV: the first line holds the column names and
# column types are inferred from the data (costs an extra pass over the file).
terra_bank_df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(r'dataset/terra_bank_transactions.csv')
)

In [None]:
# terra_bank_df.show()  # (PySpark has no printShow(); show() prints the first rows)

In [None]:
# Print the column names and the types inferred from the CSV.
terra_bank_df.printSchema()

In [None]:
# List the column names of the raw data.
terra_bank_df.columns

In [None]:
# Number of rows in the raw data (count() runs a Spark job over the whole file)
num_rows = terra_bank_df.count()

num_rows

In [None]:
# Number of columns: the schema holds exactly one field per column.
num_columns = len(terra_bank_df.schema.fields)

num_columns

In [None]:
# Checking for null values in the raw data.
# All per-column null counts are computed in ONE Spark job (one aggregate per
# column) instead of a separate filter+count job for every column.
raw_null_counts = terra_bank_df.select([
    F.count(F.when(terra_bank_df[c].isNull(), 1)).alias(c) for c in terra_bank_df.columns
]).first()

# The Row's values are in the same order as terra_bank_df.columns.
for column, n_nulls in zip(terra_bank_df.columns, raw_null_counts):
    print(column, 'Nulls', n_nulls)

In [None]:
# Replace nulls with per-column placeholders: 'Unknown' for text columns,
# 0 / 0.0 for Credit_Card_Number and Random_Number.
# fillna needs a dict (column -> value); a list literal with `key: value` pairs
# is a syntax error.
# NOTE(review): a string placeholder presumably cannot fill a column that
# inferSchema read as non-string (e.g. Is_Active if inferred as boolean) — check
# the null counts printed after cleaning to confirm every column was filled.
terra_bank_df_clean = terra_bank_df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address': 'Unknown',
    'Customer_City': 'Unknown',
    'Customer_State': 'Unknown',
    'Customer_Country': 'Unknown',
    'Company': 'Unknown',
    'Job_Title': 'Unknown',
    'Email': 'Unknown',
    'Phone_Number': 'Unknown',
    'Credit_Card_Number': 0,
    'IBAN': 'Unknown',
    'Currency_Code': 'Unknown',
    'Random_Number': 0.0,
    'Category': 'Unknown',
    'Group': 'Unknown',
    'Is_Active': 'Unknown',
    'Description': 'Unknown',
    'Gender': 'Unknown',
    'Marital_Status': 'Unknown',
})

In [None]:
# Drop rows that have no Last_Updated value. `subset` takes column NAMES
# (strings); the bare identifier Last_Updated is an undefined variable.
terra_bank_df_clean = terra_bank_df_clean.na.drop(subset=['Last_Updated'])

In [None]:
# Re-check nulls after cleaning, using the cleaned frame's own columns and a
# single Spark job (one aggregate per column) rather than one job per column.
clean_null_counts = terra_bank_df_clean.select([
    F.count(F.when(terra_bank_df_clean[c].isNull(), 1)).alias(c)
    for c in terra_bank_df_clean.columns
]).first()

# The Row's values are in the same order as terra_bank_df_clean.columns.
for column, n_nulls in zip(terra_bank_df_clean.columns, clean_null_counts):
    print(column, 'Nulls', n_nulls)

In [None]:
# Row count of the cleaned data; shown as the cell output so it can be
# compared with the raw row count.
num_rows = terra_bank_df_clean.count()
num_rows

In [None]:
# Summary statistics (count, mean, stddev, min, max) of the cleaned data;
# describe() with no arguments covers the numeric and string columns.
terra_bank_df_clean.describe().show()

In [None]:
# Transaction table: one row per distinct (Transaction_Date, Amount,
# Transaction_Type) combination, keyed by a generated transaction_id.
# distinct() keeps these columns unique, so the fact table's left join on them
# matches at most one row per record and cannot duplicate fact rows.
# Identical source rows therefore share one transaction_id.
transactions = terra_bank_df_clean.select('Transaction_Date', 'Amount', 'Transaction_Type').distinct()

# Adding transaction_id column. monotonically_increasing_id() is unique but not
# consecutive, and its values depend on partitioning, so the frame is cached to
# keep the same ids for the fact-table join and the later writes (best effort:
# evicted partitions are recomputed).
transaction = transactions.withColumn('transaction_id', monotonically_increasing_id())

# Reordering the columns
transaction = transaction.select('transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type').cache()

In [None]:
# Customer table: one row per distinct combination of customer attributes,
# keyed by a generated customer_id.
customers = terra_bank_df_clean.select('Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country', 'Email', 'Phone_Number').distinct()

customer = customers.withColumn('customer_id', monotonically_increasing_id())
# Keep Email and Phone_Number: they are part of the distinct() key above.
# Dropping them leaves rows that look identical but carry different
# customer_ids, and the fact table joins on those two columns.
# cache() keeps the generated ids stable across the fact-table join and the
# writes that reuse this frame (monotonically_increasing_id depends on partitioning).
customer = customer.select('customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State', 'Customer_Country', 'Email', 'Phone_Number').cache()


In [None]:
# Employee table: one row per distinct (Company, Job_Title, Gender,
# Marital_Status) combination, keyed by a generated employee_id.
# distinct() keeps the join columns unique so the fact-table join cannot
# multiply fact rows.
employees = terra_bank_df_clean.select('Company', 'Job_Title', 'Gender', 'Marital_Status').distinct()

employee = employees.withColumn('employee_id', monotonically_increasing_id())
# Select from `employee` (the frame that has employee_id), not `employees`.
# cache() keeps the generated ids stable across the join and the writes.
employee = employee.select('employee_id', 'Company', 'Job_Title', 'Gender', 'Marital_Status').cache()

In [None]:
# Facts Table: one row per cleaned source record, carrying the ids of its
# customer, transaction and employee rows plus the record-level attributes.
#
# The join keys are every column of each table except its generated id, so they
# always match how that table was built. (customer_id does not exist in the
# source frame and cannot be a join key.)
# NOTE(review): these are plain equi-joins, so records whose key columns contain
# nulls (e.g. Transaction_Date / Amount / Transaction_Type, which the fillna step
# does not fill) get a null id — confirm this is acceptable.
customer_keys = [c for c in customer.columns if c != 'customer_id']
transaction_keys = [c for c in transaction.columns if c != 'transaction_id']
employee_keys = [c for c in employee.columns if c != 'employee_id']

fact_table = (
    terra_bank_df_clean
    .join(customer, customer_keys, 'left')
    .join(transaction, transaction_keys, 'left')
    .join(employee, employee_keys, 'left')
    .select('transaction_id', 'customer_id', 'employee_id', 'Credit_Card_Number', 'IBAN',
            'Currency_Code', 'Random_Number', 'Category', 'Group', 'Is_Active',
            'Last_Updated', 'Description')
)

In [None]:
# Preview the first rows of the fact table.
# NOTE(review): this output includes the Credit_Card_Number and IBAN columns —
# clear the cell output before sharing or committing the notebook.
fact_table.show()

In [None]:
# Write each transformed table to its own Parquet directory, replacing any
# previous output at that path.
for frame, parquet_dir in (
    (transaction, r'dataset/transaction'),
    (customer, r'dataset/customer'),
    (employee, r'dataset/employee'),
    (fact_table, r'dataset/fact_table'),
):
    frame.write.mode('overwrite').parquet(parquet_dir)

In [None]:
# Write the transformed tables to CSV: 3 part-files per table, each with a header row.
# The last line previously wrote `transaction` into the fact_table directory;
# it now writes `fact_table`.
# NOTE(review): each CSV directory is nested inside the matching Parquet output
# directory (e.g. dataset/transaction/csv/ under dataset/transaction). Re-running
# the Parquet cell with mode('overwrite') will likely delete these CSVs, and
# reading dataset/transaction as Parquet may trip over the nested files —
# consider separate output roots.
transaction.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'dataset/transaction/csv/')
customer.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'dataset/customer/csv/')
employee.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'dataset/employee/csv/')
fact_table.repartition(3).write.mode('overwrite').option('header', 'true').csv(r'dataset/fact_table/csv/')

In [None]:
# Collect each Spark table to the driver as a pandas DataFrame for the SQL load.
# toPandas() pulls all rows into driver memory, so the tables must fit there.
transaction_pd_df, customer_pd_df, employee_pd_df, fact_table_pd_df = (
    spark_df.toPandas() for spark_df in (transaction, customer, employee, fact_table)
)

In [None]:
# Loading the dataset into a PostgreSQL DB

# Connection parameters, each overridable through an environment variable.
# The password is not hardcoded in the notebook: it comes from
# TERRA_BANK_DB_PASSWORD, or from an interactive prompt if that is unset.
# 'port' was referenced when building the URL but missing from this dict
# (KeyError); 5432 is the PostgreSQL default.
db_params = {
    'username': os.environ.get('TERRA_BANK_DB_USER', 'postgres'),
    'password': os.environ.get('TERRA_BANK_DB_PASSWORD') or getpass('PostgreSQL password: '),
    'host': os.environ.get('TERRA_BANK_DB_HOST', 'localhost'),
    'port': os.environ.get('TERRA_BANK_DB_PORT', '5432'),
    'database': os.environ.get('TERRA_BANK_DB_NAME', 'Terra_Bank'),
}

# Percent-encode the credentials so characters such as '@', ':' or '/' in the
# password do not break URL parsing.
db_url = (
    f"postgresql://{quote(db_params['username'], safe='')}:{quote(db_params['password'], safe='')}"
    f"@{db_params['host']}:{db_params['port']}/{db_params['database']}"
)

# Create the database engine with the db url
engine = create_engine(db_url)
# engine.begin() runs all four writes in one transaction: it commits when the
# block completes and rolls back if any write fails, so no partial load is left.
with engine.begin() as connection:
    transaction_pd_df.to_sql('transaction', connection, index=False, if_exists='replace')
    customer_pd_df.to_sql('customer', connection, index=False, if_exists='replace')
    employee_pd_df.to_sql('employee', connection, index=False, if_exists='replace')
    fact_table_pd_df.to_sql('fact_table', connection, index=False, if_exists='replace')

print('Database, tables and data loaded successfully')