In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Imputer
from pyspark.sql.window import *

In [2]:
spark = SparkSession.builder\
.appName("Module 2- Data_cleanin_and_Transformation")\
.enableHiveSupport()\
.master('yarn')\
.getOrCreate()
spark = SparkSession.builder \
    .appName('Module 3- Data_cleanin_and_Transformation') \
    .config('spark.executor.memory', '6g') \
    .config('spark.executor.cores', '4') \
    .config('spark.executor.instances', '2') \
    .config('spark.driver.memory', '4g') \
    .config('spark.driver.maxResultSize', '2g') \
    .config('spark.sql.shuffle.partitions', '64') \
    .config('spark.default.parallelism', '64') \
    .config('spark.sql.adaptive.enabled', 'true') \
    .config('spark.sql.adaptive.coalescePartitions.enabled', 'true') \
    .config('spark.sql.autoBroadcastJoinThreshold', 50*1024*1024) \
    .config('spark.sql.files.maxPartitionBytes', '64MB') \
    .config('spark.sql.files.openCostInBytes', '2MB') \
    .config('spark.memory.fraction', 0.8) \
    .config('spark.memory.storageFraction', 0.2) \
    .getOrCreate()

25/08/27 20:23:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/08/27 20:23:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
hdfs_path= '/tmp/Brazilian'

In [4]:
customers_df  = spark.read\
.format('csv')\
.option('header',True)\
.option('inferSchema',True)\
.load(hdfs_path+'/olist_customers_dataset.csv')

geolocation_df  = spark.read\
.format('csv')\
.option('header',True)\
.option('inferSchema',True)\
.load(hdfs_path+'/olist_geolocation_dataset.csv')

order_items_df  = spark.read\
.format('csv')\
.option('header',True)\
.option('inferSchema',True)\
.load(hdfs_path+'/olist_order_items_dataset.csv')

order_payments_df  = spark.read\
.format('csv')\
.option('header',True)\
.option('inferSchema',True)\
.load(hdfs_path+'/olist_order_payments_dataset.csv')

order_reviews_df  = spark.read\
.format('csv')\
.option('header',True)\
.option('inferSchema',True)\
.load(hdfs_path+'/olist_order_reviews_dataset.csv')

orders_df  = spark.read\
.format('csv')\
.option('header',True)\
.option('inferSchema',True)\
.load(hdfs_path+'/olist_orders_dataset.csv')

products_df  = spark.read\
.format('csv')\
.option('header',True)\
.option('inferSchema',True)\
.load(hdfs_path+'/olist_products_dataset.csv')

sellers_df  = spark.read\
.format('csv')\
.option('header',True)\
.option('inferSchema',True)\
.load(hdfs_path+'/olist_sellers_dataset.csv')


                                                                                

# cache frequently used data for better performance and faster operations 

In [5]:
##caching data 
customers_df.cache()
orders_df.cache()
order_items_df.cache()


DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

