### Import needed packages

In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, regexp_replace, desc, count, expr, sum as spark_sum , split, when, isnan
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
import subprocess

### Set up Spark Session & Read the Data

In [2]:
# Local Spark session with Hive support; the SQL warehouse directory is set explicitly.
spark = (
    SparkSession.builder
    .appName("RetailDataProcessing")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)

# Batch folder to ingest; it is the last path component of every source directory below.
load_time = "2024-06-30-08"

# All three sources are CSV files with a header row; Spark infers the column types.
csv_options = {"header": True, "inferSchema": True}

branches_df = spark.read.csv(
    f"/Spark_Project/data/Q_company/branches/{load_time}", **csv_options)
sales_agents_df = spark.read.csv(
    f"/Spark_Project/data/Q_company/sales_agents/{load_time}", **csv_options)
transactions_df = spark.read.csv(
    f"/Spark_Project/data/Q_company/sales_transactions/{load_time}", **csv_options)

### EDA and Data Cleaning for each DataFrame

Before cleaning the data, we first need to understand it.

### Sales Transactions

In [3]:
# Preview the raw sales transactions exactly as they were read from CSV.
transactions_df

transaction_date,transaction_id,customer_id,customer_fname,cusomter_lname,cusomter_email,sales_agent_id,branch_id,product_id,product_name,product_category,offer_1,offer_2,offer_3,offer_4,offer_5,units,unit_price,is_online,payment_method,shipping_address,load_time,source,source_path,group
2023-05-20 00:00:00,trx-152546429674,85469,Alexander,Brown,alexander.brown@g...,1.0,2.0,22,Coffee Maker,Appliances,,,,,,10,79.99,no,Cash,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2022-10-25 00:00:00,trx-291375327542,85512,William,Brown,william.brown@gma...,3.0,1.0,24,Blender,Appliances,,,,True,,5,49.99,no,Cash,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2022-02-05 00:00:00,trx-312507679871,85484,John,Williams,john.williams@gma...,10.0,3.0,4,Headphones,Electronics,,,,,,1,99.99,no,Credit Card,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2023-10-20 00:00:00,trx-193384855491,85528,Alexander,Miller,alexander.miller@...,7.0,2.0,25,Washing Machine,Appliances,,,,,,8,499.99,no,Cash,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2022-11-17 00:00:00,trx-831626097654,85500,John,Brown,john.brown@hotmai...,5.0,1.0,14,Camera,Electronics,,,True,,,10,399.99,no,Cash,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2022-09-27 00:00:00,trx-158496122054,85545,Sophia,Wilson,sophia.wilson@hot...,4.0,5.0,14,Camera,Electronics,,,,,True,6,399.99,no,Credit Card,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2022-04-21 00:00:00,trx-722817999024,85561,Alexander,Moore,alexander.moore@y...,4.0,1.0,30,Electric Kettle,Appliances,,,,True,,6,24.99,no,Credit Card,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2023-04-28 00:00:00,trx-813287633702,85520,Alexander,Wilson,alexander.wilson@...,1.0,1.0,26,Vacuum Cleaner,Appliances,,,,,,4,199.99,no,Cash,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2023-03-08 00:00:00,trx-219568257432,85488,Michael,Miller,michael.miller@ya...,6.0,2.0,18,Boots,Footwear,,,,,,10,149.99,no,Credit Card,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2023-06-17 00:00:00,trx-352160720823,85466,Michael,Brown,michael.brown@yah...,5.0,2.0,16,Skirt,Clothing,,,,,,8,39.99,no,Cash,,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1


In [4]:
# List every column together with the type Spark inferred for it.
transactions_df.dtypes

[('transaction_date', 'timestamp'),
 ('transaction_id', 'string'),
 ('customer_id', 'int'),
 ('customer_fname', 'string'),
 ('cusomter_lname', 'string'),
 ('cusomter_email', 'string'),
 ('sales_agent_id', 'double'),
 ('branch_id', 'double'),
 ('product_id', 'int'),
 ('product_name', 'string'),
 ('product_category', 'string'),
 ('offer_1', 'boolean'),
 ('offer_2', 'boolean'),
 ('offer_3', 'boolean'),
 ('offer_4', 'boolean'),
 ('offer_5', 'boolean'),
 ('units', 'int'),
 ('unit_price', 'double'),
 ('is_online', 'string'),
 ('payment_method', 'string'),
 ('shipping_address', 'string'),
 ('load_time', 'timestamp'),
 ('source', 'string'),
 ('source_path', 'string'),
 ('group', 'string')]

Check for Nulls

