In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.appName('OlistData')\
.getOrCreate()

25/12/15 04:04:56 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path = '/data/olist/'

In [3]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv', header = True, inferSchema = True)
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv', header = True, inferSchema = True)
order_items_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv', header = True, inferSchema = True)
payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv', header = True, inferSchema = True)
reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv', header = True, inferSchema = True)
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv', header = True, inferSchema = True)
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv', header = True, inferSchema = True)
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv', header = True, inferSchema = True)
category_translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv', header = True, inferSchema = True)

                                                                                

In [4]:
# Cache Frequently Used Data For Better Performance

orders_df.cache()
customers_df.cache()
order_items_df.cache()

DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

In [5]:
from pyspark.sql.functions import *

In [6]:
order_items_joined_df = orders_df.join(order_items_df,'order_id','inner')

In [7]:
order_items_products_df = order_items_joined_df.join(products_df,'product_id','inner')

In [8]:
order_items_products_sellers_df = order_items_products_df.join(broadcast(sellers_df),'seller_id','inner')

In [9]:
full_orders_df = order_items_products_sellers_df.join(customers_df,'customer_id','inner')

In [10]:
full_orders_df = full_orders_df.join(broadcast(geolocation_df),full_orders_df.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix, 'left')

In [11]:
full_orders_df = full_orders_df.join(broadcast(reviews_df),'order_id','left')

In [12]:
full_orders_df = full_orders_df.join(payments_df,'order_id','left')

In [13]:
full_orders_df.cache()

25/12/15 04:05:19 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

## Advance Aggregations and Enrichment

In [14]:
# Total Revenue & Average Order Value (AOV) per Customer

customer_spending_df = full_orders_df.groupBy('customer_id')\
.agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('total_spent'),
    round(avg('price'),2).alias('AOV')
    )\
.orderBy(desc('total_spent'))

In [15]:
customer_spending_df.show()



+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

                                                                                

In [16]:
6662844.0/6876 # --> AOV


969.0

In [17]:
# Seller Performance Metrics (Revenue, Average Review, Order Count)

seller_performance_df = full_orders_df.groupBy('seller_id')\
.agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('total_revenue'),
    round(avg('review_score'),2).alias('avg_review_score'),
    round(stddev('price'),2).alias('price_variability')
)\
.orderBy(desc('total_revenue'))

In [18]:
seller_performance_df.show()



+--------------------+------------+--------------------+----------------+-----------------+
|           seller_id|total_orders|       total_revenue|avg_review_score|price_variability|
+--------------------+------------+--------------------+----------------+-----------------+
|4869f7a5dfa277a7d...|      184587| 3.613871731999314E7|            4.09|           111.65|
|53243585a1d6dc264...|       54514|3.4291592950000696E7|            4.12|           499.65|
|4a3ca9315b744ce9f...|      330661| 3.375957084001202E7|            3.77|            59.37|
|7c67e1448b00f6e96...|      233306|3.2282321790021457E7|            3.42|            50.39|
|fa1c13f2614d7b5c4...|       87686|3.0139386310006626E7|            4.38|            307.7|
|da8622b14eb17ae28...|      264433| 2.985766973003611E7|            3.98|            72.92|
|7e93a43ef30c4f03f...|       50226| 2.631570630000355E7|            4.15|           377.24|
|1025f0e2d44d7041d...|      229587|2.2937518520012792E7|            3.89|       

                                                                                

In [19]:
# Product Popularity Metrics

product_metrics_df = full_orders_df.groupBy('product_id')\
.agg(
    count('order_id').alias('total_sales'),
    sum('price').alias('total_revenue'),
    round(avg('price'),2).alias('avg_price'),
    round(stddev('price'),2).alias('price_volatility'),\
    collect_set('seller_id').alias('unique_sellers')
)\
.orderBy(desc('product_id'))

In [20]:
product_metrics_df.show()



