In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder \
    .appName("Ecommerce-ETL") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.driver.memoryOverhead", "2g") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/02 13:40:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## **Extract Data From MinIO**

In [5]:
df_order                          = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/olist_orders_dataset.csv")
df_customers                      = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/olist_customers_dataset.csv")
df_order_items                    = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/olist_order_items_dataset.csv")
df_order_reviews                  = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/olist_order_reviews_dataset.csv")
df_order_payments                 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/olist_order_payments_dataset.csv")
df_products                       = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/olist_products_dataset.csv")
df_sellers                        = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/olist_sellers_dataset.csv")
df_geolocation                    = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/olist_geolocation_dataset.csv")
df_product_category_translation   = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3a://ecommerce-raw/product_category_name_translation.csv")

                                                                                

## **Customers Cleaning**

In [None]:
df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [10]:

# ----------------------------------
# Step 1: remove duplicates based on customer_id
# ----------------------------------
customers_clean = df_customers.dropDuplicates(["customer_id"])

# ----------------------------------
# Step 2: validate zip (must have 5 digits) and state (must have 2 letters)
# ----------------------------------
customers_clean = customers_clean.filter(length("customer_zip_code_prefix") == 5)
customers_clean = customers_clean.filter(length("customer_state") == 2)

# ----------------------------------
# Step 3: basic cleaning for customer_city
# - lowercase
# - trim extra spaces
# - remove special characters
# ----------------------------------
customers_clean = customers_clean.withColumn(
    "customer_city",
    lower(trim(regexp_replace(col("customer_city"), "[^a-zA-ZÀ-ÿ ]", "")))
)

# ----------------------------------
# Step 4: fix common misspellings and inconsistent forms
# ----------------------------------
customers_clean = customers_clean.withColumn(
    "customer_city",
    regexp_replace(col("customer_city"), "\\bd['\\s]o\\b", " do ")
)

corrections = {
    "sao jorge d oeste": "sao jorge do oeste",
    "estrela d oeste": "estrela do oeste", 
    "palma d oeste": "palma do oeste",
    "sao jorge d'oeste": "sao jorge do oeste",
    "estrela d'oeste": "estrela do oeste",
    "palma d'oeste": "palma do oeste",
    "alvorada d'oeste": "alvorada do oeste",
    "diamante d'oeste": "diamante do oeste",
    "rancho alegre d'oeste": "rancho alegre do oeste",
    "bandeirantes d'oeste": "bandeirantes do oeste",
    "santa clara d'oeste": "santa clara do oeste",
    "santa rita d'oeste": "santa rita do oeste",
    "aparecida d'oeste": "aparecida do oeste",
    "palmeira d oeste": "palmeira do oeste",
    "palmeira d'oeste": "palmeira do oeste",
    'sao paulo - sp': 'sao paulo',
    'sao paulo sp': 'sao paulo',
    'sp': 'sao paulo',
    'sao  paulo': 'sao paulo',
    'sao pauo': 'sao paulo',
    'sao paulop': 'sao paulo',
    'sao paulo': 'sao paulo',
    'sbc': 'sao bernardo do campo',
    'sbc/sp': 'sao bernardo do campo',
    'riberao preto': 'ribeirao preto',
    'robeirao preto': 'ribeirao preto',
    'ribeirao pretp': 'ribeirao preto',
    'sando andre': 'santo andre',
    'ao bernardo do campo': 'sao bernardo do campo',
    'garulhos': 'guarulhos',
    'brasilia df': 'brasilia',
    'rio de janeiro / rio de janeiro': 'rio de janeiro',
    'rio de janeiro, rio de janeiro, brasil': 'rio de janeiro',
    'mogi das cruses': 'mogi das cruzes',
    'mogi das cruzes / sp': 'mogi das cruzes',
    'santo andre/sao paulo': 'santo andre',
    "santa barbara d´oeste": "santa barbara d'oeste",
    "santa barbara d oeste": "santa barbara d'oeste",
    'belo horizont': 'belo horizonte',
    'cascavael': 'cascavel',
    'floranopolis': 'florianopolis',
}