In [5]:
# Type names for which NaN is checked in addition to NULL.
numeric_types = ['int', 'bigint', 'float', 'double', 'decimal']


def _missing_count(col_name, data_type):
    """Build an aggregate that counts the missing values of one column.

    NULLs are always counted; for the numeric types listed above NaN values
    are counted as well. The result is aliased to the column's own name.
    """
    is_missing = col(col_name).isNull()
    if data_type in numeric_types:
        is_missing = is_missing | isnan(col(col_name))
    return count(when(is_missing, col_name)).alias(col_name)


expressions = [_missing_count(name, dtype) for name, dtype in transactions_df.dtypes]

# One output row: number of NULL (and NaN) values per column.
transactions_df.select(expressions)

transaction_date,transaction_id,customer_id,customer_fname,cusomter_lname,cusomter_email,sales_agent_id,branch_id,product_id,product_name,product_category,offer_1,offer_2,offer_3,offer_4,offer_5,units,unit_price,is_online,payment_method,shipping_address,load_time,source,source_path,group
0,0,0,0,0,0,500,500,0,0,0,1347,1356,1332,1355,1374,0,0,0,0,1000,0,0,0,0


In [6]:
# Summary statistics (count, mean, stddev, min, max) for the columns describe() covers.
transactions_df.describe()

summary,transaction_id,customer_id,customer_fname,cusomter_lname,cusomter_email,sales_agent_id,branch_id,product_id,product_name,product_category,units,unit_price,is_online,payment_method,shipping_address,source,source_path,group
count,1500,1500.0,1500,1500,1500,1000.0,1000.0,1500.0,1500,1500,1500.0,1500.0,1500,1500,500,1500,1500,1500
mean,,85511.282,,,,5.592,3.027,15.678,,,5.422666666666666,178.61333333332968,,,,,,
stddev,,28.880556325824408,,,,2.933622699350095,1.4492658042510256,8.706138479916778,,,2.8213459348589245,239.83557323789387,,,,,,
min,trx-000112360946,85462.0,Alexander,Brown,alexander.brown@g...,1.0,1.0,1.0,Blender,Appliances,1.0,19.99,no,Cash,10 Dalry Lane/Sav...,Local File System,/data/Spark_proje...,group1
max,trx-999392026173,85562.0,William,Wilson,william.wilson@ya...,10.0,5.0,30.0,Washing Machine,Footwear,10.0,999.99,yes,Stripe,Tyndall Drive/Tyn...,Local File System,/data/Spark_proje...,group1


Check that each transaction has at most one offer applied

In [7]:
columns = ["offer_1", "offer_2", "offer_3", "offer_4", "offer_5"]

# 1 for every offer flag that is True, 0 otherwise (NULL flags count as 0).
offer_flags = [when(col(c) == True, 1).otherwise(0) for c in columns]
offers_count_expr = sum(offer_flags).alias("offers")

# Any row returned here has more than one offer applied; an empty result is the expected outcome.
transactions_with_offer_count = transactions_df.withColumn("Offers", offers_count_expr)
transactions_with_offer_count.where("offers > 1")

transaction_date,transaction_id,customer_id,customer_fname,cusomter_lname,cusomter_email,sales_agent_id,branch_id,product_id,product_name,product_category,offer_1,offer_2,offer_3,offer_4,offer_5,units,unit_price,is_online,payment_method,shipping_address,load_time,source,source_path,group,Offers


### Data Cleansing

Repartition the DataFrame

In [8]:
# Number of partitions backing the DataFrame before repartitioning.
transactions_df.rdd.getNumPartitions()

1

In [9]:
# Drop the time component of the transaction timestamp, then spread the rows over 10 partitions.
transactions_df = (
    transactions_df
    .withColumn("transaction_date", col("transaction_date").cast("date"))
    .repartition(10)
)
transactions_df.rdd.getNumPartitions()

10

Split the shipping address column into street, city, state and zip code

In [10]:
# shipping_address is "<street>/<city>/<state>/<zip>"; split it once and take each part by position.
address_parts = split(col("shipping_address"), "/")
for position, part_name in enumerate(["Street", "City", "State", "Zip_Code"]):
    transactions_df = transactions_df.withColumn(part_name, address_parts.getItem(position))

transactions_df.select("shipping_address", "city", "Street", "State", "Zip_Code").show(16, truncate=False)