![detailed diagrame of database](https://i.imgur.com/HRhd2Y0.png)

In [6]:
order_items_joined_df = orders_df.join(order_items_df,'order_id','inner')

In [7]:
order_items_product = order_items_joined_df.join(products_df,'product_id','inner')

In [8]:
order_items_product_sellers = order_items_product.join(sellers_df,'seller_id','inner')

In [9]:
full_orders_df =  order_items_product_sellers.join(customers_df,'customer_id','inner')

In [10]:
## joining geoloaction as left join as it will enrich our data but not provide some importance so we are doing a left join so that we dont loose on some imformation on this 

In [11]:
full_orders_df =  full_orders_df.join(geolocation_df,full_orders_df.customer_zip_code_prefix==geolocation_df.geolocation_zip_code_prefix,'left')

In [12]:
full_orders_df =  full_orders_df.join(order_reviews_df,'order_id','left')

In [13]:
full_orders_df =  full_orders_df.join(order_payments_df,'order_id','left')

In [14]:
full_orders_df.cache()

25/08/27 20:23:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [15]:
# total revenue per seller 

In [16]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [17]:
seller_revenue_df = full_orders_df.select('seller_id','price').groupBy('seller_id').agg(sum('price').alias('total_revenue')).orderBy(col('total_revenue').desc())
seller_revenue_df.show(5)



+--------------------+--------------------+
|           seller_id|       total_revenue|
+--------------------+--------------------+
|4869f7a5dfa277a7d...| 3.613871732000037E7|
|53243585a1d6dc264...|3.4291592950000025E7|
|4a3ca9315b744ce9f...|3.3759570840001926E7|
|7c67e1448b00f6e96...| 3.228232178999848E7|
|fa1c13f2614d7b5c4...|3.0139386310000394E7|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [18]:
#Total orders per customers 
total_orders_per_customers=full_orders_df.select('customer_id').groupBy('customer_id').count()
total_orders_per_customers.show(5)



+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|54c4a093ba4ad0808...|   79|
|383b7ca1ab9eae02f...|   44|
|a6d2e55e57d8cc9ce...|   39|
|dc69189910231e3ab...|   67|
|ba0a799b8a7b0b1a1...|  134|
+--------------------+-----+
only showing top 5 rows



                                                                                

In [19]:
#average review score per seller

In [20]:
average_seller_score=full_orders_df.select('seller_id','review_score').groupBy('seller_id').agg(avg('review_score').alias('average rating'))
average_seller_score.show(5)



+--------------------+------------------+
|           seller_id|    average rating|
+--------------------+------------------+
|da8622b14eb17ae28...|3.9793441825464995|
|d94a40fd42351c259...| 4.104613192424153|
|e70053bf73d1b5863...| 4.071315372424722|
|0ea22c1cfbdc755f8...| 4.231254932912392|
|1fbe10c70e30765ed...|1.2540045766590389|
+--------------------+------------------+
only showing top 5 rows



                                                                                

In [21]:
# top sold products
top_sold_products=full_orders_df.select('product_id').groupBy('product_id').count().orderBy(col('count').desc())
top_sold_products.show(5)



+--------------------+-----+
|          product_id|count|
+--------------------+-----+
|aca2eb7d00ea1a7b8...|86740|
|422879e10f4668299...|81110|
|99a4788cb24856965...|78775|
|389d119b48cf3043d...|60248|
|d1c427060a0f73f6b...|59274|
+--------------------+-----+
only showing top 5 rows



                                                                                

# Optimized join for Data Integration

# import make sure to restart ur kernal as we have done joining above too 

In [22]:
order_items_joined_df = orders_df.join(order_items_df,'order_id','inner')

In [23]:
order_items_product = order_items_joined_df.join(products_df,'product_id','inner')

In [24]:
order_items_product_sellers = order_items_product.join(broadcast(sellers_df),'seller_id','inner')

In [25]:
full_orders_df =  order_items_product_sellers.join(customers_df,'customer_id','inner')

In [26]:
full_orders_df =  full_orders_df.join(broadcast(geolocation_df),full_orders_df.customer_zip_code_prefix==geolocation_df.geolocation_zip_code_prefix,'left')

In [27]:
full_orders_df =  full_orders_df.join(broadcast(order_reviews_df),'order_id','left')

In [28]:
full_orders_df =  full_orders_df.join(order_payments_df,'order_id','left')

In [29]:
full_orders_df.cache()

25/08/27 20:25:48 WARN CacheManager: Asked to cache already cached data.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [30]:
# top 10 customers by spending 
full_orders_df.select('customer_id','price').groupBy('customer_id').agg(sum('price').alias('total_spending')).orderBy(col('total_spending').desc()).show(10)



+--------------------+------------------+
|         customer_id|    total_spending|
+--------------------+------------------+
|d3e82ccec3cb5f956...|         6662844.0|
|df55c14d1476a9a34...|         3565657.0|
|fe5113a38e3575c04...|         3293604.0|
|ec5b2ba62e5743423...|         2556120.0|
|63b964e79dee32a35...|         2501664.0|
|46bb3c0b1a65c8399...|         2336752.0|
|05455dfa7cd02f13d...| 2160194.400000087|
|3690e975641f01bd0...|         2124498.0|
|349509b216bd5ec11...|         1923627.0|
|695476b5848d64ba0...|1820543.1299999943|
+--------------------+------------------+
only showing top 10 rows



                                                                                

# Window Function And Ranking 

In [31]:
#rank top selling product per seller 

In [32]:
window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))