for wrong, correct in corrections.items():
    customers_clean = customers_clean.withColumn(
        "customer_city",
        when(col("customer_city") == wrong, correct).otherwise(col("customer_city"))
    )

# ----------------------------------
# Step 5: handle missing values (replace with placeholder)
# ----------------------------------
customers_clean = customers_clean.fillna({
    "customer_city": "unknown",
    "customer_state": "unknown",
    "customer_zip_code_prefix": "00000"
})

# ----------------------------------
# Step 6: drop duplicates again after corrections (based on unique customer)
# ----------------------------------
customers_clean = customers_clean.dropDuplicates(["customer_unique_id"])

# ----------------------------------
# Final check
# ----------------------------------
customers_clean.show(10, truncate=False)

                                                                                

+--------------------------------+--------------------------------+------------------------+-----------------------+--------------+
|customer_id                     |customer_unique_id              |customer_zip_code_prefix|customer_city          |customer_state|
+--------------------------------+--------------------------------+------------------------+-----------------------+--------------+
|2f29573c8cac5a7be11c5b649078f944|0006fdc98a402fceb4eb0ee528f6a8d4|29400                   |mimoso do sul          |ES            |
|40f0183f7439212e8ce53dc6103dfb2f|00090324bbad0e9342388303bb71ba0a|13054                   |campinas               |SP            |
|0e114b02a45c9876080f3a9a19f51c16|000c8bdb58a29e7115cfc257230fb21b|31555                   |belo horizonte         |MG            |
|064064dd94c43013786fc1e1a14d6374|00115fc7123b5310cf6d3a3aa932699e|71015                   |brasilia               |DF            |
|063a5ea8e53ff62d2547c8af92ba02b2|0023557a94bef0038066b5d1b3dc763e|15105    

## **Orders Cleaning**

In [12]:
df_order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [13]:
# =======================================================
# Step 1: Filter out invalid order statuses
# - "created" and "approved" are not actual completed orders
# - keep only meaningful statuses
# =======================================================
valid_statuses = ["delivered", "shipped", "invoiced", "processing", "canceled", "unavailable"]
orders_clean = df_order.filter(col("order_status").isin(valid_statuses))

# =======================================================
# Step 2: Calculate shipping_days
# (time between purchase and actual customer delivery)
# =======================================================
orders_clean = orders_clean.withColumn(
    "shipping_days",
    datediff("order_delivered_customer_date", "order_purchase_timestamp")
)

# Remove negative or unrealistic shipping days
orders_clean = orders_clean.withColumn(
    "shipping_days",
    when(col("shipping_days") < 0, None).otherwise(col("shipping_days"))
)

# =======================================================
# Step 3: Calculate delivery_delay
# (difference between actual delivery and estimated delivery date)
# =======================================================
orders_clean = orders_clean.withColumn(
    "delivery_delay",
    datediff("order_delivered_customer_date", "order_estimated_delivery_date")
)

# =======================================================
# Step 4: Add delivery_status flag
# - "on_time" if delivered on or before estimated date
# - "delayed" otherwise
# =======================================================
orders_clean = orders_clean.withColumn(
    "delivery_status",
    when(col("delivery_delay") <= 0, "on_time").otherwise("delayed")
)

# =======================================================
# Step 5: Ensure logical consistency in dates
# (delivered date must be >= purchase date OR null)
# =======================================================
orders_clean = orders_clean.filter(
    (col("order_delivered_customer_date").isNull()) |
    (col("order_delivered_customer_date") >= col("order_purchase_timestamp"))
)

# =======================================================
# Final check
# =======================================================
orders_clean.show(10, truncate=False)

                                                                                