+--------------------------------------------------+------------+------------------------------+-----+--------+
|shipping_address                                  |city        |Street                        |State|Zip_Code|
+--------------------------------------------------+------------+------------------------------+-----+--------+
|5403 Illinois Avenue/Nashville/TN/37209           |Nashville   |5403 Illinois Avenue          |TN   |37209   |
|113 Hammarlee Road/Glen Burnie/MD/21060           |Glen Burnie |113 Hammarlee Road            |MD   |21060   |
|8376 Albacore Drive/Pasadena/MD/21122             |Pasadena    |8376 Albacore Drive           |MD   |21122   |
|2704 McGee Avenue/Berkeley/CA/94703               |Berkeley    |2704 McGee Avenue             |CA   |94703   |
|2906 Clare Avenue/Nashville/TN/37209              |Nashville   |2906 Clare Avenue             |TN   |37209   |
|8188 Poinsett Terrace/Pasadena/MD/21122           |Pasadena    |8188 Poinsett Terrace         |MD   |21

In [11]:
# Confirm that the four address columns were appended to the DataFrame.
transactions_df.columns

['transaction_date',
 'transaction_id',
 'customer_id',
 'customer_fname',
 'cusomter_lname',
 'cusomter_email',
 'sales_agent_id',
 'branch_id',
 'product_id',
 'product_name',
 'product_category',
 'offer_1',
 'offer_2',
 'offer_3',
 'offer_4',
 'offer_5',
 'units',
 'unit_price',
 'is_online',
 'payment_method',
 'shipping_address',
 'load_time',
 'source',
 'source_path',
 'group',
 'Street',
 'City',
 'State',
 'Zip_Code']

Clean up Emails

In [12]:
# Keep only the leading "<local>@<domain>.(com|org|net)" part of the raw address, then strip any
# character that is not a word character, '.', '@' or '-'.
# The cleaned value is written straight to "email": adding a temporary "customer_email" column and
# renaming it afterwards would create a second, ambiguous "email" column if this cell is re-run.
email_match = regexp_extract(col("cusomter_email"), r'^([^@\s]+@[^\s]+\.(com|org|net))', 1)
transactions_df = transactions_df.withColumn("email", regexp_replace(email_match, r'[^\w.@-]', ''))

transactions_df.select("cusomter_email", "email").show(5, truncate=False)


