# Module 3 - Data Integration and Aggregation.ipynb

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
.appName('OlistData') \
.getOrCreate()

25/05/22 13:57:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# DEFINING PATH

hdfs_path = "/data/olist/"

# Data loading 

customers_df = spark.read.csv(hdfs_path + "olist_customers_dataset.csv", header = True, inferSchema = True)
geolocation_df = spark.read.csv(hdfs_path + "olist_geolocation_dataset.csv", header = True, inferSchema = True)
order_items_df = spark.read.csv(hdfs_path + "olist_order_items_dataset.csv", header = True, inferSchema = True)
payments_df = spark.read.csv(hdfs_path + "olist_order_payments_dataset.csv", header = True, inferSchema = True)
reviews_df = spark.read.csv(hdfs_path + "olist_order_reviews_dataset.csv", header = True, inferSchema = True)
orders_df = spark.read.csv(hdfs_path + "olist_orders_dataset.csv", header = True, inferSchema = True)
products_df = spark.read.csv(hdfs_path + "olist_products_dataset.csv", header = True, inferSchema = True)
sellers_df = spark.read.csv(hdfs_path + "olist_sellers_dataset.csv", header = True, inferSchema = True)
category_translation_df = spark.read.csv(hdfs_path + "product_category_name_translation.csv", header = True, inferSchema = True)

                                                                                

In [3]:
customers_df.cache()
order_items_df.cache()
orders_df.cache()

DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp]

In [4]:
orders_items_joined_df = orders_df.join(order_items_df,'order_id','inner')

orders_items_products_joined_df = orders_items_joined_df.join(products_df,'product_id','inner')

orders_items_products_sellers_joined_df = orders_items_products_joined_df.join(sellers_df,'seller_id','inner')

full_orders_df = orders_items_products_sellers_joined_df.join(customers_df,'customer_id','inner')

In [5]:
# Geoloction Data

full_orders_df = full_orders_df \
    .join(geolocation_df, customers_df.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix , 'left')

In [6]:
full_orders_df = full_orders_df.join(reviews_df, 'order_id', 'left')

In [7]:
full_orders_df = full_orders_df.join(payments_df, 'order_id', 'left')

In [8]:
# full_orders_df conatains all the significant data from all the datasets which can be used for the orders analysis
    
full_orders_df.cache()

25/05/22 13:57:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

# Total Revenues per Seller

In [9]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [10]:
from pyspark.sql.functions import *

In [11]:
seller_revenue_df = full_orders_df.groupBy("seller_id").agg(sum(col("price") + col("freight_value")).alias("total_revenue"))

seller_revenue_df.show()



+--------------------+--------------------+
|           seller_id|       total_revenue|
+--------------------+--------------------+
|b76dba6c951ab00dc...|   559016.8900000007|
|7a67c85e85bb2ce85...|       2.317876285E7|
|d2374cbcbb3ca4ab1...|   4949060.059999981|
|c8b0e2b0a7095e5d8...|  1768185.0000000005|
|994f04b3718c2bab3...|   814749.4000000005|
|88460e8ebdecbfecb...|   7219199.240000013|
|cca3071e3e9bb7d12...|1.2451246460000006E7|
|d13e50eaa47b4cbe9...|  1359037.8300000003|
|92eb0f42c21942b65...|  1574163.7700000051|
|f3b80352b986ab4d1...|  1735357.1700000013|
|02f623a8eb246f3c5...|  1075615.9400000013|
|7040e82f899a04d1b...|   2016086.109999996|
|e9779976487b77c6d...|   7744218.899999994|
|a1043bafd471dff53...|2.2805273999999993E7|
|ccc4bbb5f32a6ab2b...|1.0329871049999975E7|
|392e0502231ae2f8b...|   294928.9499999991|
|98dac6635aee4995d...|  1533625.1099999952|
|7f35f9daf223da737...|   63239.97000000006|
|d263fa444c1504a75...|  28420.479999999985|
|93dc87703c046b603...|  1466147.

                                                                                

# Total Orders per customer 

In [12]:
total_order_df = full_orders_df.groupBy('customer_id').agg(countDistinct('order_id').alias('total_orders'))

total_order_df.show()