+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------+---------------+
|order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|shipping_days|delivery_delay|delivery_status|
+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------+---------------+
|e481f51cbdc54678b7cc49136f2d6af7|9ef432eb6251297304e76186b10a928d|delivered   |2017-10-02 10:56:33     |2017-10-02 11:07:15|2017-10-04 19:55:00         |2017-10-10 21:25:13          |2017-10-18 00:00:00          |8     

In [15]:
orders_clean.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- shipping_days: integer (nullable = true)
 |-- delivery_delay: integer (nullable = true)
 |-- delivery_status: string (nullable = false)



## **Orders Items Cleaning**

In [16]:
df_order_items.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [17]:
# =======================================================
# Step 1: Filter out invalid rows
# - keep only rows where price > 0
# - keep only rows where freight_value >= 0 (0 means free shipping, which is valid)
# =======================================================
order_items_clean = df_order_items.filter(
    (col("price") > 0) & (col("freight_value") >= 0)
)

# =======================================================
# Step 2: Remove duplicates
# - ensure each (order_id, order_item_id) pair is unique
# =======================================================
order_items_clean = order_items_clean.dropDuplicates(["order_id", "order_item_id"])

# =======================================================
# Step 3: Calculate financial metrics
# - item_revenue = price (value of the product itself)
# - shipping_revenue = freight_value (shipping cost charged to customer)
# - total_cost = price + freight_value (overall cost per item)
# =======================================================
order_items_clean = order_items_clean \
    .withColumn("item_revenue", col("price")) \
    .withColumn("shipping_revenue", col("freight_value")) \
    .withColumn("total_cost", expr("price + freight_value"))

# =======================================================
# Step 4: Add freight percentage
# - shows how much of the total cost is due to shipping
# =======================================================
order_items_clean = order_items_clean.withColumn(
    "freight_percentage",
    (col("freight_value") / col("total_cost"))
)

# =======================================================
# Final check
# =======================================================
order_items_clean.show(10, truncate=False)

                                                                                

+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+-----+-------------+------------+----------------+------------------+-------------------+
|order_id                        |order_item_id|product_id                      |seller_id                       |shipping_limit_date|price|freight_value|item_revenue|shipping_revenue|total_cost        |freight_percentage |
+--------------------------------+-------------+--------------------------------+--------------------------------+-------------------+-----+-------------+------------+----------------+------------------+-------------------+
|00018f77f2f0320c557190d7a144bdd3|1            |e5f2d52b802189ee658865ca93d83a8f|dd7ddc04e1b6c2c614352b383efe2d36|2017-05-03 11:05:13|239.9|19.93        |239.9       |19.93           |259.83            |0.07670399876842551|
|00061f2a7bc09da83e415a52dc8a4af1|1            |d63c1011f49d98b976c352955b1c4bea|cc419e0650a3c5ba77189a1

In [18]:
order_items_clean.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- item_revenue: double (nullable = true)
 |-- shipping_revenue: double (nullable = true)
 |-- total_cost: double (nullable = true)
 |-- freight_percentage: double (nullable = true)



## **Orders Payments Cleaning**

In [20]:
df_order_payments.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [22]:
# =======================================================
# Step 1: Filter invalid rows
# - remove any rows with payment_value <= 0 (not logical)
# =======================================================
order_payments_clean = df_order_payments.filter(col("payment_value") > 0)

# =======================================================
# Step 2: Aggregate payments at the order_id level
# - total_payment: sum of all payment values for the order
# - total_installments: total number of installments
# - payment_methods_count: how many different payment types were used
# - main_payment_type: the first payment type recorded (proxy for primary method)
# =======================================================
order_payments_clean = order_payments_clean.groupBy("order_id").agg(
    sum("payment_value").alias("total_payment"),
    sum("payment_installments").alias("total_installments"),
    countDistinct("payment_type").alias("payment_methods_count"),
    first("payment_type").alias("main_payment_type")
)