In [33]:
top_seller_products_df =  full_orders_df.select('seller_id','product_id','price').withColumn('rank',rank().over(window_spec)).distinct().show(5)

[Stage 71:>                                                         (0 + 1) / 1]

+--------------------+--------------------+-----+----+
|           seller_id|          product_id|price|rank|
+--------------------+--------------------+-----+----+
|001cca7ae9ae17fb1...|e251ebd2858be1aa7...|199.0|   1|
|001cca7ae9ae17fb1...|98a8c2fa16d7239c6...|169.0| 325|
|001cca7ae9ae17fb1...|e251ebd2858be1aa7...|139.9| 420|
|001cca7ae9ae17fb1...|6d15a14a5c04e3ef3...|139.9| 420|
|001cca7ae9ae17fb1...|e251ebd2858be1aa7...|139.0|1007|
+--------------------+--------------------+-----+----+
only showing top 5 rows



                                                                                

In [34]:
# dense rank on seller based on revenue
seller_dense_rank= full_orders_df.select('seller_id','price').withColumn('dense_rank',dense_rank().over(window_spec)).show(5)

[Stage 78:>                                                         (0 + 1) / 1]

+--------------------+-----+----------+
|           seller_id|price|dense_rank|
+--------------------+-----+----------+
|001cca7ae9ae17fb1...|199.0|         1|
|001cca7ae9ae17fb1...|199.0|         1|
|001cca7ae9ae17fb1...|199.0|         1|
|001cca7ae9ae17fb1...|199.0|         1|
|001cca7ae9ae17fb1...|199.0|         1|
+--------------------+-----+----------+
only showing top 5 rows



                                                                                

# Advance Aggregation and Enrichment 

In [35]:
# total revenue & Average order value (ADV) per Customer

In [36]:
customer_spending_df  = full_orders_df.groupBy('customer_id')\
.agg(
    count('order_id').alias('total_order'),
    sum('price').alias('total_sale'),
    round(avg('price'),2).alias('AOV')

).orderBy(desc('total_sale'))
customer_spending_df.show(5)



+--------------------+-----------+----------+------+
|         customer_id|total_order|total_sale|   AOV|
+--------------------+-----------+----------+------+
|d3e82ccec3cb5f956...|       6876| 6662844.0| 969.0|
|df55c14d1476a9a34...|        743| 3565657.0|4799.0|
|fe5113a38e3575c04...|       2292| 3293604.0|1437.0|
|ec5b2ba62e5743423...|       1428| 2556120.0|1790.0|
|63b964e79dee32a35...|       6072| 2501664.0| 412.0|
+--------------------+-----------+----------+------+
only showing top 5 rows



                                                                                

In [37]:
# seller performance matrix (revenue , average review , order Count,standard daviation/price_variabiliy)

In [38]:
seller_performance_matrix = full_orders_df.groupBy('seller_id')\
.agg(
    sum('price').alias('total_revenue'),
    round(avg('review_score'),2).alias('average_review'),
    count('seller_id').alias('order_count'),
    round(stddev('price'),2).alias('price_variability')
).orderBy(desc('total_revenue')).show()



+--------------------+--------------------+--------------+-----------+-----------------+
|           seller_id|       total_revenue|average_review|order_count|price_variability|
+--------------------+--------------------+--------------+-----------+-----------------+
|4869f7a5dfa277a7d...|3.6138717320000365E7|          4.09|     184587|           111.65|
|53243585a1d6dc264...|3.4291592950000025E7|          4.12|      54514|           499.65|
|4a3ca9315b744ce9f...| 3.375957084000193E7|          3.77|     330661|            59.37|
|7c67e1448b00f6e96...|3.2282321789998483E7|          3.42|     233306|            50.39|
|fa1c13f2614d7b5c4...| 3.013938631000039E7|          4.38|      87686|            307.7|
|da8622b14eb17ae28...|2.9857669730001643E7|          3.98|     264433|            72.92|
|7e93a43ef30c4f03f...|2.6315706299999908E7|          4.15|      50226|           377.24|
|1025f0e2d44d7041d...|2.2937518519999977E7|          3.89|     229587|             84.3|
|46dc3b2cc0980fb8e...

                                                                                

