In [1]:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.functions import col, monotonically_increasing_id, row_number, current_timestamp, concat_ws, to_timestamp

In [2]:
spark = SparkSession.builder.master("local[*]") \
    .appName("Read_Silver_Layer") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioLocalAccessKey") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioLocalSecretKey123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

for jar in os.listdir("/home/jovyan/jars"):
    if jar.endswith(".jar"):
        spark.sparkContext.addJar(f"/home/jovyan/jars/{jar}")
spark

In [3]:
raw_data = spark.read.parquet("s3a://etl-dag/bronze/data/retail_data_parquet/",
                            header=True, inferSchema=True)

print(f"Total records: {raw_data.count()}")

Total records: 302010


In [4]:
raw_data.columns

['Transaction_ID',
 'Customer_ID',
 'Name',
 'Email',
 'Phone',
 'Address',
 'City',
 'Zipcode',
 'Country',
 'Age',
 'Gender',
 'Income',
 'Customer_Segment',
 'Date',
 'Year',
 'Month',
 'Time',
 'Total_Purchases',
 'Amount',
 'Total_Amount',
 'Product_Category',
 'Product_Brand',
 'Product_Type',
 'Feedback',
 'Shipping_Method',
 'Payment_Method',
 'Order_Status',
 'Ratings',
 'products',
 'State/Province']

## Lookup Tables

### 1. Countries Table

In [5]:
countries = raw_data.select("Country").where(col("Country").isNotNull()).distinct().rdd.flatMap(lambda row: row).collect()
countries_with_id = [Row(Country_ID=i+1, Country_Name=country) for i, country in enumerate(countries)]

countries_df = spark.createDataFrame(countries_with_id)
countries_df.show()

+----------+------------+
|Country_ID|Country_Name|
+----------+------------+
|         1|     Germany|
|         2|         USA|
|         3|          UK|
|         4|      Canada|
|         5|   Australia|
+----------+------------+



### 2. States/Provinces Table

In [6]:
states_data = raw_data.select("State/Province", "Country").dropna().distinct()
states_data = states_data.join(
    countries_df,
    states_data["Country"] == countries_df["Country_Name"],
    how="inner"
)

states_provinces_df = states_data.select(
    row_number().over(Window.orderBy("State/Province")).alias("State_ID"),
    col("State/Province").alias("State_Name"),
    col("Country_ID")
)

states_provinces_df.show()

+--------+--------------------+----------+
|State_ID|          State_Name|Country_ID|
+--------+--------------------+----------+
|       1|             Alberta|         4|
|       2|             Arizona|         2|
|       3|Australian Capita...|         5|
|       4|   Baden-Württemberg|         1|
|       5|             Bavaria|         1|
|       6|              Berlin|         1|
|       7|              Bremen|         1|
|       8|    British Columbia|         4|
|       9|          California|         2|
|      10|            Colorado|         2|
|      11|District of Columbia|         2|
|      12|         East Sussex|         3|
|      13|             England|         3|
|      14|             England|         4|
|      15|             England|         5|
|      16|             Florida|         2|
|      17|             Georgia|         2|
|      18|             Hamburg|         1|
|      19|               Hesse|         1|
|      20|            Illinois|         2|
+--------+-

### 3. Cities Table

In [7]:
cities_data = raw_data.select("City", "State/Province").dropna().distinct()

cities_data = cities_data.join(
    states_provinces_df,
    cities_data['State/Province'] == states_provinces_df['State_Name'],
    how="inner"
)

cities_df = cities_data.select(
    row_number().over(Window.orderBy("City")).alias("City_ID"),
    col("City").alias("City_Name"),
    col("State_ID")
)

cities_df.show()

+-------+--------------+--------+
|City_ID|     City_Name|State_ID|
+-------+--------------+--------+
|      1|      Adelaide|      54|
|      2|   Albuquerque|      34|
|      3|Albury-Wodonga|      36|
|      4|     Arlington|      57|
|      5|       Atlanta|      17|
|      6|        Austin|      57|
|      7|      Ballarat|      58|
|      8|     Baltimore|      27|
|      9|        Barrie|      46|
|     10|       Belfast|      41|
|     11|       Bendigo|      58|
|     12|        Berlin|       6|
|     13|     Bielefeld|      40|
|     14|    Birmingham|      13|
|     15|    Birmingham|      14|
|     16|    Birmingham|      15|
|     17|        Bochum|      40|
|     18|          Bonn|      40|
|     19|        Boston|      28|
|     20|        Bremen|       7|
+-------+--------------+--------+
only showing top 20 rows