# =======================================================
# Step 3: Add derived KPIs
# - avg_installment_value: average value per installment
# =======================================================
order_payments_clean = order_payments_clean.withColumn(
    "avg_installment_value",
    expr("CASE WHEN total_installments > 0 THEN total_payment / total_installments ELSE NULL END")
)

# =======================================================
# Final check
# =======================================================
order_payments_clean.show(10, truncate=False)



+--------------------------------+-------------+------------------+---------------------+-----------------+---------------------+
|order_id                        |total_payment|total_installments|payment_methods_count|main_payment_type|avg_installment_value|
+--------------------------------+-------------+------------------+---------------------+-----------------+---------------------+
|00018f77f2f0320c557190d7a144bdd3|259.83       |3                 |1                    |credit_card      |86.61                |
|00042b26cf59d7ce69dfabb4e55b4fd9|218.04       |3                 |1                    |credit_card      |72.67999999999999    |
|00054e8431b9d7675808bcb819fb4a32|31.75        |1                 |1                    |credit_card      |31.75                |
|0006ec9db01a64e59a68b2c340bf65a7|97.32        |4                 |1                    |credit_card      |24.33                |
|000aed2e25dbad2f9ddb70584c5a2ded|152.77       |1                 |1                    |c

                                                                                

In [23]:
order_payments_clean.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- total_payment: double (nullable = true)
 |-- total_installments: long (nullable = true)
 |-- payment_methods_count: long (nullable = false)
 |-- main_payment_type: string (nullable = true)
 |-- avg_installment_value: double (nullable = true)



## **Orders Reviews Cleaning**

In [24]:
df_order_reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)



In [25]:
order_reviews_clean = df_order_reviews \
    .withColumn("review_creation_date", to_timestamp("review_creation_date")) \
    .withColumn("review_answer_timestamp", to_timestamp("review_answer_timestamp"))

order_reviews_clean = order_reviews_clean \
    .withColumn("has_message", when(order_reviews_clean.review_comment_message.isNull(), 0).otherwise(1)) \
    .withColumn("has_title", when(order_reviews_clean.review_comment_title.isNull(), 0).otherwise(1))

order_reviews_clean = order_reviews_clean \
    .withColumn("clean_comment_message", trim(lower(regexp_replace("review_comment_message", "[^a-zA-Z0-9çãáéíóúâêôõüàèìòùÇÃÁÉÍÓÚÂÊÔÕÜÀÈÌÒÙ ]", "")))) \
    .withColumn("clean_comment_title", trim(lower(regexp_replace("review_comment_title", "[^a-zA-Z0-9çãáéíóúâêôõüàèìòùÇÃÁÉÍÓÚÂÊÔÕÜÀÈÌÒÙ ]", ""))))

order_reviews_clean = order_reviews_clean.withColumn(
    "review_sentiment",
    when(col("review_score") >= 4, "positive")
    .when(col("review_score") == 3, "neutral")
    .otherwise("negative")
)

In [26]:
order_reviews_clean.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)
 |-- has_message: integer (nullable = false)
 |-- has_title: integer (nullable = false)
 |-- clean_comment_message: string (nullable = true)
 |-- clean_comment_title: string (nullable = true)
 |-- review_sentiment: string (nullable = false)



## **Orders Dim**

In [35]:
# =======================================================
# Step 1: Aggregate reviews before joining
# - avg_review_score: average score per order
# - reviews_count: number of reviews per order
# =======================================================
order_reviews_agg = order_reviews_clean.groupBy("order_id").agg(
    avg("review_score").alias("avg_review_score"),
    count("*").alias("reviews_count")
)

# =======================================================
# Step 2: Join orders with order_items
# =======================================================
orders_items_joined = orders_clean.join(
    order_items_clean,
    on="order_id",
    how="left"
)

# =======================================================
# Step 3: Join with payments
# =======================================================
orders_items_payments_joined = orders_items_joined.join(
    order_payments_clean,
    on="order_id",
    how="left"
)