+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|0721e1c4b91bc6ded...|           1|
|af5555169a360e0fd...|           1|
|31b63fdb116c62400...|           1|
|c79c8d356d92bb1b7...|           1|
|a78e85d7d80bdae62...|           1|
|9e67b6474f7e8370b...|           1|
|2b034f3bb93bc729a...|           1|
|d7eea84fcaf846ca5...|           1|
|9801ac60f0291d180...|           1|
|f0f97266247be0432...|           1|
|2ef8398f62042a2b4...|           1|
|3f0f45bd49f790854...|           1|
|2a8da2a879305eff0...|           1|
|30f523f7def36192e...|           1|
|81ac2682a2b8aef41...|           1|
|c3fca3a8fb26d6303...|           1|
|45ce7fad590507dc8...|           1|
|0e63522224dc7d760...|           1|
|b3240741f338744f0...|           1|
|f410948f43b18cb65...|           1|
+--------------------+------------+
only showing top 20 rows



                                                                                

# Average Review Score 

In [13]:
# First converting review_score to 'INT'

avg_review_score = full_orders_df.withColumn("review_score_int", col("review_score").cast("int")) \
    .groupBy("seller_id") \
    .agg(avg("review_score_int").alias("avg_review_score"))

avg_review_score.show()



+--------------------+------------------+
|           seller_id|  avg_review_score|
+--------------------+------------------+
|b76dba6c951ab00dc...| 4.219273172723561|
|7a67c85e85bb2ce85...| 4.258920734844587|
|d2374cbcbb3ca4ab1...|  3.71899382437114|
|c8b0e2b0a7095e5d8...|3.7818383167220375|
|994f04b3718c2bab3...| 4.019792907541464|
|88460e8ebdecbfecb...|3.2727391849647143|
|cca3071e3e9bb7d12...|3.8663984994689384|
|d13e50eaa47b4cbe9...| 4.856344424067767|
|92eb0f42c21942b65...|  3.67118597644481|
|f3b80352b986ab4d1...|4.1966389194283025|
|02f623a8eb246f3c5...|4.6650602409638555|
|7040e82f899a04d1b...| 3.532726953673374|
|e9779976487b77c6d...|4.2127329046350255|
|a1043bafd471dff53...| 4.245974167893515|
|ccc4bbb5f32a6ab2b...| 4.396169885420463|
|392e0502231ae2f8b...| 4.410434782608696|
|98dac6635aee4995d...|3.8582470064370256|
|7f35f9daf223da737...| 4.352486187845304|
|d263fa444c1504a75...|               5.0|
|93dc87703c046b603...| 3.927276161692482|
+--------------------+------------

                                                                                

# Most Sold Products (Top 10)

In [14]:
# To find the most sold products by number of units

most_sold_product = full_orders_df.groupBy('product_id') \
    .agg(count('*').alias('total_items_sold')) \
    .orderBy(desc('total_items_sold')).limit(10)

most_sold_product.show()



+--------------------+----------------+
|          product_id|total_items_sold|
+--------------------+----------------+
|aca2eb7d00ea1a7b8...|           86740|
|422879e10f4668299...|           81110|
|99a4788cb24856965...|           78775|
|389d119b48cf3043d...|           60248|
|d1c427060a0f73f6b...|           59274|
|368c6c730842d7801...|           58358|
|53759a2ecddad2bb8...|           52654|
|53b36df67ebb7c415...|           52105|
|154e7e31ebfa09220...|           42700|
|3dd2a17168ec895c7...|           40787|
+--------------------+----------------+



                                                                                

# Top Customers by Spending

In [15]:
# There are 2 'customer_id' & 'customer_unique_id'
# I'm calculating on 'customer_id'

top_customers_by_spending = full_orders_df.groupBy("customer_id") \
    .agg(sum("payment_value").alias("total_spent")) \
    .orderBy(desc("total_spent")).limit(10)

top_customers_by_spending.show()



