In [1]:
# import the necessary Libraries
import os

import psycopg2
from pyspark.sql import DataFrameWriter
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
# Set JAVA_HOME for this process. The raw string keeps the backslash literal
# instead of relying on '\j' being an unrecognised escape sequence.
os.environ['JAVA_HOME'] = r'C:\java8'

In [3]:
# Start (or reuse) the local Spark session for this ETL run
postgres_jdbc_jar = r"C:\Users\USER\Desktop\Data_Engineering\pyspark\Latest_NugaBank_ETL_Case_Study\postgresql-42.7.8.jar"

session_builder = (
    SparkSession.builder
    .appName("Nuga Bank ETL")
    .master("local[*]")
    # register the PostgreSQL JDBC driver jar with the session
    .config("spark.jars", postgres_jdbc_jar)
)
spark = session_builder.getOrCreate()

print(spark.version)

3.5.0


In [4]:
# Display the SparkSession object as this cell's output
spark

In [5]:
# Load the raw historical CSV into a Spark DataFrame
raw_csv_path = r'C:\Users\USER\Desktop\Data_Engineering\pyspark\Latest_NugaBank_ETL_Case_Study\dataset\raw\ziko_logistics_data.csv'
df = spark.read.csv(
    raw_csv_path,
    header=True,       # first line holds the column names
    inferSchema=True,  # let Spark infer each column's type from the data
)

In [6]:
# Preview the first 5 rows of the raw DataFrame
df.show(5)

+--------------+--------------------+-----------+----------+--------+------------------+-----------------+-------------+-------------+--------------+--------------+---------+---------------+---------------------+-------------+-------------+-------------+-------+------+-------+-------------+---------------+--------------------+--------------------+------------------+
|Transaction_ID|                Date|Customer_ID|Product_ID|Quantity|        Unit_Price|       Total_Cost|Discount_Rate|Sales_Channel|Order_Priority|Warehouse_Code|Ship_Mode|Delivery_Status|Customer_Satisfaction|Item_Returned|Return_Reason| Payment_Type|Taxable|Region|Country|Customer_Name| Customer_Phone|      Customer_Email|    Customer_Address|Product_List_Title|
+--------------+--------------------+-----------+----------+--------+------------------+-----------------+-------------+-------------+--------------+--------------+---------+---------------+---------------------+-------------+-------------+-------------+-------+

In [7]:
# Print the column names, inferred types and nullability of the raw DataFrame
df.printSchema()