# =======================================================
# Step 4: Join with aggregated reviews
# =======================================================
orders_full_cleaned = orders_items_payments_joined.join(
    order_reviews_agg,
    on="order_id",
    how="left"
)

# =======================================================
# Step 5: Add partitioning columns (optional, useful for DW)
# =======================================================
orders_full_cleaned = orders_full_cleaned \
    .withColumn("order_year", year("order_purchase_timestamp")) \
    .withColumn("order_month", month("order_purchase_timestamp"))

# =======================================================
# Final check
# =======================================================
orders_full_cleaned.show(10, truncate=False)

                                                                                

+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------+---------------+-------------+--------------------------------+--------------------------------+-------------------+-----+-------------+------------+----------------+------------------+-------------------+-------------+------------------+---------------------+-----------------+---------------------+----------------+-------------+----------+-----------+
|order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|shipping_days|delivery_delay|delivery_status|order_item_id|product_id                      |seller_id                       |shipping_limit_date|price|freight_value|item_revenue|shipping_r

In [28]:
orders_full_cleaned.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- shipping_days: integer (nullable = true)
 |-- delivery_delay: integer (nullable = true)
 |-- delivery_status: string (nullable = false)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- item_revenue: double (nullable = true)
 |-- shipping_revenue: double (nullable = true)
 |-- total_cost: double (nullable = true)
 |-- freight_percentage: d

## **Product Cleaning**

In [29]:
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [30]:
# =======================================================
# Step 1: Remove invalid categories
# - keep only rows where product_category_name is not null
# =======================================================
products_clean = df_products.filter(df_products.product_category_name.isNotNull())

# =======================================================
# Step 2: Drop rows with missing essential dimensions
# - weight, length, height, width are mandatory for analysis
# =======================================================
products_clean = products_clean.dropna(subset=[
    "product_weight_g", "product_length_cm", "product_height_cm", "product_width_cm"
])

# =======================================================
# Step 3: Cast numeric columns to proper types
# =======================================================
products_clean = products_clean \
    .withColumn("product_name_lenght", col("product_name_lenght").cast("int")) \
    .withColumn("product_description_lenght", col("product_description_lenght").cast("int")) \
    .withColumn("product_photos_qty", col("product_photos_qty").cast("int")) \
    .withColumn("product_weight_g", col("product_weight_g").cast("int")) \
    .withColumn("product_length_cm", col("product_length_cm").cast("int")) \
    .withColumn("product_height_cm", col("product_height_cm").cast("int")) \
    .withColumn("product_width_cm", col("product_width_cm").cast("int"))

# =======================================================
# Step 4: Rename typo columns for clarity
# =======================================================
products_clean = products_clean \
    .withColumnRenamed("product_name_lenght", "product_name_length") \
    .withColumnRenamed("product_description_lenght", "product_description_length")

# =======================================================
# Step 5: Calculate product volume (cm³)
# =======================================================
products_clean = products_clean.withColumn(
    "product_volume_cm3",
    expr("product_length_cm * product_height_cm * product_width_cm")
)

# =======================================================
# Step 6: Standardize category name
# - lower case + trim
# =======================================================
products_clean = products_clean.withColumn(
    "product_category_name", trim(lower("product_category_name"))
)

# =======================================================
# Step 7: Filter out unrealistic values
# - remove rows with zero or negative dimensions/weight
# =======================================================
products_clean = products_clean.filter(
    (col("product_weight_g") > 0) &
    (col("product_length_cm") > 0) &
    (col("product_height_cm") > 0) &
    (col("product_width_cm") > 0)
)

# =======================================================
# Step 8: Add derived feature - size category
# - classify products as Small, Medium, Large based on volume
# =======================================================
products_clean = products_clean.withColumn(
    "size_category",
    when(col("product_volume_cm3") < 1000, "Small")
    .when(col("product_volume_cm3") < 10000, "Medium")
    .otherwise("Large")
)