+--------------------+--------------------+
|         customer_id|         total_spent|
+--------------------+--------------------+
|1ff773612ab8934db...| 1.756825199999893E7|
|05455dfa7cd02f13d...|1.3282083359999327E7|
|ec5b2ba62e5743423...|1.0388528640000112E7|
|0c792d32a3251b4f6...|   8254681.600000529|
|78fc46047c4a639e8...|   7488519.999999339|
|1617b1357756262bf...|   7433259.520000033|
|1dbc055ccab23ed89...|   7216273.400000708|
|d5f2b3f597c7ccafb...|   6800018.119998923|
|dd3f1762eb601f41c...|  6746388.4800006235|
|10de381f8a8d23fff...|   5184499.500000076|
+--------------------+--------------------+



                                                                                

In [16]:
# spark.stop()

# Optimized Joins for Data Integration

In [17]:
orders_items_joined_df = orders_df.join(order_items_df,'order_id','inner')

orders_items_products_joined_df = orders_items_joined_df.join(products_df,'product_id','inner')

In [18]:
from pyspark.sql.functions import *

# Broadcast Join

orders_items_products_sellers_joined_df = orders_items_products_joined_df.join(broadcast(sellers_df),'seller_id','inner')

In [19]:
full_orders_df = orders_items_products_sellers_joined_df.join(customers_df,'customer_id','inner')

In [20]:
# Geoloction Data

full_orders_df = full_orders_df \
    .join(broadcast(geolocation_df), customers_df.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix , 'left')

In [21]:
full_orders_df = full_orders_df.join(broadcast(reviews_df), 'order_id', 'left')

In [22]:
full_orders_df = full_orders_df.join(payments_df, 'order_id', 'left')

In [23]:
full_orders_df.cache()

25/05/22 14:00:39 WARN CacheManager: Asked to cache already cached data.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

# Aggregation

In [24]:
# Total Orders per customers

customer_order_count_df = full_orders_df.groupBy('customer_id') \
    .agg(count('order_id').alias('total_orders')) \
    .orderBy(desc('total_orders'))

customer_order_count_df.show()



+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|351e40989da90e704...|       11427|
|50920f8cd0681fd86...|       10752|
|9b43e2a62de9bab3a...|        8556|
|270c23a11d024a44c...|        8001|
|5c87184371002d49e...|        6876|
|d3e82ccec3cb5f956...|        6876|
|d5f2b3f597c7ccafb...|        6706|
|c2f18647725395af4...|        6612|
|24e7dc2ff8c071263...|        6597|
|7bb57d182bdc11653...|        6258|
|63b964e79dee32a35...|        6072|
|d22f25a9fadfb1abb...|        6072|
|1ff773612ab8934db...|        5820|
|13aa59158da63ba0e...|        5206|
|78fc46047c4a639e8...|        5200|
|dd3f1762eb601f41c...|        4992|
|a193aa8d905b8e246...|        4896|
|9eb3d566e87289dcb...|        4872|
|2ba91e12e5e4c9f56...|        4752|
|55e7cfd6e28d2fbfb...|        4728|
+--------------------+------------+
only showing top 20 rows



                                                                                

In [25]:
#Average Review Score Per Seller

seller_review_df = full_orders_df.groupBy('seller_id')\
.agg(avg('review_score').alias('avg_review_score'))\
.orderBy(desc('avg_review_score'))

seller_review_df.show()



+--------------------+----------------+
|           seller_id|avg_review_score|
+--------------------+----------------+
|a20d8058c866dbaec...|             5.0|
|297d5eccd19fa9a83...|             5.0|
|3296662b1331dea51...|             5.0|
|570d4583587a5fe2d...|             5.0|
|918717417d88a9f9a...|             5.0|
|aa8af66c623d7d544...|             5.0|
|98dddbc4601dd4443...|             5.0|
|8b181ee5518df84f1...|             5.0|
|ea846a0e7ad98a741...|             5.0|
|26b482dccfa29bd2e...|             5.0|
|09bad886111255c5b...|             5.0|
|0c7f30ae9b147eca0...|             5.0|
|a08692680c77d30a0...|             5.0|
|33ab10be054370c25...|             5.0|
|0ade5cc4a305ed709...|             5.0|
|63272377184e96994...|             5.0|
|398cb257329ef7af7...|             5.0|
|5e106d93b717c2682...|             5.0|
|9c1c0c36cd23c2089...|             5.0|
|0ad80de75c8113263...|             5.0|
+--------------------+----------------+
only showing top 20 rows



                                                                                

