In [0]:
# Read all CSVs into Spark DataFrames
df_customers = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_customers_dataset.csv")
df_geolocation = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_geolocation_dataset.csv")
df_order_items = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_order_items_dataset.csv")
df_order_payments = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_order_payments_dataset.csv")
df_order_reviews = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_order_reviews_dataset.csv")
df_orders = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_orders_dataset.csv")
df_products = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_products_dataset.csv")
df_sellers = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_sellers_dataset.csv")
df_product_category_name_translation = spark.read.option("header", True).csv("dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/product_category_name_translation.csv")


In [0]:
df_customers.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                   09790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                   01151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                   08775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [0]:
#Identifying Missing Values
from pyspark.sql.functions import col, count, when

def missing_values(df, df_name):
    print(f"\nMissing values in {df_name}:")
    df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]).show(truncate=False)

In [0]:
missing_values(df_customers, "customers")
missing_values(df_geolocation, "geolocation")
missing_values(df_order_items, "order_items")
missing_values(df_order_payments, "order_payments")
missing_values(df_order_reviews, "order_reviews")
missing_values(df_orders, "orders")
missing_values(df_products, "products")
missing_values(df_sellers, "sellers")
missing_values(df_product_category_name_translation, "product_category_name_translation")



Missing values in customers:
+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|0          |0                 |0                       |0            |0             |
+-----------+------------------+------------------------+-------------+--------------+


Missing values in geolocation:
+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|0                          |0              |0              |0               |0                |
+---------------------------+---------------+---------------+----------------+-----------------+


Missing values 

## Handle Missing Values

1. **Drop Missing Values**
2. **Fill Missing Values**
3. **Impute Missing Values**


In [0]:
from pyspark.sql.functions import lit

# Example: Fill missing timestamp columns with date
df_orders_filled = df_orders.fillna({
    "order_id": "unknown_order",
    "order_approved_at": "2025-01-02 00:00:00",
    "order_delivered_carrier_date": "2025-01-03 00:00:00",
    "order_delivered_customer_date": "2025-01-04 00:00:00",
    "order_estimated_delivery_date": "2025-01-05 00:00:00"
})

# Show result
df_orders_filled.show()


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [0]:
df_customers_filled = df_customers.fillna({
    "customer_city": "Unknown",
    "customer_state": "Unknown"
})


In [0]:
df_geolocation_filled = df_geolocation.fillna({
    "geolocation_city": "Unknown",
    "geolocation_state": "Unknown"
})


In [0]:
df_order_items_filled = df_order_items.fillna({
    "shipping_limit_date": "1900-01-01 00:00:00",
    "price": 0,
    "freight_value": 0
})


In [0]:
df_order_payments_filled = df_order_payments.fillna({
    "payment_type": "Unknown",
    "payment_installments": 0,
    "payment_value": 0
})


In [0]:
df_order_reviews_filled = df_order_reviews.fillna({
    "review_comment_title": "Unknown",
    "review_comment_message": "No Comment",
    "review_score": 0,
    "review_creation_date": "1900-01-01 00:00:00",
    "review_answer_timestamp": "1900-01-01 00:00:00"
})


In [0]:
df_products_filled = df_products.fillna({
    "product_category_name": "Unknown",
    "product_name_lenght": 0,
    "product_description_lenght": 0,
    "product_photos_qty": 0,
    "product_weight_g": 0,
    "product_length_cm": 0,
    "product_height_cm": 0,
    "product_width_cm": 0
})


In [0]:
df_sellers_filled = df_sellers.fillna({
    "seller_city": "Unknown",
    "seller_state": "Unknown"
})


In [0]:
df_product_category_name_translation_filled = df_product_category_name_translation.fillna({
    "product_category_name_english": "Unknown"
})


In [0]:
from pyspark.sql.functions import col, when, count

def check_missing_values(df, name):
    print(f"Null values in {name}:")
    df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]).show(truncate=False)