### 4. Categories Table

In [8]:
categories = raw_data.select("Product_Category").dropna().distinct().rdd.flatMap(lambda row: row).collect()
categories_with_id = [Row(Category_ID=i+1, Category_Name=category) for i, category in enumerate(categories)]

categories_df = spark.createDataFrame(categories_with_id)
categories_df.show(n=categories_df.count())

+-----------+-------------+
|Category_ID|Category_Name|
+-----------+-------------+
|          1|      Grocery|
|          2|  Electronics|
|          3|     Clothing|
|          4|        Books|
|          5|   Home Decor|
+-----------+-------------+



### 5. Brands Table

In [9]:
brands = raw_data.select("Product_Brand").dropna().distinct().rdd.flatMap(lambda row: row).collect()
brands_with_id = [Row(Brand_ID=i+1, Brand_Name=brand) for i, brand in enumerate(brands)]

brands_df = spark.createDataFrame(brands_with_id)
brands_df.show(n=brands_df.count())

+--------+-----------------+
|Brand_ID|       Brand_Name|
+--------+-----------------+
|       1|             Nike|
|       2|    HarperCollins|
|       3|             Sony|
|       4|        Whirepool|
|       5|             Zara|
|       6|         BlueStar|
|       7|     Random House|
|       8|          Samsung|
|       9|        Coca-Cola|
|      10|       Home Depot|
|      11|            Pepsi|
|      12|Bed Bath & Beyond|
|      13|    Penguin Books|
|      14|       Mitsubhisi|
|      15|           Nestle|
|      16|            Apple|
|      17|           Adidas|
|      18|             IKEA|
+--------+-----------------+



### 6. Product Types Table

In [10]:
products_data = raw_data.select("Product_Type", "Product_Category").dropna().distinct()
products_data = products_data.join(categories_df, products_data['Product_Category'] == categories_df['Category_Name'], how="inner")

products_df = products_data.select(
    row_number().over(Window.orderBy("Product_Type")).alias("Product_Type_ID"),
    col("Product_Type").alias("Product_Type_Name"),
    col("Category_ID")
)

products_df.show()

+---------------+--------------------+-----------+
|Product_Type_ID|   Product_Type_Name|Category_ID|
+---------------+--------------------+-----------+
|              1|            Bathroom|          5|
|              2|             Bedding|          5|
|              3|         BlueStar AC|          2|
|              4|          Children's|          4|
|              5|           Chocolate|          1|
|              6|              Coffee|          1|
|              7|         Decorations|          5|
|              8|               Dress|          3|
|              9|             Fiction|          4|
|             10|              Fridge|          2|
|             11|           Furniture|          5|
|             12|          Headphones|          2|
|             13|              Jacket|          3|
|             14|               Jeans|          3|
|             15|               Juice|          1|
|             16|             Kitchen|          5|
|             17|              

### 7. Customer Segments Table

In [11]:
segments = raw_data.select("Customer_Segment").dropna().distinct().rdd.flatMap(lambda row: row).collect()
segments_with_id = [Row(Segment_ID=i+1, Segment_Name=segment) for i, segment in enumerate(segments)]

segments_df = spark.createDataFrame(segments_with_id)
segments_df.show(n=segments_df.count())

+----------+------------+
|Segment_ID|Segment_Name|
+----------+------------+
|         1|     Premium|
|         2|     Regular|
|         3|         New|
+----------+------------+



### 8. Income Levels Table

In [12]:
incomes = raw_data.select("Income").dropna().distinct().rdd.flatMap(lambda row: row).collect()
incomes_with_id = [Row(Income_Level_ID=i+1, Income_Level=income) for i, income in enumerate(incomes)]

incomes_df = spark.createDataFrame(incomes_with_id)
incomes_df.show(n=incomes_df.count())

+---------------+------------+
|Income_Level_ID|Income_Level|
+---------------+------------+
|              1|        High|
|              2|         Low|
|              3|      Medium|
+---------------+------------+



### 9. Shipping Methods Table