root
 |-- Transaction_ID: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Product_ID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Discount_Rate: double (nullable = true)
 |-- Sales_Channel: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Warehouse_Code: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Delivery_Status: string (nullable = true)
 |-- Customer_Satisfaction: string (nullable = true)
 |-- Item_Returned: boolean (nullable = true)
 |-- Return_Reason: string (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Taxable: boolean (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Phone: string (nullable = true)
 |-- Customer_Email: string (nu

### Data Cleaning and Transformation Phase

In [8]:
# Data cleaning (checking for missing values)
# Count the nulls of every column in ONE aggregation (a single Spark job)
# instead of running a separate filter + count job per column.
null_counts = df.select(
    [F.count(F.when(df[name].isNull(), 1)).alias(name) for name in df.columns]
).first()

# Same report format as before: "<column> Nulls:  <count>"
for column in df.columns:
    print(column, 'Nulls: ', null_counts[column])

Transaction_ID Nulls:  0
Date Nulls:  0
Customer_ID Nulls:  0
Product_ID Nulls:  0
Quantity Nulls:  0
Unit_Price Nulls:  101
Total_Cost Nulls:  100
Discount_Rate Nulls:  291
Sales_Channel Nulls:  0
Order_Priority Nulls:  0
Warehouse_Code Nulls:  0
Ship_Mode Nulls:  0
Delivery_Status Nulls:  0
Customer_Satisfaction Nulls:  0
Item_Returned Nulls:  0
Return_Reason Nulls:  101
Payment_Type Nulls:  0
Taxable Nulls:  0
Region Nulls:  0
Country Nulls:  0
Customer_Name Nulls:  0
Customer_Phone Nulls:  0
Customer_Email Nulls:  0
Customer_Address Nulls:  0
Product_List_Title Nulls:  0


In [9]:
# Replace missing values with per-column defaults:
# numeric gaps become 0.0 and a missing return reason becomes 'Unknown'
missing_value_defaults = {
    'Unit_Price': 0.0,
    'Total_Cost': 0.0,
    'Discount_Rate': 0.0,
    'Return_Reason': 'Unknown',
}
df_clean = df.na.fill(missing_value_defaults)

In [10]:
# Dropping rows with a missing Last_Updated value is skipped: the dataset has no Last_Updated column, so the call below would raise an error
# df_clean = df_clean.na.drop(subset=['Last_Updated'])

In [11]:
# Data cleaning (re-checking for missing values after the fill)
# Count the nulls of every column in ONE aggregation (a single Spark job)
# instead of running a separate filter + count job per column.
null_counts = df_clean.select(
    [F.count(F.when(df_clean[name].isNull(), 1)).alias(name) for name in df_clean.columns]
).first()

# Same report format as before: "<column> Nulls:  <count>"
for column in df_clean.columns:
    print(column, 'Nulls: ', null_counts[column])

Transaction_ID Nulls:  0
Date Nulls:  0
Customer_ID Nulls:  0
Product_ID Nulls:  0
Quantity Nulls:  0
Unit_Price Nulls:  0
Total_Cost Nulls:  0
Discount_Rate Nulls:  0
Sales_Channel Nulls:  0
Order_Priority Nulls:  0
Warehouse_Code Nulls:  0
Ship_Mode Nulls:  0
Delivery_Status Nulls:  0
Customer_Satisfaction Nulls:  0
Item_Returned Nulls:  0
Return_Reason Nulls:  0
Payment_Type Nulls:  0
Taxable Nulls:  0
Region Nulls:  0
Country Nulls:  0
Customer_Name Nulls:  0
Customer_Phone Nulls:  0
Customer_Email Nulls:  0
Customer_Address Nulls:  0
Product_List_Title Nulls:  0


In [12]:
# List the column names of `df`
df.columns


['Transaction_ID',
 'Date',
 'Customer_ID',
 'Product_ID',
 'Quantity',
 'Unit_Price',
 'Total_Cost',
 'Discount_Rate',
 'Sales_Channel',
 'Order_Priority',
 'Warehouse_Code',
 'Ship_Mode',
 'Delivery_Status',
 'Customer_Satisfaction',
 'Item_Returned',
 'Return_Reason',
 'Payment_Type',
 'Taxable',
 'Region',
 'Country',
 'Customer_Name',
 'Customer_Phone',
 'Customer_Email',
 'Customer_Address',
 'Product_List_Title']

### Data Transformation

In [194]:
# Customer table: customer attributes keyed by a generated row id
customer_attributes = [
    'Region', 'Country', 'Customer_Name',
    'Customer_Phone', 'Customer_Email', 'Customer_Address',
]

# NOTE(review): Customer_ID below is a generated id, not the source
# Customer_ID column (the first select leaves that column out) -- confirm
# this is what downstream consumers expect.
customer = (
    df_clean
    .select(*customer_attributes)
    .withColumn('Customer_ID', monotonically_increasing_id())
    .select('Customer_ID', *customer_attributes)
)

In [14]:
# Preview the first 5 rows of the customer table
customer.show(5)

+-----------+------+-------+-------------+---------------+--------------------+--------------------+
|Customer_ID|Region|Country|Customer_Name| Customer_Phone|      Customer_Email|    Customer_Address|
+-----------+------+-------+-------------+---------------+--------------------+--------------------+
|          0|  West| Canada| Customer 200|+1-652-572-9306|customer.200.78@e...|275 Second St, Ph...|
|          1| South| Mexico| Customer 321|+1-311-186-5760|customer.321.13@s...|478 Third St, New...|
|          2|  West| Canada| Customer 989|+1-922-606-9032|customer.989.99@e...|843 Second St, Ph...|
|          3| South| Canada| Customer 682|+1-237-853-5808|customer.682.66@d...|153 Main St, Phoe...|
|          4| South| Mexico| Customer 484|+1-986-360-9109|customer.484.3@sa...|264 Second St, Ne...|
+-----------+------+-------+-------------+---------------+--------------------+--------------------+
only showing top 5 rows



In [208]:
# Shipping table: shipping attributes keyed by a generated row id
shipping_attributes = [
    'Warehouse_Code', 'Ship_Mode', 'Delivery_Status',
    'Customer_Satisfaction', 'Item_Returned', 'Return_Reason',
]

shipping = (
    df_clean
    .select(*shipping_attributes)
    .withColumn('Shipping_ID', monotonically_increasing_id())  # generated key
    .select('Shipping_ID', *shipping_attributes)
)

In [209]:
# Preview the first 5 rows of the shipping table
shipping.show(5)

+-----------+--------------+---------+---------------+---------------------+-------------+-------------+
|Shipping_ID|Warehouse_Code|Ship_Mode|Delivery_Status|Customer_Satisfaction|Item_Returned|Return_Reason|
+-----------+--------------+---------+---------------+---------------------+-------------+-------------+
|          0|          WH-3|    2-Day|      Cancelled|              Neutral|        false|   Wrong Item|
|          1|          WH-1|Overnight|      Backorder|            Satisfied|         true|      Damaged|
|          2|          WH-1|Overnight|        Pending|          Unsatisfied|         true|      Damaged|
|          3|          WH-1|  Express|        Pending|          Unsatisfied|        false|   Wrong Item|
|          4|          WH-2|    2-Day|      Delivered|            Satisfied|         true|         Late|
+-----------+--------------+---------+---------------+---------------------+-------------+-------------+
only showing top 5 rows



In [197]:
# Product table: product titles keyed by a generated row id
# NOTE(review): Product_ID below is a generated id, not the source Product_ID
# column (the first select leaves that column out) -- confirm this is intended.
product_titles = df_clean.select('Product_List_Title')
product = (
    product_titles
    .withColumn('Product_ID', monotonically_increasing_id())
    .select('Product_ID', 'Product_List_Title')
)

In [198]:
# Preview the first 5 rows of the product table
product.show(5)

+----------+------------------+
|Product_ID|Product_List_Title|
+----------+------------------+
|         0|        Product 53|
|         1|        Product 33|
|         2|         Product 6|
|         3|        Product 68|
|         4|        Product 89|
+----------+------------------+
only showing top 5 rows



In [199]:
# Transaction table: transaction measures keyed by a generated row id
# NOTE(review): Transaction_ID below is a generated id, not the source
# Transaction_ID column (the first select leaves that column out).
transaction_measures = [
    'Date', 'Quantity', 'Unit_Price',
    'Total_Cost', 'Discount_Rate', 'Sales_Channel',
]

transaction = (
    df_clean
    .select(*transaction_measures)
    .withColumn('Transaction_ID', monotonically_increasing_id())
    .select('Transaction_ID', *transaction_measures)
)

In [200]:
# Preview the first 5 rows of the transaction table
transaction.show(5)

+--------------+--------------------+--------+------------------+-----------------+-------------+-------------+
|Transaction_ID|                Date|Quantity|        Unit_Price|       Total_Cost|Discount_Rate|Sales_Channel|
+--------------+--------------------+--------+------------------+-----------------+-------------+-------------+
|             0|2020-01-01 20:32:...|       3|120.43682132331526| 8265.37454940804|          0.2|       Online|
|             1|2020-01-02 06:55:...|       6| 475.7249935818949|  4047.8504787607|          0.0|     Reseller|
|             2|2020-01-06 08:12:...|       3|146.40055601470172|              0.0|         0.05|       Direct|
|             3|2020-01-07 22:03:...|       6|19.373193554270635|8194.281992877543|          0.0|     Reseller|
|             4|2020-01-07 07:08:...|       8|193.22131303627867| 8331.32924886882|          0.2|       Direct|
+--------------+--------------------+--------+------------------+-----------------+-------------+-------

In [255]:
# # Transaction fact table
# transaction_fact = df_clean.join(customer, ['Region', 'Country', 'Customer_Name', 'Customer_Phone', 'Customer_Email', 'Customer_Address'], 'left') \
#                             .join(shipping, ['Warehouse_Code', 'Ship_Mode', 'Delivery_Status', 'Customer_Satisfaction', 'Item_Returned', 'Return_Reason'], 'left' ) \
#                             .join(product, ['Product_List_Title'], 'left') \
#                             .select('transaction_id', 'Date', 'Quantity', 'Unit_Price', 'Total_Cost', 'Discount_Rate', 'Sales_Channel')
                            

# from pyspark.sql import functions as F

# df = df_clean.alias("df")
# c  = customer.alias("c")
# p  = product.alias("p")
# s  = shipping.alias("s")

# transaction_fact = (
#     df
#     df
#     .join(c, on=F.col("df.Customer_ID") == F.col("c.Customer_ID"), how="left")
#     # .join(c, on="Customer_ID", how="left")
#     .join(p, on="Product_ID", how="left")
#     .join(s, on="Shipping_ID", how="left")
#     .select(
#         F.col("df.Transaction_ID"),
#         F.col("df.Customer_ID"),
#         F.col("df.Product_ID"),
#         F.col("df.Shipping_ID"),
#         F.col("df.Date"),
#         F.col("df.Quantity"),
#         F.col("df.Unit_Price"),
#         F.col("df.Total_Cost"),
#         F.col("df.Discount_Rate"),
#         F.col("df.Sales_Channel")
#     )
# )




# Build the transaction fact table straight from the cleaned source rows.
#
# No join against `customer` / `product` is performed here: every column the
# fact table keeps comes from the cleaned source frame, and those tables are
# keyed by unique generated row ids, so a left join on them would neither add
# a column nor change the row count -- it would only cost extra shuffles.
# The name `df` is deliberately NOT rebound, so the raw DataFrame loaded at
# the top of the notebook stays intact when cells are re-run.
#
# NOTE(review): Customer_ID / Product_ID below are the source identifiers,
# whereas `customer` and `product` use generated ids as their keys -- confirm
# which of the two the fact table is supposed to reference.
fact_columns = [
    "Transaction_ID", "Customer_ID", "Product_ID",
    "Date", "Quantity", "Unit_Price", "Total_Cost", "Discount_Rate", "Sales_Channel",
]

transaction_fact = df_clean.select(*fact_columns)



# transaction_fact.show(50, truncate=False)






In [256]:
from pyspark.sql.functions import col

# Source column -> name used for the database load, in output order.
# "Date" keeps its name; every other column is renamed to lower case.
db_column_names = {
    "Transaction_ID": "transaction_id",
    "Customer_ID": "customer_id",
    "Product_ID": "product_id",
    "Date": "Date",
    "Quantity": "quantity",
    "Unit_Price": "unit_price",
    "Total_Cost": "total_cost",
    "Discount_Rate": "discount_rate",
    "Sales_Channel": "sales_channel",
}

transaction_fact_for_db = transaction_fact.select(
    [
        col(source) if source == target else col(source).alias(target)
        for source, target in db_column_names.items()
    ]
)

In [257]:
# Preview the first 50 rows of the frame that will be loaded into the database
transaction_fact_for_db.show(50)

+--------------+-----------+----------+--------------------+--------+------------------+------------------+-------------+-------------+
|transaction_id|customer_id|product_id|                Date|quantity|        unit_price|        total_cost|discount_rate|sales_channel|
+--------------+-----------+----------+--------------------+--------+------------------+------------------+-------------+-------------+
|           200|       1086|       536|2020-01-01 20:32:...|       3|120.43682132331526|  8265.37454940804|          0.2|       Online|
|           321|       1078|       523|2020-01-02 06:55:...|       6| 475.7249935818949|   4047.8504787607|          0.0|     Reseller|
|           989|       1077|       535|2020-01-06 08:12:...|       3|146.40055601470172|               0.0|         0.05|       Direct|
|           682|       1027|       546|2020-01-07 22:03:...|       6|19.373193554270635| 8194.281992877543|          0.0|     Reseller|
|           484|       1052|       556|2020-01-0

### Troubleshoot Transaction fact table for null values

In [282]:
# Troubleshooting: prove it (run this now) -- list fact rows whose Date is null
from pyspark.sql.functions import col

rows_without_date = transaction_fact.where(col("Date").isNull())
rows_without_date.select("Transaction_ID", "Date").show(5)

+--------------+----+
|Transaction_ID|Date|
+--------------+----+
+--------------+----+



In [283]:
# Best practice: drop bad rows.
# Any row of the load frame whose Date is null is removed before loading.

# transaction_fact_clean = transaction_fact.filter(col("Date").isNotNull())

required_columns = ["Date"]
transaction_fact_for_db = transaction_fact_for_db.dropna(subset=required_columns)


# Alternatively, I can fill the missing value instead, if the business permits it:
# from pyspark.sql.functions import lit

# transaction_fact_clean = transaction_fact.withColumn(
#     "Date",
#     col("Date").otherwise(lit("1970-01-01"))
# )

In [284]:
# VERIFY before writing (mandatory): count the rows whose Date is still null
remaining_null_dates = transaction_fact_for_db.filter(col("Date").isNull())
remaining_null_dates.count()

0

### Data Loading 

In [271]:
# Connection
def get_db_connection():
    """Open a new psycopg2 connection to the target PostgreSQL database.

    Each setting falls back to the local development value used so far and
    can be overridden through an environment variable, so real credentials do
    not have to be written into the notebook:

        NUGA_DB_HOST      (default 'localhost')
        NUGA_DB_NAME      (default 'nuga_bank_db')
        NUGA_DB_USER      (default 'postgres')
        NUGA_DB_PASSWORD  (default 'password')

    Returns:
        The connection object returned by ``psycopg2.connect``. The caller is
        responsible for closing it.
    """
    connection = psycopg2.connect(
        host=os.environ.get('NUGA_DB_HOST', 'localhost'),
        database=os.environ.get('NUGA_DB_NAME', 'nuga_bank_db'),
        user=os.environ.get('NUGA_DB_USER', 'postgres'),
        password=os.environ.get('NUGA_DB_PASSWORD', 'password'),
    )
    return connection

#connect to sql database
# conn = get_db_connection()

In [275]:
# Create a function to create tables
def create_tables():
    """Drop and recreate the four target tables in PostgreSQL.

    Opens a connection with ``get_db_connection()``, executes one
    multi-statement script that drops ``customer``, ``product``, ``shipping``
    and ``transaction_fact`` if they exist and creates them again, then
    commits.

    The cursor and the connection are always closed, including when the
    script or the commit raises, so a failed run does not leak a connection.
    Any exception from the database driver propagates to the caller.
    """
    create_table_query = '''
                        DROP TABLE IF EXISTS customer;
                        DROP TABLE IF EXISTS product;
                        DROP TABLE IF EXISTS shipping;
                        DROP TABLE IF EXISTS transaction_fact;

                        CREATE TABLE customer (
                            Customer_id BIGINT PRIMARY KEY,
                            Region VARCHAR(100),
                            Country VARCHAR(100),
                            Customer_Name VARCHAR(200),
                            Customer_Phone VARCHAR(50),
                            Customer_Email VARCHAR(200),
                            Customer_Address VARCHAR(1000)

                        );

                        CREATE TABLE product (
                            product_id BIGINT PRIMARY KEY,
                            Product_List_Title VARCHAR(1000)
                        );

                        CREATE TABLE shipping (
                            shipping_id BIGINT PRIMARY KEY,
                            Warehouse_Code VARCHAR(200),
                            Ship_Mode VARCHAR (200),
                            Delivery_Status VARCHAR(200),
                            Customer_Satisfaction VARCHAR(200),
                            Item_Returned BOOLEAN,
                            Return_Reason TEXT

                        );

                        CREATE TABLE transaction_fact(
                            Transaction_ID BIGINT PRIMARY KEY,
                            Customer_ID BIGINT NOT NULL,
                            Product_ID BIGINT NOT NULL,
                            Date Timestamp,
                            Quantity INT,
                            Unit_Price NUMERIC(12,2),
                            Total_Cost NUMERIC(14,2),
                            Discount_Rate NUMERIC(5,4),
                            Sales_Channel VARCHAR(1000)


                        );


                        '''
    conn = get_db_connection()
    try:
        # the cursor context manager closes the cursor on exit, error or not
        with conn.cursor() as cursor:
            cursor.execute(create_table_query)
        conn.commit()
    finally:
        # runs on success and on failure alike, so the connection never leaks
        conn.close()

In [276]:
# Drop and recreate the target tables before loading
create_tables()

In [None]:
# customer.printSchema()
# product.printSchema()
# shipping.printSchema()
# transaction_fact.printSchema()


In [278]:
# LOAD DATA TO TABLE
# JDBC target and credentials for the Spark writers. Each value falls back to
# the local development setting used so far and can be overridden with the
# NUGA_DB_HOST / NUGA_DB_NAME / NUGA_DB_USER / NUGA_DB_PASSWORD environment
# variables, so real credentials do not have to be written into the notebook.
db_host = os.environ.get("NUGA_DB_HOST", "localhost")
db_name = os.environ.get("NUGA_DB_NAME", "nuga_bank_db")

url = f"jdbc:postgresql://{db_host}:5432/{db_name}"
properties = {
    "user" : os.environ.get("NUGA_DB_USER", "postgres"),
    "password" : os.environ.get("NUGA_DB_PASSWORD", "password"),
    "driver" : "org.postgresql.Driver"
}




In [279]:
# Append each of the three DataFrames to the PostgreSQL table of the same name
frames_by_table = [
    ("customer", customer),
    ("product", product),
    ("shipping", shipping),
]
for table_name, frame in frames_by_table:
    frame.write.jdbc(url=url, table=table_name, mode="append", properties=properties)

In [281]:
# Append the prepared fact rows to the transaction_fact table
fact_writer = transaction_fact_for_db.write.mode("append")
fact_writer.jdbc(url=url, table="transaction_fact", properties=properties)
# transaction_fact_for_db.write.jdbc(
#     url=url,
#     table="transaction_fact",
#     mode="append",
#     properties=properties
# )