In [0]:
check_missing_values(df_customers_filled, "df_customers_filled")
check_missing_values(df_geolocation_filled, "df_geolocation_filled")
check_missing_values(df_order_items_filled, "df_order_items_filled")
check_missing_values(df_order_payments_filled, "df_order_payments_filled")
check_missing_values(df_order_reviews_filled, "df_order_reviews_filled")
check_missing_values(df_orders_filled, "df_orders_filled")
check_missing_values(df_products_filled, "df_products_filled")
check_missing_values(df_sellers_filled, "df_sellers_filled")
check_missing_values(df_product_category_name_translation_filled, "df_product_category_name_translation_filled")


Null values in df_customers_filled:
+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|0          |0                 |0                       |0            |0             |
+-----------+------------------+------------------------+-------------+--------------+

Null values in df_geolocation_filled:
+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|0                          |0              |0              |0               |0                |
+---------------------------+---------------+---------------+----------------+-----------------+

Null

In [0]:
#Inpute Missing Values
from pyspark.sql.functions import col, sum as _sum

# Count missing (null or empty) payment_value entries
missing_payment_count = df_order_payments.filter(
    (col("payment_value").isNull()) | (col("payment_value") == "")
).count()

print(f"Missing payment_value count: {missing_payment_count}")

# If you want percentage of missing values:
total_rows = df_order_payments.count()
missing_percentage = (missing_payment_count / total_rows) * 100
print(f"Missing payment_value percentage: {missing_percentage:.2f}%")




Missing payment_value count: 0
Missing payment_value percentage: 0.00%


In [0]:
df_order_payments.filter(
    (col("payment_value").isNull()) | (col("payment_value") == "")
).show()


+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
+--------+------------------+------------+--------------------+-------------+



In [0]:
df_customers.printSchema()
df_geolocation.printSchema()
df_order_items.printSchema()
df_order_payments.printSchema()
df_order_reviews.printSchema()
df_orders.printSchema()
df_products.printSchema()
df_sellers.printSchema()
df_product_category_name_translation.printSchema()


root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