In [13]:
shippings = raw_data.select("Shipping_Method").dropna().distinct().rdd.flatMap(lambda row: row).collect()
shippings_with_id = [Row(Shipping_Method_ID=i+1, Method_Name=shipping) for i, shipping in enumerate(shippings)]

shippings_df = spark.createDataFrame(shippings_with_id)
shippings_df.show(n=shippings_df.count())

+------------------+-----------+
|Shipping_Method_ID|Method_Name|
+------------------+-----------+
|                 1|    Express|
|                 2|   Standard|
|                 3|   Same-Day|
+------------------+-----------+



### 10. Payment Methods Table

In [14]:
payments = raw_data.select("Payment_Method").dropna().distinct().rdd.flatMap(lambda row: row).collect()
payments_with_id = [Row(Payment_Method_ID=i+1, Method_Name=payment) for i, payment in enumerate(payments)]

payments_df = spark.createDataFrame(payments_with_id)
payments_df.show(n=payments_df.count())

+-----------------+-----------+
|Payment_Method_ID|Method_Name|
+-----------------+-----------+
|                1|Credit Card|
|                2|     PayPal|
|                3|       Cash|
|                4| Debit Card|
+-----------------+-----------+



### 11. Order Statuses Table

In [15]:
orders = raw_data.select("Order_Status").dropna().distinct().rdd.flatMap(lambda row: row).collect()
orders_with_id = [Row(Status_ID=i+1, Status_Name=order) for i, order in enumerate(orders)]

orders_df = spark.createDataFrame(orders_with_id)
orders_df.show(n=orders_df.count())

+---------+-----------+
|Status_ID|Status_Name|
+---------+-----------+
|        1|    Shipped|
|        2| Processing|
|        3|  Delivered|
|        4|    Pending|
+---------+-----------+



## Core Entity Tables

### 1. Customers Table

In [16]:
customers_base = raw_data.select('Customer_ID', 'Name', 'Email', 'Phone', 'Age','Gender', 'Income', 'Customer_Segment')

customers_base = customers_base.join(incomes_df, customers_base["Income"] == incomes_df["Income_Level"], how="left")
customers_base = customers_base.join(segments_df, customers_base["Customer_Segment"] == segments_df["Segment_Name"], how="left")

customers_base.select('Customer_ID').dropna().rdd.flatMap(lambda row: row).collect()

customers_df = customers_base.select(
    col("Customer_ID").cast("int"),
    col("Name"),
    col("Email"),
    col("Phone").cast("string"),
    col("Age").cast("int"),
    col("Gender"),
    col("Income_Level_ID"),
    col("Segment_ID").alias("Customer_Segment_ID"),
    current_timestamp().alias("Created_Date"),
    current_timestamp().alias("Updated_Date")
)
customers_df.show()

+-----------+------------------+--------------------+-------------+---+------+---------------+-------------------+--------------------+--------------------+
|Customer_ID|              Name|               Email|        Phone|Age|Gender|Income_Level_ID|Customer_Segment_ID|        Created_Date|        Updated_Date|
+-----------+------------------+--------------------+-------------+---+------+---------------+-------------------+--------------------+--------------------+
|      95631|      Brandon Soto|  Shannon6@gmail.com|4.609969429E9| 45|  Male|              1|                  1|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      15275|     Robert Lester|Christopher34@gma...| 1.56863877E9| 23|Female|              2|                  1|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      59319|      Thomas Brady|  Cheryl24@gmail.com|2.698304078E9| 63|  Male|              2|                  1|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      27206|      Karen Larson| Kenneth84@gmail.com|6.708

### 2. Products Table

In [17]:
products_base = raw_data.select('products', 'Product_Type','Product_Brand', 'Product_Category')

products_base = products_base.join(products_df, products_base['Product_Type'] == products_df['Product_Type_Name'], how='left')
products_base = products_base.join(brands_df, products_base['Product_Brand'] == brands_df['Brand_Name'], how='left')
products_base = products_base.join(categories_df.select("Category_Name"), products_base['Product_Category'] == categories_df['Category_Name'], how='left')

products_df = products_base.select(
    col("products").alias("Product_Name"),
    col("Product_Type_ID"),
    col("Brand_ID"),
    col("Category_ID"),
    current_timestamp().alias("Created_Date"),
    current_timestamp().alias("Updated_Date")
).withColumn("Product_ID", row_number().over(Window.orderBy("Product_Name")))