# =======================================================
# Final check
# =======================================================
products_clean.show(10, truncate=False)

                                                                                

+--------------------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------------+-------------+
|product_id                      |product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_volume_cm3|size_category|
+--------------------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------------+-------------+
|1e9e8ef04dbcff4541ed26657ea517e5|perfumaria           |40                 |287                       |1                 |225             |16               |10               |14              |2240              |Medium       |
|3aa071139cb16b67ca9e5dea641aaa2f|artes                |44                 |276                 

In [31]:
products_clean.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_length: integer (nullable = true)
 |-- product_description_length: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_volume_cm3: integer (nullable = true)
 |-- size_category: string (nullable = false)



## **Product Category Translation Cleaning**

In [32]:
df_product_category_translation.printSchema()

root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)



In [34]:
# =======================================================
# Step 1: Clean product_category_translation table
# - standardize category names (lowercase + trim)
# =======================================================
product_category_translation_clean = df_product_category_translation.withColumn(
    "product_category_name", trim(lower("product_category_name"))
)

# =======================================================
# Step 2: Enrich products with translated category names
# - join products_clean with category translations
# - keep translation if available, otherwise null
# =======================================================
products_enriched = products_clean.join(
    product_category_translation_clean,
    on="product_category_name",
    how="left"
)

# =======================================================
# Step 3: (Optional) Add a fallback category
# - replace null translation with the original Portuguese name
#   so every product has at least one usable category
# =======================================================
products_full_cleaned = products_enriched.withColumn(
    "category_final",
    when(col("product_category_name_english").isNotNull(), col("product_category_name_english"))
    .otherwise(col("product_category_name"))
)

# =======================================================
# Final check
# =======================================================
products_full_cleaned.show(10, truncate=False)

+---------------------+--------------------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------------+-------------+-----------------------------+-------------------+
|product_category_name|product_id                      |product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_volume_cm3|size_category|product_category_name_english|category_final     |
+---------------------+--------------------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------------+-------------+-----------------------------+-------------------+
|perfumaria           |1e9e8ef04dbcff4541ed26657ea517e5|40                 |287                       |1                 |225             |16               |10             

In [36]:
products_full_cleaned.printSchema()

root
 |-- product_category_name: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name_length: integer (nullable = true)
 |-- product_description_length: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_volume_cm3: integer (nullable = true)
 |-- size_category: string (nullable = false)
 |-- product_category_name_english: string (nullable = true)
 |-- category_final: string (nullable = true)



## **Saller Cleaning**

In [37]:
df_sellers.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [38]:
# =======================================================
# Step 1: Basic cleaning
# - normalize city names: lowercase, trim, remove special chars
# =======================================================
sellers_cleaned = df_sellers.withColumn(
    "seller_city",
    lower(trim(regexp_replace(col("seller_city"), "[^a-zA-ZÀ-ÿ ]", "")))
)

# =======================================================
# Step 2: Fix common typos in city names
# - replace variations like "d oeste" → "do oeste"
# =======================================================
sellers_cleaned = sellers_cleaned.withColumn(
    "seller_city",
    regexp_replace(col("seller_city"), "\\bd['\\s]o\\b", " do ")
)

for wrong, correct in corrections.items():
    sellers_cleaned = sellers_cleaned.withColumn(
        "seller_city",
        when(col("seller_city") == wrong, correct).otherwise(col("seller_city"))
    )

# =======================================================
# Step 3: Validate essential fields
# - seller_id must not be null
# - seller_zip_code_prefix should be 5 digits
# =======================================================
sellers_cleaned = sellers_cleaned.filter(col("seller_id").isNotNull())
sellers_cleaned = sellers_cleaned.filter(length("seller_zip_code_prefix") == 5)