In [26]:
# Top 10 Most Sold Products

top_products_df = full_orders_df.groupBy('product_id')\
.agg(count('order_id').alias('total_sold'))\
.orderBy(desc('total_sold'))\
.limit(10)

top_products_df.show()



+--------------------+----------+
|          product_id|total_sold|
+--------------------+----------+
|aca2eb7d00ea1a7b8...|     86740|
|422879e10f4668299...|     81110|
|99a4788cb24856965...|     78775|
|389d119b48cf3043d...|     60248|
|d1c427060a0f73f6b...|     59274|
|368c6c730842d7801...|     58358|
|53759a2ecddad2bb8...|     52654|
|53b36df67ebb7c415...|     52105|
|154e7e31ebfa09220...|     42700|
|3dd2a17168ec895c7...|     40787|
+--------------------+----------+



                                                                                

# Window Function and Ranking

In [27]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [28]:
# Defining Window Spec
# dividing the dataset into groups by seller_id. Inside each seller group, ordering the rows by price in descending order.

window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))

In [29]:
# Rank Top Selling Products Per seller

top_seller_products_df = full_orders_df.withColumn('rank', rank().over(window_spec)) \
    .filter(col('rank')<=5)

top_seller_products_df.select('seller_id','price','rank').show()

[Stage 99:>                                                         (0 + 1) / 1]

+--------------------+-----+----+
|           seller_id|price|rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 20 rows



                                                                                

In [30]:
# Dense Rank for Sellers Based on Revenue

seller_revenue = full_orders_df.groupBy('seller_id').agg(sum('price').alias('total_revenue'))

window_spec_2 = Window.partitionBy('seller_id').orderBy(desc('total_revenue'))

revenue_based_sellers = seller_revenue.withColumn('dense_rank', dense_rank().over(window_spec_2))

revenue_based_sellers.select('seller_id', 'total_revenue', 'dense_rank').show()



+--------------------+------------------+----------+
|           seller_id|     total_revenue|dense_rank|
+--------------------+------------------+----------+
|0015a82c2db000af6...|          755380.0|         1|
|001cca7ae9ae17fb1...|3888092.4200000023|         1|
|002100f778ceb8431...|181886.90000000002|         1|
|00ab3eff1b5192e5f...|            6076.0|         1|
|01c97ebb5cdac5289...|  66335.5000000005|         1|
|01cf7e3d21494c41f...| 1533269.369999997|         1|
|01fdefa7697d26ad9...| 2598357.990000003|         1|
|02b72cdeb9cfcc429...|115935.79999999997|         1|
|02ecc2a19303f05e5...| 973622.7600000001|         1|
|02f623a8eb246f3c5...|          957411.0|         1|
|0307f7565ff85b299...| 487547.9999999999|         1|
|0336182e1b3e92f02...|  953.599999999999|         1|
|038b75b729c8a9a04...|           17979.5|         1|
|039e6ad9dae796144...|         1297420.0|         1|
|0417b067eeab773d2...| 333797.6400000009|         1|
|042573af89b6d931f...| 104089.5200000003|     

                                                                                

# Advance Aggregation and Enrichment

In [31]:
full_orders_df.show(5)

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+-----------+------------+--------------------+------------------------+-------------+--------------+---------------------------+-------------------+-------------------+----------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+
|            order_id|         customer_id|           seller_id|          product_id|order_status|order_purchase_timestamp|  order_approved

In [32]:
# Total Revenue & Average Order Value (AOV) per Customer

customer_spending_df = full_orders_df.groupBy('customer_id') \
    .agg(
        count('order_id').alias('total_orders'),
        sum('price').alias('total_spent'),
        round(avg('price'),2).alias('AOV')
    )\
    .orderBy(desc('total_spent'))

In [33]:
customer_spending_df.show()



+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

                                                                                

In [34]:
# Seller Performance Metrics ( Revenue, Average Review, Order Count)

seller_performance_df = full_orders_df.groupBy('seller_id') \
    .agg(
        count('order_id').alias('total_orders'),
        sum('price').alias('total_revenue'),
        round(avg('review_score'),2).alias('avg_review'),
        round(stddev('price'),2).alias('price_variability')
    ).orderBy(desc('total_revenue'))