products_df.show()


+------------+---------------+--------+-----------+--------------------+--------------------+----------+
|Product_Name|Product_Type_ID|Brand_ID|Category_ID|        Created_Date|        Updated_Date|Product_ID|
+------------+---------------+--------+-----------+--------------------+--------------------+----------+
|       4K TV|             30|       3|          2|2025-05-29 14:14:...|2025-05-29 14:14:...|         1|
|       4K TV|             30|       8|          2|2025-05-29 14:14:...|2025-05-29 14:14:...|         2|
|       4K TV|             30|       8|          2|2025-05-29 14:14:...|2025-05-29 14:14:...|         3|
|       4K TV|             30|       8|          2|2025-05-29 14:14:...|2025-05-29 14:14:...|         4|
|       4K TV|             30|       3|          2|2025-05-29 14:14:...|2025-05-29 14:14:...|         5|
|       4K TV|             30|       8|          2|2025-05-29 14:14:...|2025-05-29 14:14:...|         6|
|       4K TV|             30|       3|          2|2025

### 3. Addresses Table

In [18]:
addresses_base = raw_data.select('Customer_ID', 'Address', 'City', 'State/Province', 'Zipcode', 'Country')
addresses_base = addresses_base.join(cities_df, addresses_base['City'] == cities_df['City_Name'], how='left')

addresses_df = addresses_base.select(
    col("Customer_ID").cast("int"),
    col("Address").alias("Street_Address"),
    col("City_ID"),
    col("Zipcode").cast("int"),
    current_timestamp().alias("Created_Date"),
    current_timestamp().alias("Updated_Date")
)

addresses_df.show()

+-----------+--------------------+-------+-------+--------------------+--------------------+
|Customer_ID|      Street_Address|City_ID|Zipcode|        Created_Date|        Updated_Date|
+-----------+--------------------+-------+-------+--------------------+--------------------+
|      22761|20065 Miller Junc...|    114|  33987|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      56342|26408 Kramer Over...|    162|  57214|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      15275|017 Rich Valleys ...|     12|  36597|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      27206|    40932 Anna Curve|    113|  32289|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      41457|    92812 Yates Fort|     22|  55112|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      80782|9115 Leah Haven A...|     16|  76288|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      80782|9115 Leah Haven A...|     15|  76288|2025-05-29 14:14:...|2025-05-29 14:14:...|
|      80782|9115 Leah Haven A...|     14|  76288|2025-05-29 14:14:...

### 4. Transactions Table

In [19]:
transactions_base = raw_data

transactions_base = transactions_base.join(shippings_df, transactions_base['Shipping_Method'] == shippings_df['Method_Name'], how='left')
transactions_base = transactions_base.join(payments_df, transactions_base['Payment_Method'] == payments_df['Method_Name'], how='left')
transactions_base = transactions_base.join(orders_df, transactions_base['Order_Status'] == orders_df['Status_Name'], how='left')

transactions_df = transactions_base.select(
    col('Transaction_ID').cast("int"),
    col("Customer_ID").cast("int"),
    col('Time').alias("Transaction_DateTime"),
    col('Total_Amount'),
    col("Total_Purchases").cast("int"),
    col("Shipping_Method_ID"),
    col("Payment_Method_ID"),
    col("Status_ID").alias("Order_Status_ID"),
    current_timestamp().alias("Created_Date"),
    current_timestamp().alias("Updated_Date")
)

transactions_df.show()

+--------------+-----------+--------------------+------------+---------------+------------------+-----------------+---------------+--------------------+--------------------+
|Transaction_ID|Customer_ID|Transaction_DateTime|Total_Amount|Total_Purchases|Shipping_Method_ID|Payment_Method_ID|Order_Status_ID|        Created_Date|        Updated_Date|
+--------------+-----------+--------------------+------------+---------------+------------------+-----------------+---------------+--------------------+--------------------+
|       5258286|      56342| 2025-05-29 21:39:43| 2248.229499|              5|                 1|                1|              1|2025-05-29 14:14:...|2025-05-29 14:14:...|
|       9245021|      80715| 2025-05-29 19:19:18| 597.4694199|              5|                 1|                1|              1|2025-05-29 14:14:...|2025-05-29 14:14:...|
|       3907436|      56508| 2025-05-29 13:55:19| 496.5221187|              6|                 2|                1|              1