# =======================================================
# Step 4: Remove duplicate seller records
# - keep only unique seller_id rows
# =======================================================
sellers_cleaned = sellers_cleaned.dropDuplicates(["seller_id"])

# =======================================================
# Final check
# =======================================================
sellers_cleaned.show(10, truncate=False)

[Stage 95:>                                                         (0 + 1) / 1]

+--------------------------------+----------------------+--------------+------------+
|seller_id                       |seller_zip_code_prefix|seller_city   |seller_state|
+--------------------------------+----------------------+--------------+------------+
|001cca7ae9ae17fb1caed9dfb1094831|29156                 |cariacica     |ES          |
|001e6ad469a905060d959994f1b41e4f|24754                 |sao goncalo   |RJ          |
|002100f778ceb8431b7a1020ff7ab48f|14405                 |franca        |SP          |
|003554e2dce176b5555353e4f3555ac8|74565                 |goiania       |GO          |
|004c9cd9d87a3c30c522c48c4fc07416|14940                 |ibitinga      |SP          |
|00d8b143d12632bad99c0ad66ad52825|30170                 |belo horizonte|MG          |
|00fc707aaaad2d31347cf883cd2dfe10|87025                 |maringa       |PR          |
|011b0eaba87386a2ae96a7d32bb531d1|17580                 |pompeia       |SP          |
|01266d4c46afa519678d16a8b683d325|80250               

                                                                                

## **Geolocation Cleaning**

In [39]:
df_geolocation.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [40]:
geolocation_clean = df_geolocation.dropna(subset=[
    "geolocation_zip_code_prefix", 
    "geolocation_lat", 
    "geolocation_lng", 
    "geolocation_city", 
    "geolocation_state"
]).dropDuplicates(["geolocation_zip_code_prefix", "geolocation_city", "geolocation_state"])

# **Build DW Tables**

In [None]:
# =======================================================
# Dimension: Customers
# =======================================================
dim_customers = customers_clean.select(
    "customer_id", "customer_unique_id", "customer_city", "customer_state", "customer_zip_code_prefix"
).dropDuplicates(["customer_id"])

# =======================================================
# Dimension: Products
# =======================================================
dim_products = products_full_cleaned.select(
    "product_id", "category_final",
    "product_category_name", "product_category_name_english",
    "product_weight_g", "product_volume_cm3", "size_category", "product_photos_qty"
).dropDuplicates(["product_id"])

# =======================================================
# Dimension: Sellers
# =======================================================
dim_sellers = sellers_cleaned.select(
    "seller_id", "seller_city", "seller_state", "seller_zip_code_prefix"
).dropDuplicates(["seller_id"])

# =======================================================
# Dimension: Geolocation
# =======================================================
dim_geolocation = geolocation_clean.dropDuplicates(["geolocation_zip_code_prefix"])

# =======================================================
# Fact Table: Orders
# =======================================================
fact_orders = orders_full_cleaned.select(
    # ---------------- Identifiers (Primary / Foreign Keys) ----------------
    "order_id",          # Order identifier
    "customer_id",       # FK -> dim_customers
    "product_id",        # FK -> dim_products
    "seller_id",         # FK -> dim_sellers

    # ---------------- Date / Time attributes ----------------
    "order_purchase_timestamp",     # When order was placed
    "order_approved_at",            # When payment was approved
    "order_estimated_delivery_date" # When payment was deliveried
    

    # ---------------- Descriptive attributes ----------------
    "order_status",      # Status of the order (delivered, canceled, etc.)
    "delivery_status",   # Derived: on-time, late, early
    "main_payment_type", # payment type

    # ---------------- Measures (for aggregations) ----------------
    "price",             # Item(s) price (SUM, AVG)
    "freight_value",     # Shipping cost (SUM, AVG)
    "total_cost",        # price + freight_value (SUM, AVG)

    "total_payment",     # Total amount paid (SUM)
    "total_installments",# Number of installments (SUM, AVG)

    "shipping_days",     # Actual shipping duration (AVG, MIN, MAX)
    "delivery_delay",    # Difference between estimated & actual delivery (AVG, MIN, MAX)
    "avg_review_score"   # Customer satisfaction score (AVG)
)