In [35]:
seller_performance_df.show()



+--------------------+------------+--------------------+----------+-----------------+
|           seller_id|total_orders|       total_revenue|avg_review|price_variability|
+--------------------+------------+--------------------+----------+-----------------+
|4869f7a5dfa277a7d...|      184587| 3.613871731999997E7|      4.09|           111.65|
|53243585a1d6dc264...|       54514|3.4291592949999996E7|      4.12|           499.65|
|4a3ca9315b744ce9f...|      330661| 3.375957084000014E7|      3.77|            59.37|
|7c67e1448b00f6e96...|      233306|3.2282321789999764E7|      3.42|            50.39|
|fa1c13f2614d7b5c4...|       87686|3.0139386309999976E7|      4.38|            307.7|
|da8622b14eb17ae28...|      264433| 2.985766973000004E7|      3.98|            72.92|
|7e93a43ef30c4f03f...|       50226| 2.631570629999995E7|      4.15|           377.24|
|1025f0e2d44d7041d...|      229587|2.2937518519999966E7|      3.89|             84.3|
|46dc3b2cc0980fb8e...|       90426|2.1791773289999947E

                                                                                

In [36]:
# Product Popularity Metrics

product_metrics_df = full_orders_df.groupBy('product_id') \
    .agg(
        count('order_id').alias('total_sales'),
        sum('price').alias('total_revenue'),
        round(avg('price'),2).alias('avg_price'),
        round(stddev('price'),2).alias('price_volatility'),
        collect_set('seller_id').alias('unique_sellers')
    ).orderBy(desc('total_sales'))

In [37]:
product_metrics_df.show()