root
 |-- geolocation_zip_code_prefix: string (nullable = true)
 |-- geolocation_lat: string (nullable = true)
 |-- geolocation_lng: string (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- freight_value: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: string (nullable = true)


#Standarizing the format


In [0]:
from pyspark.sql.functions import col, regexp_replace, to_timestamp

def standardize_customers(df):
    # 1️⃣ Clean column names
    for col_name in df.columns:
        new_name = col_name.strip().lower().replace(" ", "_")
        df = df.withColumnRenamed(col_name, new_name)
    
    # 2️⃣ Convert date columns to standard timestamp format (if present)
    date_columns = [c for c in df.columns if "date" in c or "dt" in c]
    for dc in date_columns:
        df = df.withColumn(dc, to_timestamp(col(dc), "yyyy-MM-dd HH:mm:ss"))
    
    # 3️⃣ Ensure numeric columns are in numeric type (example: customer_zip_code_prefix)
    if "customer_zip_code_prefix" in df.columns:
        df = df.withColumn(
            "customer_zip_code_prefix",
            regexp_replace(col("customer_zip_code_prefix"), ",", "").cast("int")
        )
    
    return df

df_customers = standardize_customers(df_customers)

df_customers.printSchema()
df_customers.show(5)


root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83

##Joins

In [0]:
# Register DataFrames as temporary views
df_customers.createOrReplaceTempView("customers")
df_orders.createOrReplaceTempView("orders")

# This will join customers → orders using customer_id as the common key.
join_query = """
SELECT 
    c.customer_id,
    c.customer_unique_id,
    c.customer_zip_code_prefix,
    c.customer_city,
    c.customer_state,
    o.order_id,
    o.order_status,
    o.order_purchase_timestamp
FROM customers c
JOIN orders o
    ON c.customer_id = o.customer_id
"""

df_customer_orders = spark.sql(join_query)

df_customer_orders.show(10, truncate=False)


+--------------------------------+--------------------------------+------------------------+-----------------------+--------------+--------------------------------+------------+------------------------+
|customer_id                     |customer_unique_id              |customer_zip_code_prefix|customer_city          |customer_state|order_id                        |order_status|order_purchase_timestamp|
+--------------------------------+--------------------------------+------------------------+-----------------------+--------------+--------------------------------+------------+------------------------+
|9ef432eb6251297304e76186b10a928d|7c396fd4830fd04220f754e42b4e5bff|3149                    |sao paulo              |SP            |e481f51cbdc54678b7cc49136f2d6af7|delivered   |2017-10-02 10:56:33     |
|b0830fb4747a6c6d20dea0b8c802d7ef|af07308b275d755c9edb36a90c618231|47813                   |barreiras              |BA            |53cdb2fc8bc7dce0b6741e2150273451|delivered   |2018-07-24 

In [0]:
orders_payment = spark.read.option("header", True).csv(
    "dbfs:/Volumes/commerce_spark_workspace/default/ecommerce_raw/olist_order_payments_dataset.csv"
)

from pyspark.sql.functions import sum as _sum

order_with_total_value = orders_payment.groupBy("order_id").agg(
    _sum("payment_value").alias("total_order_value")
)

order_with_total_value.show(10, truncate=False)


+--------------------------------+-----------------+
|order_id                        |total_order_value|
+--------------------------------+-----------------+
|41b3c9e5e85309a0ddf6fdf03c906b26|118.35           |
|66102db498a42db4dd18edc624df4a95|143.9            |
|e178bee213ba3b996d728678aea863a4|58.85            |
|019886de8f385a39b75bedbb726fd4ef|188.4            |
|5c3ed80bbf87273549529ebc6a45650d|111.72           |
|d40dd8018a5302969efb31bd21744cab|49.12            |
|c5a04e8b14a30499f45857da5edfea7e|58.94            |
|4ec40a7013b2b2e20429009214c283b4|19.07            |
|4ec755335e77b43c823c978d570fe48f|178.27           |
|2107484004fafd3fa95b3ddefb066aed|799.94           |
+--------------------------------+-----------------+
only showing top 10 rows


#Advance Transformation

In [0]:
df_order_items.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.90|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.90|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.00|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.90|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [0]:
quantiles = df_order_items.withColumn("price", col("price").cast("double")).approxQuantile("price", [0.01, 0.99], 0.0)
low_cutoff, high_cutoff = quantiles[0], quantiles[1]
print(f"Low cutoff: {low_cutoff}, high cutoff: {high_cutoff}")

Low cutoff: 9.99, high cutoff: 890.0


In [0]:
df_filtered = df_order_items.filter((col("price") >= low_cutoff) & (col("price") <= high_cutoff))


In [0]:
df_products_cleansed = df_products.withColumn(
    'product_size_category',
    when(col('product_weight_g') < 500, 'small').
    when(col('product_weight_g').between(500, 1000), 'medium')
    .otherwise('Large')
)   

In [0]:
df_products_cleansed.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_size_category|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|                small|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|        

In [0]:
from pyspark.sql.functions import col, sum as _sum

# Total revenue per seller (only product price)
df_total_revenue = df_order_items.groupBy("seller_id") \
    .agg(_sum(col("price").cast("double")).alias("total_revenue"))

df_total_revenue.show()


+--------------------+------------------+
|           seller_id|     total_revenue|
+--------------------+------------------+
|0691148aee60ca479...| 5868.510000000001|
|fa14641f57b655e93...|            359.93|
|87d3c3aeb3ead3355...|            749.35|
|3364a91ec4d56c98e...|3579.9399999999996|
|3d4824f20035949c7...|           1041.04|
|1e47defeeadeca0e9...|            234.12|
|fb3cfbc8a86f5d7fb...|1007.2999999999998|
|4a1f694197d05fe70...| 907.7700000000001|
|b0b346d3a89f5eb4c...|             614.0|
|2ff6b7bff164ef055...|           1803.59|
|e84ad2127668df3aa...|             987.0|
|cac63f48c38cd7d00...|             874.0|
|daeb5653dd96c1b11...|            2305.9|
|596849622429351f4...|1104.6999999999998|
|7daca0837f033a41a...|1228.9799999999998|
|00fc707aaaad2d313...|           12684.9|
|134a288e7de827ec5...|            877.45|
|0d85bbda9889ce1f7...|            4845.2|
|dc4a0fc896dc34b0d...|24180.770000000004|
|2a7c6c0b0d5efde2b...|             147.6|
+--------------------+------------