In [43]:
# =======================================================
# Dimension: Time
# =======================================================

# Get min and max date from orders
date_range = orders_clean.agg(
    to_date(expr("min(order_purchase_timestamp)")).alias("min_date"),
    to_date(expr("max(order_purchase_timestamp)")).alias("max_date")
).collect()[0]

min_date, max_date = date_range["min_date"], date_range["max_date"]

# Generate a sequence of all dates
date_df = spark.createDataFrame([(1,)], ["id"]).withColumn(
    "date_key", explode(sequence(lit(min_date), lit(max_date), expr("interval 1 day")))
).drop("id")

# Build dimension table
dim_time = date_df \
    .withColumn("day", dayofmonth("date_key")) \
    .withColumn("month", month("date_key")) \
    .withColumn("year", year("date_key")) \
    .withColumn("day_of_week", date_format("date_key", "E")) \
    .withColumn("week_of_year", weekofyear("date_key")) \
    .withColumn("day_name", date_format("date_key", "EEEE")) \
    .withColumn("month_name", date_format("date_key", "MMMM"))

# Primary key
dim_time = dim_time.withColumnRenamed("date_key", "time_id")

                                                                                

# **Load To Postgres**
#### Save Full Cleand Tables

In [None]:
# Postgres Config Connection
jdbc_url = "jdbc:postgresql://postgres:5432/olist_dwh"
connection_properties = {
     "user": "admin",
     "password": "password",
     "driver": "org.postgresql.Driver"
 }

In [None]:
# write Dim Tables to Postgres
dim_customers.write.jdbc(url=jdbc_url, table="dim_customers", mode="overwrite", properties=connection_properties)
dim_products.write.jdbc(url=jdbc_url, table="dim_products", mode="overwrite", properties=connection_properties)
dim_sellers.write.jdbc(url=jdbc_url, table="dim_sellers", mode="overwrite", properties=connection_properties)
dim_geolocation.write.jdbc(url=jdbc_url, table="dim_geolocation", mode="overwrite", properties=connection_properties)
dim_time.write.jdbc(url=jdbc_url, table="dim_time", mode="overwrite", properties=connection_properties)


                                                                                

In [None]:
# write Fact Table to Postgres

fact_orders.write.jdbc(url=jdbc_url, table="fact_orders", mode="overwrite", properties=connection_properties)

                                                                                

## **Clean memory**

In [None]:
# =======================================================
# Clean up DataFrames from memory
# =======================================================
import gc

to_delete = [
    # Raw / Cleaned
    "df_customers", "customers_clean",
    "df_order", "orders_clean", "orders_full_cleaned", "orders_items_joined", "orders_items_payments_joined",
    "df_order_items", "order_items_clean",
    "df_order_payments", "order_payments_clean",
    "df_order_reviews", "order_reviews_clean",
    "df_products", "products_clean", "products_enriched", "products_full_cleaned",
    "df_product_category_translation", "product_category_translation_clean",
    "df_sellers", "sellers_cleaned",
    "df_geolocation", "geolocation_clean", "date_df",

    # Dimensions
    "dim_customers", "dim_products", "dim_sellers", "dim_geolocation", "dim_time",

    # Fact
    "fact_orders"
]

# Unpersist Spark DataFrames
for name in to_delete:
    obj = globals().get(name)
    if obj is not None:
        try:
            obj.unpersist(blocking=True)  # free cached memory in Spark
        except AttributeError:
            pass

# Clear cache from Spark
spark.catalog.clearCache()

# Stop Spark
spark.stop()

# Delete variables from Python memory
for name in to_delete:
    if name in globals():
        del globals()[name]

# Force garbage collection
gc.collect()


1834