+--------------------+-----------+------------------+---------+----------------+--------------------+
|          product_id|total_sales|     total_revenue|avg_price|price_volatility|      unique_sellers|
+--------------------+-----------+------------------+---------+----------------+--------------------+
|fffe9eeff12fcbd74...|         51| 12749.48999999999|   249.99|             0.0|[7299e27ed73d2ad9...|
|fffdb2d0ec8d6a61f...|        495|17180.049999999912|    34.71|            1.16|[9f505651f4a6abe9...|
|fff9553ac224cec9d...|        117|            3744.0|     32.0|             0.0|[643214e62b870443...|
|fff81cc3158d2725c...|         33|            2970.0|     90.0|             0.0|[778323240ce2830d...|
|fff6177642830a9a9...|         69| 7959.309999999989|   115.35|            5.02|[7a67c85e85bb2ce8...|
|fff515ea94dbf35d5...|         70|           57260.0|    818.0|             0.0|[93dc87703c046b60...|
|fff28f91211774864...|         10|             180.0|     18.0|             0.0|[5

                                                                                

In [21]:
# Customer Retention Analysis (First and Last Order)

customer_retention_df = full_orders_df.groupBy('customer_id')\
.agg(
    first('order_purchase_timestamp').alias('first_order_date'),
    last('order_purchase_timestamp').alias('last_order_date'),
    count('order_id').alias('total_orders'),
    round(avg('price'),2).alias('AOV')
)\
.orderBy(desc('total_orders'))

In [22]:
customer_retention_df.show()



+--------------------+-------------------+-------------------+------------+------+
|         customer_id|   first_order_date|    last_order_date|total_orders|   AOV|
+--------------------+-------------------+-------------------+------------+------+
|351e40989da90e704...|2017-07-13 10:42:37|2017-07-13 10:42:37|       11427| 85.99|
|50920f8cd0681fd86...|2018-01-27 11:28:32|2018-01-27 11:28:32|       10752| 43.82|
|9b43e2a62de9bab3a...|2017-05-25 22:27:50|2017-05-25 22:27:50|        8556|  26.4|
|270c23a11d024a44c...|2017-08-08 20:26:31|2017-08-08 20:26:31|        8001| 36.59|
|5c87184371002d49e...|2018-01-05 19:15:37|2018-01-05 19:15:37|        6876| 12.49|
|d3e82ccec3cb5f956...|2017-03-18 14:28:34|2017-03-18 14:28:34|        6876| 969.0|
|d5f2b3f597c7ccafb...|2017-12-13 14:21:15|2017-12-13 14:21:15|        6706|  59.0|
|c2f18647725395af4...|2018-03-06 19:21:47|2018-03-06 19:21:47|        6612|  34.9|
|24e7dc2ff8c071263...|2017-11-24 16:16:45|2017-11-24 16:16:45|        6597|  59.2|
|7bb

                                                                                

## Extended Enrichment

In [23]:
# Order Status Flags

full_orders_df = full_orders_df.withColumn('is_delivered',when(col('order_status') == 'delivered', lit(1)).otherwise(lit(0)))\
.withColumn('is_canceled', when(col('order_status') == 'canceled', lit(1)).otherwise(lit(0)))

In [24]:
full_orders_df.where(full_orders_df['order_status'] == 'delivered').select('order_status','is_delivered','is_canceled').show(100) 
# Shows all Delivered Orders

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|


In [25]:
full_orders_df.where(full_orders_df['order_status'] == 'canceled').select('order_status','is_delivered','is_canceled').show(100) 
# Shows all Canceled Orders

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|


In [26]:
# Order Revenue Calculation

full_orders_df = full_orders_df.withColumn('order_revenue', col('price') + col('freight_value'))

In [27]:
full_orders_df.select('price', 'freight_value', 'order_revenue').show()

+-----+-------------+-------------+
|price|freight_value|order_revenue|
+-----+-------------+-------------+
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
+-----+-------------+-------------+
only showing top 20 rows



In [28]:
customer_spending_df.show()



+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

                                                                                

In [29]:
customer_spending_df = customer_spending_df.withColumn(
    'customer_segment',
    when(col('AOV') >= 1200, 'High_Value')
    .when( (col('AOV')<1200) & (col('AOV')>=500), 'Medium_Value')
    .otherwise('Low_Value'))

In [30]:
customer_spending_df.show()



+--------------------+------------+------------------+-------+----------------+
|         customer_id|total_orders|       total_spent|    AOV|customer_segment|
+--------------------+------------+------------------+-------+----------------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|    Medium_Value|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|      High_Value|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|      High_Value|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|      High_Value|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|       Low_Value|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|      High_Value|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|    Medium_Value|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|      High_Value|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|      High_Value|
|695476b5848d64ba0...|         687|18205

                                                                                

In [31]:
full_orders_df = full_orders_df.join(customer_spending_df.select('customer_id','customer_segment'), 'customer_id', how = 'left')

In [32]:
full_orders_df.select('customer_id','customer_segment').show()

                                                                                

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
+--------------------+----------------+
only showing top 20 rows



In [33]:
full_orders_df.select('order_purchase_timestamp').show()

+------------------------+
|order_purchase_timestamp|
+------------------------+
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
|     2017-09-13 08:59:02|
+------------------------+
only showing top 20 rows



In [34]:
# Hourly Order Distribution

full_orders_df = full_orders_df.withColumn('hour_of_day', expr('hour(order_purchase_timestamp)'))

In [35]:
full_orders_df.select('hour_of_day', 'order_purchase_timestamp').show()

+-----------+------------------------+
|hour_of_day|order_purchase_timestamp|
+-----------+------------------------+
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
|          8|     2017-09-13 08:59:02|
+-----------+------------------------+
only showing top 20 rows



In [36]:
# WeekDay vs WeekEnd Orders

full_orders_df = full_orders_df.withColumn('order_day_type',\
                                           when(dayofweek('order_purchase_timestamp').isin(1,7),lit('Weekend')).otherwise(lit('Weekday')))
    

In [37]:
full_orders_df.select('order_purchase_timestamp', 'order_day_type').show()

+------------------------+--------------+
|order_purchase_timestamp|order_day_type|
+------------------------+--------------+
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
|     2017-09-13 08:59:02|       Weekday|
+------------------------+--------

In [39]:
!hadoop fs -mkdir /data/olist/processed/

In [42]:
full_orders_df.write.mode('overwrite').parquet('/olist/processed')


                                                                                

In [43]:
!hadoop fs -ls -h /olist/processed/

Found 11 items
-rw-r--r--   2 root hadoop          0 2025-12-15 04:17 /olist/processed/_SUCCESS
-rw-r--r--   2 root hadoop     31.6 M 2025-12-15 04:17 /olist/processed/part-00000-483cab24-6963-457c-9eaf-31eb799b9f65-c000.snappy.parquet
-rw-r--r--   2 root hadoop     31.2 M 2025-12-15 04:17 /olist/processed/part-00001-483cab24-6963-457c-9eaf-31eb799b9f65-c000.snappy.parquet
-rw-r--r--   2 root hadoop     31.1 M 2025-12-15 04:17 /olist/processed/part-00002-483cab24-6963-457c-9eaf-31eb799b9f65-c000.snappy.parquet
-rw-r--r--   2 root hadoop     30.9 M 2025-12-15 04:17 /olist/processed/part-00003-483cab24-6963-457c-9eaf-31eb799b9f65-c000.snappy.parquet
-rw-r--r--   2 root hadoop     31.5 M 2025-12-15 04:17 /olist/processed/part-00004-483cab24-6963-457c-9eaf-31eb799b9f65-c000.snappy.parquet
-rw-r--r--   2 root hadoop     17.9 M 2025-12-15 04:17 /olist/processed/part-00005-483cab24-6963-457c-9eaf-31eb799b9f65-c000.snappy.parquet
-rw-r--r--   2 root hadoop     18.5 M 2025-12-15 04:17 /olist/pr

25/12/15 04:39:56 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /hadoop/spark/tmp/blockmgr-f67af8a0-b3de-4eb3-8ef9-59fef7e63b83. Falling back to Java IO way
java.io.IOException: Failed to delete: /hadoop/spark/tmp/blockmgr-f67af8a0-b3de-4eb3-8ef9-59fef7e63b83
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:174) ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:109) ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:90) ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
	at org.apache.spark.util.SparkFileUtils.deleteRecursively(SparkFileUtils.scala:121) ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
	at org.apache.spark.util.SparkFileUtils.deleteRecursively$(SparkFileUtils.scala:120) ~[spark-common-utils_2.12-3.5.3.jar:3.5.3]
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.sc