+--------------------+-----------+------------------+---------+----------------+--------------------+
|          product_id|total_sales|     total_revenue|avg_price|price_volatility|      unique_sellers|
+--------------------+-----------+------------------+---------+----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740| 6164630.300000016|    71.07|            3.17|[955fee9216a65b61...|
|422879e10f4668299...|      81110| 4442791.510000012|    54.77|            4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775| 6921762.710000023|    87.87|            4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248|3280533.1300000106|    54.45|            4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274| 8220103.329999989|   138.68|           16.58|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358| 3181698.899999993|    54.52|            4.59|[1f50f920176fa81d...|
|53759a2ecddad2bb8...|      52654| 2893017.499999997|    54.94|            4.52|[1

                                                                                

In [38]:
# Monthly Revenue and Order Count Trend ----> HW

"""
order_purchase_timestamp ---> month

total_orders 
total_revenue
avg_order_value
min_order_value
max_order_value
"""

monthly_trend_df = full_orders_df.withColumn('order_month',date_format('order_purchase_timestamp','yyyy-MM')) \
    .groupBy('order_month') \
    .agg(
        countDistinct('order_id').alias('total_orders'),
        sum('payment_value').alias('total_revenue'),
        round(avg('payment_value'),2).alias('avg_order_value'),
        min('payment_value').alias('min_order_value'),
        max('payment_value').alias('max_order_value')
    ).orderBy(desc('order_month'))


In [39]:
monthly_trend_df.show()



+-----------+------------+--------------------+---------------+---------------+---------------+
|order_month|total_orders|       total_revenue|avg_order_value|min_order_value|max_order_value|
+-----------+------------+--------------------+---------------+---------------+---------------+
|    2018-09|           1|             5493.18|         166.46|         166.46|         166.46|
|    2018-08|        6452|      1.8581802659E8|         166.96|           0.31|        4513.32|
|    2018-07|        6273|2.1120745612000066E8|         194.34|           0.01|        7274.88|
|    2018-06|        6160|2.0063959189000025E8|          178.6|           0.05|        4681.78|
|    2018-05|        6853| 2.304250969800002E8|         186.27|           0.03|         4445.5|
|    2018-04|        6934| 2.376497203399989E8|         187.75|           0.01|        3526.46|
|    2018-03|        7188| 2.213195058600001E8|         168.15|           0.14|        4175.26|
|    2018-02|        6694|1.996234613899

                                                                                

In [40]:
# Customer Retention Analysis ( First & Last Order )

# Step 1: Base aggregation
customer_retention_df = full_orders_df.groupBy('customer_id') \
    .agg(
        min('order_purchase_timestamp').alias('first_order_date'),
        max('order_purchase_timestamp').alias('last_order_date'),
        count('order_id').alias('total_orders'),
        round(avg('payment_value'),2).alias('aov')
    ).orderBy(desc("total_orders"))

# .withColumn("retention_days", datediff("last_order_date", "first_order_date")) 

In [41]:
customer_retention_df.show()



+--------------------+-------------------+-------------------+------------+-------+
|         customer_id|   first_order_date|    last_order_date|total_orders|    aov|
+--------------------+-------------------+-------------------+------------+-------+
|351e40989da90e704...|2017-07-13 10:42:37|2017-07-13 10:42:37|       11427|   7.86|
|50920f8cd0681fd86...|2018-01-27 11:28:32|2018-01-27 11:28:32|       10752| 140.56|
|9b43e2a62de9bab3a...|2017-05-25 22:27:50|2017-05-25 22:27:50|        8556|  13.83|
|270c23a11d024a44c...|2017-08-08 20:26:31|2017-08-08 20:26:31|        8001|   7.68|
|5c87184371002d49e...|2018-01-05 19:15:37|2018-01-05 19:15:37|        6876| 165.54|
|d3e82ccec3cb5f956...|2017-03-18 14:28:34|2017-03-18 14:28:34|        6876| 180.88|
|d5f2b3f597c7ccafb...|2017-12-13 14:21:15|2017-12-13 14:21:15|        6706|1014.02|
|c2f18647725395af4...|2018-03-06 19:21:47|2018-03-06 19:21:47|        6612| 163.31|
|24e7dc2ff8c071263...|2017-11-24 16:16:45|2017-11-24 16:16:45|        6597| 

                                                                                

# Extended Enrichment 

In [42]:
# Order Status Flags

full_orders_df = full_orders_df.withColumn('is_delivered', when(col('order_status')=='delivered', lit(1)).otherwise(lit(0))) \
    .withColumn('is_canceled', when(col('order_status')=='canceled', lit(1)).otherwise(lit(0)))

In [43]:
full_orders_df.select('order_status','is_delivered','is_canceled').where(full_orders_df['order_status']=='canceled').show()

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
+------------+------------+-----------+
only showing top 20 rows



In [44]:
# Order Revenue Calcualtion

full_orders_df = full_orders_df.withColumn('order_revenue', col('price')+col('freight_value'))

full_orders_df.select('price', 'freight_value', 'order_revenue').show()

+-----+-------------+------------------+
|price|freight_value|     order_revenue|
+-----+-------------+------------------+
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
|28.99|         7.46|36.449999999999996|
+-----+-------------+------------------+
only showing top

In [45]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [46]:
customer_spending_df.show() 
customer_spending_df.printSchema()



+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

                                                                                

In [47]:
# Customer Segmentation based on spending

customer_spending_df = customer_spending_df.withColumn('customer_segment', 
                                                       when(col('AOV')>=1200,'High Value')
                                                       .when((col('AOV')<1200) & (col('AOV')>=500), 'Medium Value').otherwise('Low Value')) 

In [48]:
customer_spending_df.show()



+--------------------+------------+------------------+-------+----------------+
|         customer_id|total_orders|       total_spent|    AOV|customer_segment|
+--------------------+------------+------------------+-------+----------------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|    Medium Value|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|      High Value|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|      High Value|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|      High Value|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|       Low Value|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|      High Value|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|    Medium Value|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|      High Value|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|      High Value|
|695476b5848d64ba0...|         687|18205

                                                                                

In [49]:
full_order_df = full_orders_df.join(customer_spending_df.select('customer_id','customer_segment'), 'customer_id',how='left')

In [50]:
full_order_df.select('customer_id', 'customer_segment').show()

                                                                                

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
|dbef5eb24f60585a8...|       Low Value|
+--------------------+----------------+
only showing top 20 rows



In [51]:
# Hourly Order Distribution
full_order_df = full_order_df.withColumn('hour_of_day', expr('hour(order_purchase_timestamp)'))

full_order_df.select('customer_id','order_purchase_timestamp','hour_of_day').show()

+--------------------+------------------------+-----------+
|         customer_id|order_purchase_timestamp|hour_of_day|
+--------------------+------------------------+-----------+
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23:19:16|         23|
|88b0bac2d79ffc975...|     2018-08-07 23

In [52]:
# Weekday VS Weekend

full_order_df = full_order_df.withColumn('order_day_type', 
                                         when(expr('dayofweek(order_purchase_timestamp) IN (1.7)'), lit('Weekend'))
                                         .otherwise(lit('Weekday')))

In [53]:
# Weekday vs Weekend Order

# full_orders_df = full_orders_df.withColumn('order_day_type',\
#                                            when(dayofweek('order_purchase_timestamp').isin(1,7),lit('Weekend')).otherwise(lit('weekday')))

In [54]:
full_order_df.select('customer_id','order_id','order_purchase_timestamp','order_day_type').show()
#.where(full_order_df['order_day_type']=='Weekend').show()

+--------------------+--------------------+------------------------+--------------+
|         customer_id|            order_id|order_purchase_timestamp|order_day_type|
+--------------------+--------------------+------------------------+--------------+
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       Weekday|
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       Weekday|
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       Weekday|
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       Weekday|
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       Weekday|
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       Weekday|
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       Weekday|
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       Weekday|
|88b0bac2d79ffc975...|5128bbf180434cd93...|     2018-08-07 23:19:16|       W

In [55]:
# a new column frieght category based on freight_value --> low, med or high

full_orders_df = full_orders_df.withColumn(
    "freight_category",
    when(full_orders_df.freight_value < 20, "Low")
    .when((full_orders_df.freight_value >= 20) & (full_orders_df.freight_value < 100), "Medium")
    .otherwise("High")
)

In [56]:
full_orders_df.select('freight_value','freight_category').show()

+-------------+----------------+
|freight_value|freight_category|
+-------------+----------------+
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
|         7.46|             Low|
+-------------+----------------+
only showing top 20 rows



In [57]:
# Order Volume by Customer State

order_volume_by_state = full_orders_df.select("order_id", "customer_state") \
    .dropDuplicates(["order_id"]) \
    .groupBy("customer_state") \
    .agg(countDistinct("order_id").alias("total_orders")) \
    .orderBy("total_orders", ascending=False)

order_volume_by_state.show()



+--------------+------------+
|customer_state|total_orders|
+--------------+------------+
|            SP|       41375|
|            RJ|       12762|
|            MG|       11544|
|            RS|        5432|
|            PR|        4998|
|            SC|        3612|
|            BA|        3358|
|            DF|        2125|
|            ES|        2025|
|            GO|        2007|
|            PE|        1648|
|            CE|        1327|
|            PA|         970|
|            MT|         903|
|            MA|         740|
|            MS|         709|
|            PB|         532|
|            PI|         493|
|            RN|         482|
|            AL|         411|
+--------------+------------+
only showing top 20 rows



                                                                                

In [58]:
!hadoop fs -ls /data/olist_proc/

Found 3 items
drwxr-xr-x   - root hadoop          0 2025-05-14 09:14 /data/olist_proc/cleaned_data.parquet
drwxr-xr-x   - root hadoop          0 2025-05-22 13:55 /data/olist_proc/full_orders_df_3.parquet
drwxr-xr-x   - root hadoop          0 2025-05-14 09:14 /data/olist_proc/product_df_cleaned.parquet


In [59]:
full_orders_df.write.mode('overwrite').parquet('/data/olist_proc/full_orders_df_3.parquet')

                                                                                

In [60]:
!hadoop fs -ls -h /data/olist_proc/

Found 3 items
drwxr-xr-x   - root hadoop          0 2025-05-14 09:14 /data/olist_proc/cleaned_data.parquet
drwxr-xr-x   - root hadoop          0 2025-05-22 14:05 /data/olist_proc/full_orders_df_3.parquet
drwxr-xr-x   - root hadoop          0 2025-05-14 09:14 /data/olist_proc/product_df_cleaned.parquet


In [61]:
spark.stop()