In [39]:
# product popularity matrics

In [40]:
product_popularity = full_orders_df.groupBy('product_id')\
.agg(
    count('order_id').alias('total_sales'),
    sum('price').alias('total_revenue'),
    round(avg('price'),22).alias('avg_price'),
        round(stddev('price'),2).alias('price_variability'),
    collect_set('seller_id').alias('unique_product')

).orderBy(desc('total_sales')).show()



+--------------------+-----------+------------------+------------------+-----------------+--------------------+
|          product_id|total_sales|     total_revenue|         avg_price|price_variability|      unique_product|
+--------------------+-----------+------------------+------------------+-----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740| 6164630.299999948| 71.07021328106926|             3.17|[955fee9216a65b61...|
|422879e10f4668299...|      81110| 4442791.509999978| 54.77489224509898|             4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775| 6921762.709999913| 87.86750504601603|             4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248|3280533.1299999706| 54.45049014075107|             4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274| 8220103.329999974|138.67974710665678|            16.58|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358| 3181698.899999965|54.520355392576256|             4.59|[1f50f920176fa

                                                                                

In [41]:
# monthly revenue and total order count trend ----->
# order purchase timestamp 

# total_orders 
# total_revenue
# avg_order_value 
# min_order_value
# max_order_value 

In [42]:
full_orders_df.groupBy(month('order_purchase_timestamp'))\
.agg(
count('order_id').alias('total_order'),
    round(sum('price'),2).alias('total_revenue'),
    round(avg('price'),2).alias('avg_order_value'),
    min('price').alias('min_order_value'),
    max('price').alias('max_order_value')
    
).orderBy(month('order_purchase_timestamp')).show()



+-------------------------------+-----------+--------------+---------------+---------------+---------------+
|month(order_purchase_timestamp)|total_order| total_revenue|avg_order_value|min_order_value|max_order_value|
+-------------------------------+-----------+--------------+---------------+---------------+---------------+
|                              1|    1495580| 1.715329015E8|         114.69|            2.9|         3690.0|
|                              2|    1551163|1.7878178407E8|         115.26|           2.99|         6735.0|
|                              3|    1809467|2.1868116843E8|         120.85|            4.9|        4099.99|
|                              4|    1693860|2.1715696913E8|          128.2|           0.85|         4799.0|
|                              5|    1918571|2.4006115197E8|         125.12|            3.5|         6499.0|
|                              6|    1701909|2.1024332349E8|         123.53|           3.49|         4590.0|
|                  

                                                                                

In [43]:
#customer retention analysis (first and last order)
full_orders_df.groupBy('customer_id')\
.agg(
    min("order_purchase_timestamp").alias("first_order"),
        max("order_purchase_timestamp").alias("last_order"),
    datediff(max('order_purchase_timestamp'),min('order_purchase_timestamp')).alias('retention_period'),
    count('order_id').alias('total_order'),
    round(avg('price'),2).alias('avg_price')
).orderBy(desc('retention_period')).show()




+--------------------+-------------------+-------------------+----------------+-----------+---------+
|         customer_id|        first_order|         last_order|retention_period|total_order|avg_price|
+--------------------+-------------------+-------------------+----------------+-----------+---------+
|5f896bc340a1f2bb5...|2017-09-18 12:57:19|2017-09-18 12:57:19|               0|        111|    87.85|
|95d92c3b47e281a45...|2017-04-12 21:21:54|2017-04-12 21:21:54|               0|        115|     49.9|
|a74fe4b2c5b909ee2...|2017-11-20 10:59:50|2017-11-20 10:59:50|               0|         97|    59.99|
|66603e8aa65d2bbfa...|2018-08-07 21:45:03|2018-08-07 21:45:03|               0|        181|     59.0|
|301bb7f7306732397...|2017-05-24 22:42:55|2017-05-24 22:42:55|               0|         63|     89.0|
|248707f9a94a18492...|2018-07-25 04:34:42|2018-07-25 04:34:42|               0|         19|     89.0|
|dc7633763aa34b15e...|2017-11-03 15:16:50|2017-11-03 15:16:50|               0|   

                                                                                

# Advance Enrichment

In [44]:
# order status flags 

In [45]:
full_orders_df.select('order_status').show(5)

+------------+
|order_status|
+------------+
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
+------------+
only showing top 5 rows



In [46]:
full_orders_df = full_orders_df.withColumn('is_delivered',when(col('order_status')=='delivered',lit(1)).otherwise(lit(0)))\
.withColumn('is_canceled',when(col('order_status')=='canceled',lit(1)).otherwise(lit(0)))

In [47]:
full_orders_df.where(full_orders_df.order_status == "canceled") \
    .select("order_status", "is_delivered", "is_canceled") \
    .show(5)

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
+------------+------------+-----------+
only showing top 5 rows



In [48]:
#order Revenue Calculation
full_orders_df = full_orders_df.withColumn('order_revenue',col('price')+col('freight_value'))
full_orders_df.select('price','freight_value','order_revenue').show()

+-----+-------------+------------------+
|price|freight_value|     order_revenue|
+-----+-------------+------------------+
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
|158.9|        18.99|177.89000000000001|
+-----+-------------+------------------+
only showing top

In [49]:
customer_spending_df.show(5)



+--------------------+-----------+----------+------+
|         customer_id|total_order|total_sale|   AOV|
+--------------------+-----------+----------+------+
|d3e82ccec3cb5f956...|       6876| 6662844.0| 969.0|
|df55c14d1476a9a34...|        743| 3565657.0|4799.0|
|fe5113a38e3575c04...|       2292| 3293604.0|1437.0|
|ec5b2ba62e5743423...|       1428| 2556120.0|1790.0|
|63b964e79dee32a35...|       6072| 2501664.0| 412.0|
+--------------------+-----------+----------+------+
only showing top 5 rows



                                                                                

In [50]:
# customer segmentation based on spending

customer_spending_df = customer_spending_df.withColumn('customer_segment',when(col('AOV')>=1200,"High-Value").when(col('AOV')>=500,"Medium-Value").otherwise("Low-Value"))

In [51]:
customer_spending_df.show(5)



+--------------------+-----------+----------+------+----------------+
|         customer_id|total_order|total_sale|   AOV|customer_segment|
+--------------------+-----------+----------+------+----------------+
|d3e82ccec3cb5f956...|       6876| 6662844.0| 969.0|    Medium-Value|
|df55c14d1476a9a34...|        743| 3565657.0|4799.0|      High-Value|
|fe5113a38e3575c04...|       2292| 3293604.0|1437.0|      High-Value|
|ec5b2ba62e5743423...|       1428| 2556120.0|1790.0|      High-Value|
|63b964e79dee32a35...|       6072| 2501664.0| 412.0|       Low-Value|
+--------------------+-----------+----------+------+----------------+
only showing top 5 rows



                                                                                

In [52]:
full_orders_df= full_orders_df.join(customer_spending_df.select('customer_id','customer_segment'),'customer_id','left')

In [53]:
full_orders_df.select('customer_id','customer_segment').show(5)

                                                                                

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|0264bbda6bd6492c7...|       Low-Value|
|0264bbda6bd6492c7...|       Low-Value|
|0264bbda6bd6492c7...|       Low-Value|
|0264bbda6bd6492c7...|       Low-Value|
|0264bbda6bd6492c7...|       Low-Value|
+--------------------+----------------+
only showing top 5 rows



In [54]:
full_orders_df.select('order_purchase_timestamp').show(5)

+------------------------+
|order_purchase_timestamp|
+------------------------+
|     2018-05-22 18:47:34|
|     2018-05-22 18:47:34|
|     2018-05-22 18:47:34|
|     2018-05-22 18:47:34|
|     2018-05-22 18:47:34|
+------------------------+
only showing top 5 rows



In [55]:
#hourly order distribution

In [56]:
full_orders_df.withColumn('hour_timestamp',hour(col('order_purchase_timestamp'))).select('order_purchase_timestamp','hour_timestamp','order_id').groupBy('hour_timestamp').count().orderBy(desc('count')).show(5)



+--------------+-------+
|hour_timestamp|  count|
+--------------+-------+
|            16|1261538|
|            14|1218318|
|            11|1202759|
|            15|1159603|
|            17|1152472|
+--------------+-------+
only showing top 5 rows



                                                                                

In [57]:
# weekday vs weekend order
full_orders_df.withColumn('order_type_day',when(dayofweek('order_purchase_timestamp').isin(1,7) , "Weekend"  ).otherwise("Weekday")).select('order_purchase_timestamp','order_type_day').show()


+------------------------+--------------+
|order_purchase_timestamp|order_type_day|
+------------------------+--------------+
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
|     2018-05-22 18:47:34|       Weekday|
+------------------------+--------

In [58]:
full_orders_df.withColumn('fright_catagory',when(col('freight_value')>=300,"High-Value").when(col('freight_value')>=200,"Medium-Value").otherwise("Low-Value")).select('freight_value','fright_catagory').show()

+-------------+---------------+
|freight_value|fright_catagory|
+-------------+---------------+
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
|        18.99|      Low-Value|
+-------------+---------------+
only showing top 20 rows



In [59]:
full_orders_df.agg(min('freight_value'),max('freight_value')).show()



+------------------+------------------+
|min(freight_value)|max(freight_value)|
+------------------+------------------+
|               0.0|            409.68|
+------------------+------------------+



                                                                                

In [60]:
#order volume by State 

In [61]:
full_orders_df.groupBy('geolocation_state').count().show()



+-----------------+-------+
|geolocation_state|  count|
+-----------------+-------+
|               TO|  22360|
|               RS| 971705|
|               PR| 746514|
|               MG|3433229|
|               MT| 155225|
|               SP|6742207|
|               AM|   6488|
|               MS|  73679|
|               PA|  96276|
|               PE| 132001|
|               MA|  61706|
|               PI|  27693|
|               RJ|3626836|
|               AL|  37741|
|               RN|  24818|
|               RO|  24526|
|             NULL|    317|
|               SC| 644944|
|               ES| 367211|
|               PB|  33379|
+-----------------+-------+
only showing top 20 rows



                                                                                

In [62]:
!hadoop fs -mkdir /tmp/olist_processed

mkdir: `/tmp/olist_processed': File exists


In [63]:
full_orders_df.write.mode('overwrite').parquet('/tmp/olist_processed')

                                                                                

In [65]:
!hadoop fs -ls /tmp/olist_processed/

Found 12 items
-rw-r--r--   2 root hadoop          0 2025-08-27 20:28 /tmp/olist_processed/_SUCCESS
-rw-r--r--   2 root hadoop   10007579 2025-08-27 20:27 /tmp/olist_processed/part-00000-59d041a6-ebd1-411c-bc21-b203436d348f-c000.snappy.parquet
-rw-r--r--   2 root hadoop    9562523 2025-08-27 20:27 /tmp/olist_processed/part-00001-59d041a6-ebd1-411c-bc21-b203436d348f-c000.snappy.parquet
-rw-r--r--   2 root hadoop   10040308 2025-08-27 20:27 /tmp/olist_processed/part-00002-59d041a6-ebd1-411c-bc21-b203436d348f-c000.snappy.parquet
-rw-r--r--   2 root hadoop    9806874 2025-08-27 20:27 /tmp/olist_processed/part-00003-59d041a6-ebd1-411c-bc21-b203436d348f-c000.snappy.parquet
-rw-r--r--   2 root hadoop   11980796 2025-08-27 20:27 /tmp/olist_processed/part-00004-59d041a6-ebd1-411c-bc21-b203436d348f-c000.snappy.parquet
-rw-r--r--   2 root hadoop   11679870 2025-08-27 20:27 /tmp/olist_processed/part-00005-59d041a6-ebd1-411c-bc21-b203436d348f-c000.snappy.parquet
-rw-r--r--   2 root hadoop   1104145