+-----------------------------+----------------------------+
|cusomter_email               |email                       |
+-----------------------------+----------------------------+
|mia.johnson@hotmail.com]x    |mia.johnson@hotmail.com     |
|william.brown@gmail.com[     |william.brown@gmail.com     |
|mia.davis@gmail.com/         |mia.davis@gmail.com         |
|alexander.wilson@hotmail.com(|alexander.wilson@hotmail.com|
|william.brown@gmail.com]x    |william.brown@gmail.com     |
+-----------------------------+----------------------------+
only showing top 5 rows



Convert the `is_online` column to a boolean

In [13]:
# True only when is_online is exactly 'yes'; every other value (including NULL) becomes False.
transactions_df = transactions_df.withColumn(
    "Online",
    when(col("is_online") == 'yes', True).otherwise(False),
)

Define Product's Offer and Discount

In [14]:
# (flag column, offer label, discount in percent) - evaluated in order, first matching flag wins.
offer_rules = [
    ("offer_1", 'Offer 1', 5),
    ("offer_2", 'Offer 2', 10),
    ("offer_3", 'Offer 3', 15),
    ("offer_4", 'Offer 4', 20),
    ("offer_5", 'Offer 5', 25),
]

first_flag, first_label, first_percent = offer_rules[0]
offer_expr = when(col(first_flag) == 'TRUE', first_label)
discount_expr = when(col(first_flag) == 'TRUE', first_percent)
for flag, label, percent in offer_rules[1:]:
    offer_expr = offer_expr.when(col(flag) == 'TRUE', label)
    discount_expr = discount_expr.when(col(flag) == 'TRUE', percent)

# Rows with no offer flag set fall through to 'No Offers' and a 0 % discount.
transactions_df = (
    transactions_df
    .withColumn("Offer", offer_expr.otherwise('No Offers'))
    .withColumn("Discount", discount_expr.otherwise(0))
)

Calculate Final Price

In [15]:
# Final price = gross amount reduced by the discount, where Discount is a whole-number percentage.
gross_amount = col("units") * col("unit_price")
final_price_expr = gross_amount * (100 - col("Discount")) / 100

transactions_df = transactions_df.withColumn("Final_Price", final_price_expr)
transactions_df.select("transaction_id", "Discount", "units", "unit_price", "Final_Price").show(truncate=False)

+----------------+--------+-----+----------+------------------+
|transaction_id  |Discount|units|unit_price|Final_Price       |
+----------------+--------+-----+----------+------------------+
|trx-322304849280|0       |8    |299.99    |2399.92           |
|trx-480600454996|0       |9    |19.99     |179.91            |
|trx-223446861036|0       |1    |999.99    |999.99            |
|trx-471827640530|5       |5    |59.99     |284.9525          |
|trx-197135297075|0       |7    |79.99     |559.93            |
|trx-891395193336|20      |5    |29.99     |119.96            |
|trx-477332710198|5       |9    |149.99    |1282.4145         |
|trx-540337110687|0       |10   |129.99    |1299.9            |
|trx-091709503415|5       |4    |49.99     |189.96200000000002|
|trx-783039674351|10      |3    |299.99    |809.9730000000001 |
|trx-895867959806|5       |8    |499.99    |3799.9240000000004|
|trx-988671840285|25      |1    |29.99     |22.4925           |
|trx-168539426259|0       |3    |59.99  

### Branches

In [16]:
# Preview the raw branches data.
branches_df

branch_id,location,establish_date,class,load_time,source,source_path,group
1,New York,2017-01-15 00:00:00,A,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2,Los Angeles,2016-07-28 00:00:00,B,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
3,Chicago,2015-03-10 00:00:00,A,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
4,Houston,2016-11-05 00:00:00,D,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
5,Phoenix,2017-09-20 00:00:00,C,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1


### Sales Agents

In [17]:
# Preview the raw sales agents data.
sales_agents_df

sales_person_id,name,hire_date,load_time,source,source_path,group
1,John Doe,2020-06-03 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
2,Jane Smith,2018-05-13 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
3,Michael Johnson,2021-10-03 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
4,Emily Brown,2020-10-25 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
5,David Wilson,2021-04-08 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
6,Emma Taylor,2019-03-28 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
7,Christopher Miller,2020-01-11 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
8,Olivia Davis,2021-10-24 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
9,Daniel Martinez,2018-10-08 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1
10,Sophia Moore,2019-05-25 00:00:00,2024-06-30 08:18:...,Local File System,/data/Spark_proje...,group1


### Preparing the DWH Model

![Sample Image](./image.png)


In [18]:
# Agent dimension: business key renamed to agent_id, hire date reduced to a plain date.
dim_agents = sales_agents_df.select(
    col("sales_person_id").alias("agent_id"),
    "name",
    col("hire_date").cast("date").alias("hire_date"),
)

In [19]:
# Branch dimension: descriptive columns only, establish date reduced to a plain date.
dim_branch = branches_df.select(
    "branch_id",
    "location",
    col("establish_date").cast("date").alias("establish_date"),
    "class",
)

In [20]:
# Product dimension: every distinct product combination seen in the transactions.
product_columns = ['product_id', 'product_name', 'product_category']
dim_product = transactions_df.select(*product_columns).distinct()

In [21]:
# Explicit, non-nullable schema for the static offer dimension.
schema = (
    StructType()
    .add("Offer_ID", IntegerType(), nullable=False)
    .add("Offer_name", StringType(), nullable=False)
    .add("Discount", FloatType(), nullable=False)
)

# Offer_ID 0 means "no offer". Discounts are stored here as fractions (0.05 = 5 %),
# whereas the transaction-level Discount column computed earlier uses whole percentages.
offer_names = ["No Offers", "Offer 1", "Offer 2", "Offer 3", "Offer 4", "Offer 5"]
offer_rates = [0.0, 0.05, 0.10, 0.15, 0.20, 0.25]
data = list(zip(range(len(offer_names)), offer_names, offer_rates))

dim_offer = spark.createDataFrame(data, schema)

In [22]:
# Customer dimension: distinct customer attributes, using the cleaned "email" column.
customer_columns = ['customer_id', 'customer_fname', 'cusomter_lname', 'email']
dim_customer = transactions_df.select(*customer_columns).distinct()

Creating a Location Dimension

In [23]:
# Location dimension: one row per distinct shipping address.
# Transactions without a shipping address have NULL in every address column. They are excluded
# here because the fact table maps them to the "00000" placeholder zip code; a row with a NULL
# key would be an orphan and could never match in a key-based join against the stored table.
dim_location = (
    transactions_df
    .select('Zip_Code', 'City', 'Street', 'State')
    .where(col('Zip_Code').isNotNull())
    .distinct()
)

Insert a new Location Indicating a Branch (Non-Online) Purchase

In [24]:
# Placeholder row keyed by zip code "00000"; values follow the dim_location column order
# (Zip_Code, City, Street, State).
# NOTE(review): the State value is "Online" although the heading above describes this row as
# the branch (non-online) purchase location - confirm which label is intended.
schema = dim_location.schema
online_purchase = spark.createDataFrame([("00000", "None", "None", "Online")], schema)
dim_location = dim_location.union(online_purchase)

#### Preparing the fact table

In [25]:
# Translate the offer label into its numeric key: 'Offer N' -> N, anything else -> 0.
offer_id_expr = when(col("Offer") == 'Offer 1', 1)
for offer_number in range(2, 6):
    offer_id_expr = offer_id_expr.when(col("Offer") == f'Offer {offer_number}', offer_number)

transactions_df = transactions_df.withColumn("Offer_ID", offer_id_expr.otherwise(0))

In [26]:
# Keep only the keys and measures that belong in the fact table, in their final column order.
fact_columns = [
    'transaction_date', 'transaction_id', 'payment_method', 'Online',
    'customer_id', 'sales_agent_id', 'branch_id', 'product_id', 'Offer_ID', 'Zip_Code',
    'units', 'unit_price', 'Final_Price',
]
transactions_df = transactions_df.select(*fact_columns)

In [31]:
# "Discount" is now the absolute amount taken off the gross price (the earlier percentage column
# was dropped by the fact-table select). Rows without a zip code get the "00000" placeholder key.
discount_amount = col("unit_price") * col("units") - col("Final_Price")
transactions_df = (
    transactions_df
    .withColumn("Discount", discount_amount)
    .fillna("00000", subset=["Zip_Code"])
)

<hr>

### Insert Data into Hive

#### Set up tables for Initial Load

In [188]:
def create_hdfs_directory(path):
    """Create `path` (and any missing parents) on HDFS via `hdfs dfs -mkdir -p`.

    Args:
        path: HDFS directory to create.

    Raises:
        subprocess.CalledProcessError: if the `hdfs` command exits with a non-zero status.
        FileNotFoundError: if the `hdfs` executable is not on PATH.
    """
    # check=True surfaces a failed mkdir here instead of letting the later table writes
    # fail with a less obvious error.
    subprocess.run(["hdfs", "dfs", "-mkdir", "-p", path], check=True)

create_hdfs_directory("/user/hive/warehouse/external")

In [189]:
# Target table name -> DataFrame to load into it.
tables = dict(
    dim_product=dim_product,
    dim_location=dim_location,
    dim_customer=dim_customer,
    dim_offer=dim_offer,
    dim_branch=dim_branch,
    dim_agents=dim_agents,
    transactions_fact=transactions_df,
)

# Dimension table name -> key column used to detect rows that are already stored.
# The fact table has no entry here.
keys = dict(
    dim_product="product_id",
    dim_location="zip_code",
    dim_customer="customer_id",
    dim_offer="Offer_ID",
    dim_branch="branch_id",
    dim_agents="agent_id",
)

In [190]:
def SCD_T1(table_name, dataframe):
    """Return the rows of `dataframe` whose key is not yet present in `table_name`.

    The incoming rows are exposed as the temporary view `temp_<table_name>` and
    anti-joined against the stored table on the key column registered for that
    table in `keys`.

    Args:
        table_name: name of the existing table; must be a key of `keys`.
        dataframe: freshly prepared dimension rows for that table.

    Returns:
        DataFrame with the columns of `dataframe`, restricted to new keys.

    Raises:
        KeyError: if `table_name` has no entry in `keys`.

    NOTE(review): only new keys are inserted; attributes of existing keys are
    not overwritten, which a full SCD Type 1 load would normally do.
    """
    key = keys[table_name]
    staging_view = f"temp_{table_name}"
    # createOrReplaceTempView replaces the deprecated registerTempTable.
    dataframe.createOrReplaceTempView(staging_view)
    # Select from the staged side (TT): the previous query returned OT.* for rows where OT had
    # no match, i.e. rows consisting entirely of NULLs instead of the new records.
    return spark.sql(f"""
                SELECT TT.* FROM {staging_view} TT
                LEFT ANTI JOIN {table_name} OT
                ON TT.{key} = OT.{key}
                  """)

In [193]:
# Initial load: create the external table from the full DataFrame.
# Later loads: append all fact rows, but only the not-yet-stored rows of each dimension.
for table_name, frame in tables.items():
    table_exists = spark.sql(f"SHOW TABLES IN default LIKE '{table_name}'").count()

    if not table_exists:
        writer = frame.write.mode("overwrite")
    elif table_name == 'transactions_fact':
        writer = transactions_df.write.mode("append")
    else:
        writer = SCD_T1(table_name, frame).write.mode("append")

    writer.option(
        "path", f"/user/hive/warehouse/external/{table_name}"
    ).saveAsTable(f"default.{table_name}")

<hr>