In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('OlistData') \
    .getOrCreate()

25/10/18 04:47:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
hdfs_path = '/data/olist/'

In [5]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv',header=True,inferSchema=True)
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv',header=True,inferSchema=True)
order_item_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv',header=True,inferSchema=True)
payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv',header=True,inferSchema=True)
reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv',header=True,inferSchema=True)
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv',header=True,inferSchema=True)
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv',header=True,inferSchema=True)
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv',header=True,inferSchema=True)
category_translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv',header=True,inferSchema=True)

                                                                                

In [6]:
## caching Frequency used data for better performance

orders_df.cache()
customers_df.cache()
order_item_df.cache()


DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

# 1. Joins for Data Integration

In [7]:
orders_items_joined_df = orders_df.join(order_item_df,'order_id','inner')

In [8]:
orders_items_joined_df.show()

                                                                                

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|3ce436f183e68e078...|   delivered|     2017-09-13 08:59:02|2017-09-13 09:45:35|         2017-09-19 18:34:16|      

In [9]:
order_items_products_df = orders_items_joined_df.join(products_df,'product_id','inner')

In [10]:
orders_items_products_sellers_df = order_items_products_df.join(sellers_df,'seller_id','inner')

In [11]:
full_orders_df = orders_items_products_sellers_df.join(customers_df,'customer_id','inner')

In [12]:
full_orders_df = full_orders_df.join(geolocation_df,full_orders_df.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix,'left')

In [13]:
full_orders_df = full_orders_df.join(reviews_df,'order_id','left')

In [14]:
full_orders_df = full_orders_df.join(payments_df,'order_id','left')

In [15]:
full_orders_df.cache()

25/10/18 04:47:31 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

# 2. Aggregation

In [16]:
from pyspark.sql.functions import *

In [17]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [18]:
## Total Revenue Per Seller

seller_revenue_df = full_orders_df.groupBy('seller_id').agg(sum('price'))

In [19]:
seller_revenue_df.show(5)



+--------------------+--------------------+
|           seller_id|          sum(price)|
+--------------------+--------------------+
|3d5d0dc7073a299e3...|            170639.6|
|2138ccb85b11a4ec1...|  1943866.0699999991|
|33ac3e28642ab8bda...|   615628.8499999986|
|cc419e0650a3c5ba7...|1.4751464500000073E7|
|b76dba6c951ab00dc...|   302582.6599999989|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [20]:
## Total Order Per Customer

order_per_customer = full_orders_df.groupBy(col('customer_unique_id')).agg(countDistinct(col('order_id')).alias('total_orders'))

sorted_order_per_customer = order_per_customer.sort(desc('total_orders'))

sorted_order_per_customer.show(5)



+--------------------+------------+
|  customer_unique_id|total_orders|
+--------------------+------------+
|8d50f5eadf50201cc...|          16|
|3e43e6105506432c9...|           9|
|6469f99c1f9dfae77...|           7|
|ca77025e7201e3b30...|           7|
|1b6c7548a2a1f9037...|           7|
+--------------------+------------+
only showing top 5 rows



                                                                                

In [21]:
## Average Review score per Seller

average_review_score = full_orders_df.groupBy('seller_id').agg(avg('review_score'))

average_review_score.show(5)



+--------------------+------------------+
|           seller_id| avg(review_score)|
+--------------------+------------------+
|d650b663c3b5f6fb3...| 4.276244626265428|
|cd06602b43d8800bd...|3.7134436635683516|
|3c487ae8f8d7542be...| 4.276648299448984|
|d354c38a7182125a7...| 4.266468180126535|
|e9779976487b77c6d...|4.2127329046350255|
+--------------------+------------------+
only showing top 5 rows



                                                                                

In [22]:
## Most Sold Products(Top 10)

sold_products = full_orders_df.groupBy('product_id').agg(count('product_id').alias('Total sales'))

most_sold_products = sold_products.sort(desc('Total Sales'))

most_sold_products.show(10)



+--------------------+-----------+
|          product_id|Total sales|
+--------------------+-----------+
|aca2eb7d00ea1a7b8...|      86740|
|422879e10f4668299...|      81110|
|99a4788cb24856965...|      78775|
|389d119b48cf3043d...|      60248|
|d1c427060a0f73f6b...|      59274|
|368c6c730842d7801...|      58358|
|53759a2ecddad2bb8...|      52654|
|53b36df67ebb7c415...|      52105|
|154e7e31ebfa09220...|      42700|
|3dd2a17168ec895c7...|      40787|
+--------------------+-----------+
only showing top 10 rows



                                                                                

In [23]:
## Top Customers by spending

spend_customer = full_orders_df.groupBy('customer_unique_id').agg(sum('price').alias('Top Buyer'))

top_customers = spend_customer.sort(desc('Top Buyer'))
top_customers.show(10)



+--------------------+------------------+
|  customer_unique_id|         Top Buyer|
+--------------------+------------------+
|fd7069cf6891edbf3...|         6662844.0|
|da122df9eeddfedc1...|         5489284.0|
|eb53ee675b0c79d66...|         3293604.0|
|763c8b1c9c68a0229...|         2556120.0|
|85963fd37bfd387aa...|         2501664.0|
|7a96eb0a685f5c19b...|         2336752.0|
|4007669dec559734d...| 2160194.400000087|
|397b44d5bb99eabf5...|         2143390.0|
|60b2ec19b8c18082c...|         2124498.0|
|13d9920a161273515...|1820543.1299999943|
+--------------------+------------------+
only showing top 10 rows



                                                                                

# 3. Window Function and Ranking

In [24]:
from pyspark.sql.window import Window

In [25]:
# Dense rank for Sellers based on revenue

window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))

In [26]:
## ranking Top Selling Products per seller

top_seller_products_df = full_orders_df.withColumn('rank',rank().over(window_spec)).filter(col('rank')<=5)

top_seller_products_df.select('seller_id','price','rank').show()

[Stage 80:>                                                         (0 + 1) / 1]

+--------------------+-----+----+
|           seller_id|price|rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 20 rows



                                                                                

In [27]:
## Dense Rank For Sellers Based on revenue

seller_revenue_df = full_orders_df.groupBy(col('seller_id')).agg(sum(col('payment_value')).alias('seller_revenue'))

windows_spec = Window.orderBy(desc('seller_revenue'))

final_seller_ranking_df = seller_revenue_df.withColumn('seller_dense_rank',dense_rank().over(windows_spec))

final_seller_ranking_df.sort(col('seller_dense_rank')).show(10)
                                                                

25/10/18 04:50:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/18 04:50:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/18 04:50:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/18 04:50:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/18 04:50:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/18 04:51:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/18 0

+--------------------+--------------------+-----------------+
|           seller_id|      seller_revenue|seller_dense_rank|
+--------------------+--------------------+-----------------+
|7c67e1448b00f6e96...| 8.568359756000137E7|                1|
|da8622b14eb17ae28...| 5.381143433000069E7|                2|
|1025f0e2d44d7041d...| 5.043467467999989E7|                3|
|4a3ca9315b744ce9f...|4.8727295520000204E7|                4|
|1f50f920176fa81da...|4.4570278139998965E7|                5|
|4869f7a5dfa277a7d...|4.0076877290000096E7|                6|
|53243585a1d6dc264...| 3.734442017000002E7|                7|
|955fee9216a65b617...| 3.468395314000033E7|                8|
|fa1c13f2614d7b5c4...|3.0784219589999985E7|                9|
|7e93a43ef30c4f03f...|       2.682890093E7|               10|
+--------------------+--------------------+-----------------+
only showing top 10 rows



                                                                                

# 4. Advance aggregation and Enrichment

In [28]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [29]:
## Total revenue & Average Order Value(AOV) per Customer

customer_spending_df = full_orders_df.groupBy('customer_id').agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('total_spent'),
    round(avg('price'),2).alias('AOV')
       )\
.orderBy(desc('total_spent'))
    
customer_spending_df.show()





+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

                                                                                

In [30]:
## Seller performance Metrices (Revenue, average Review, order Count)

seller_performance_df = full_orders_df.groupBy('seller_id').agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('total_revenue'),
    round(avg('review_score'),2).alias('avg_review_score'),
    round(stddev('price'),2).alias('price_variability')
)\
.orderBy(desc('total_revenue'))

seller_performance_df.show()



+--------------------+------------+--------------------+----------------+-----------------+
|           seller_id|total_orders|       total_revenue|avg_review_score|price_variability|
+--------------------+------------+--------------------+----------------+-----------------+
|4869f7a5dfa277a7d...|      184587| 3.613871732000001E7|            4.09|           111.65|
|53243585a1d6dc264...|       54514| 3.429159295000001E7|            4.12|           499.65|
|4a3ca9315b744ce9f...|      330661| 3.375957084000011E7|            3.77|            59.37|
|7c67e1448b00f6e96...|      233306|3.2282321789999805E7|            3.42|            50.39|
|fa1c13f2614d7b5c4...|       87686|3.0139386309999976E7|            4.38|            307.7|
|da8622b14eb17ae28...|      264433|2.9857669730000027E7|            3.98|            72.92|
|7e93a43ef30c4f03f...|       50226| 2.631570629999997E7|            4.15|           377.24|
|1025f0e2d44d7041d...|      229587|2.2937518519999973E7|            3.89|       

                                                                                

In [31]:
## Product Popularity Metrices

product_metrices_df = full_orders_df.groupBy('product_id').agg(
    count('order_id').alias('total_sales'),
    sum('price').alias('total_revenue'),
    round(avg('review_score'),2).alias('review_score'),
    round(avg('price'),2).alias('avg_price'),
    round(stddev('price'),2).alias('price_volatility'),\
    collect_set('seller_id').alias('unique_sellers')
)\
.orderBy(desc('total_sales'))

product_metrices_df.show()

                                                                                

+--------------------+-----------+------------------+------------+---------+----------------+--------------------+
|          product_id|total_sales|     total_revenue|review_score|avg_price|price_volatility|      unique_sellers|
+--------------------+-----------+------------------+------------+---------+----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740|6164630.3000000175|        4.01|    71.07|            3.17|[955fee9216a65b61...|
|422879e10f4668299...|      81110| 4442791.510000013|        3.85|    54.77|            4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775| 6921762.710000015|        4.01|    87.87|            4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248| 3280533.130000012|        4.25|    54.45|            4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274|  8220103.32999999|        4.18|   138.68|           16.58|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358| 3181698.899999993|        4.16|    54.52|    

In [40]:
## Monthly Reevenue and Order Count Trend

monthly_sales = full_orders_df.withColumn('sales_per_month',date_format(col('order_purchase_timestamp'),'MM'))

top_selling_month = monthly_sales.groupBy('sales_per_month').agg(
    countDistinct(col('order_id')).alias('total_orders'),
    format_number(sum(col('price')),2).alias('total_revenue'),
    round(avg('price'),2).alias('Average Order Value'),
                            
)\
.orderBy(desc('total_orders'))

top_selling_month.show(5)



+---------------+------------+--------------+-------------------+
|sales_per_month|total_orders| total_revenue|Average Order Value|
+---------------+------------+--------------+-------------------+
|             08|       10745|231,958,707.85|             121.17|
|             05|       10513|240,061,151.97|             125.12|
|             07|       10242|222,908,857.10|             120.65|
|             03|        9829|218,681,168.43|             120.85|
|             06|        9377|210,243,323.49|             123.53|
+---------------+------------+--------------+-------------------+
only showing top 5 rows



                                                                                

In [47]:
## Customer Retention Analysis (First and Last Orders)
customer_retention_df = full_orders_df.groupBy('customer_unique_id').agg(
    min('order_purchase_timestamp').alias('first_order_date'),
    max('order_purchase_timestamp').alias('last_order_date'),
    count('order_id').alias('total_orders'),
    round(avg('price'),2).alias('AOV')
)\
.orderBy(desc('total_orders'))

customer_retention_df.show(10)



+--------------------+-------------------+-------------------+------------+------+
|  customer_unique_id|   first_order_date|    last_order_date|total_orders|   AOV|
+--------------------+-------------------+-------------------+------------+------+
|74adf920dbd3d2e6e...|2017-07-13 10:42:37|2017-07-13 10:42:37|       11427| 85.99|
|41a3b256cc497dc95...|2018-01-27 11:28:32|2018-01-27 11:28:32|       10752| 43.82|
|9a736b248f67d166d...|2017-07-27 12:50:46|2017-08-08 20:26:31|        9525| 37.97|
|d425130be1b15a458...|2017-05-25 22:27:50|2017-05-25 22:27:50|        8556|  26.4|
|86bfc49565a9ca52f...|2018-01-05 19:15:37|2018-06-12 20:46:51|        8022| 17.85|
|873e4275daabf5ab6...|2018-03-06 19:21:47|2018-03-06 19:21:52|        7714| 72.06|
|397b44d5bb99eabf5...|2018-01-11 12:16:24|2018-06-17 19:21:01|        7714|277.86|
|fd7069cf6891edbf3...|2017-03-18 14:28:34|2017-03-18 14:28:34|        6876| 969.0|
|31e412b9fb766b679...|2017-12-13 14:21:15|2017-12-13 14:21:15|        6706|  59.0|
|e0f

                                                                                

# 5. Extended Enrichment

In [48]:
## Order Status Flags

full_orders_df = full_orders_df.withColumn('is_delivered',\
    when(col('order_status')=='delivered',lit(1)).otherwise(lit(0)))\
    .withColumn('is_canceled',when(col('order_status')=='canceled',lit(1)).otherwise(lit(0)))


In [50]:
full_orders_df.where(full_orders_df['order_status']=='canceled')\
    .select('order_status','is_delivered','is_canceled').show(100)

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|


In [54]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [55]:
## Order revenue Calculation

full_orders_df = full_orders_df.withColumn('order_revenue',col('price')+col('freight_value'))

full_orders_df.select('price','freight_value','order_revenue').show()

+-----+-------------+-------------+
|price|freight_value|order_revenue|
+-----+-------------+-------------+
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
|330.0|        10.07|       340.07|
+-----+-------------+-------------+
only showing top 20 rows



In [56]:
customer_spending_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- total_orders: long (nullable = false)
 |-- total_spent: double (nullable = true)
 |-- AOV: double (nullable = true)



In [69]:
## Customer Segmentation Based on Spending

customer_spending_df = customer_spending_df.withColumn(
    'customer_segment',
    when(col('AOV') >=1200, 'High-Value')
    .when((col('AOV') <1200) & (col('AOV') >=700),'Medium_Value').otherwise('Low_value')
)
    
customer_spending_df.show()



+--------------------+------------+------------------+-------+----------------+
|         customer_id|total_orders|       total_spent|    AOV|customer_segment|
+--------------------+------------+------------------+-------+----------------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|    Medium_Value|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|      High-Value|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|      High-Value|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|      High-Value|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|       Low_value|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|      High-Value|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|    Medium_Value|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|      High-Value|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|      High-Value|
|695476b5848d64ba0...|         687|18205

                                                                                

In [79]:
full_orders_df = full_orders_df.join(customer_spending_df
                                     .select('customer_id','customer_segment'),
                                     'customer_id',how='left')

In [81]:
full_orders_df.select('customer_id','customer_segment').show()

                                                                                

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
|dbef5eb24f60585a8...|       Low_value|
+--------------------+----------------+
only showing top 20 rows



In [82]:
full_orders_df.select('order_purchase_timestamp').show()

+------------------------+
|order_purchase_timestamp|
+------------------------+
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
|     2018-05-07 15:00:05|
+------------------------+
only showing top 20 rows



In [83]:
## Hourly Order Distribution

full_orders_df = full_orders_df.withColumn('hour_of_day',expr('hour(order_purchase_timestamp)'))

In [84]:
full_orders_df.select('order_purchase_timestamp','hour_of_day').show()

+------------------------+-----------+
|order_purchase_timestamp|hour_of_day|
+------------------------+-----------+
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
|     2018-05-07 15:00:05|         15|
+------------------------+-----------+
only showing top 20 rows



In [85]:
## Weekend vs Weekday Orders

full_orders_df = full_orders_df.withColumn('order_day_type',\
        when(dayofweek('order_purchase_timestamp').isin(1,7),lit('Weekend')).otherwise(lit('Weekday')))

In [86]:
full_orders_df.select('order_purchase_timestamp','order_day_type').show()

+------------------------+--------------+
|order_purchase_timestamp|order_day_type|
+------------------------+--------------+
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
|     2018-05-07 15:00:05|       Weekday|
+------------------------+--------

In [87]:
full_orders_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [89]:
!hadoop fs -mkdir /data/olist/processed/

In [90]:
full_orders_df.write.mode('overwrite').parquet('/data/olist/processed')

                                                                                

In [91]:
!hadoop fs -ls -h /data/olist/processed/

Found 11 items
-rw-r--r--   2 root hadoop          0 2025-10-18 06:23 /data/olist/processed/_SUCCESS
-rw-r--r--   2 root hadoop      8.5 M 2025-10-18 06:23 /data/olist/processed/part-00000-c3f55f6a-900a-46aa-9be1-0677c5b48668-c000.snappy.parquet
-rw-r--r--   2 root hadoop      7.9 M 2025-10-18 06:23 /data/olist/processed/part-00001-c3f55f6a-900a-46aa-9be1-0677c5b48668-c000.snappy.parquet
-rw-r--r--   2 root hadoop      7.7 M 2025-10-18 06:22 /data/olist/processed/part-00002-c3f55f6a-900a-46aa-9be1-0677c5b48668-c000.snappy.parquet
-rw-r--r--   2 root hadoop      9.2 M 2025-10-18 06:23 /data/olist/processed/part-00003-c3f55f6a-900a-46aa-9be1-0677c5b48668-c000.snappy.parquet
-rw-r--r--   2 root hadoop      9.1 M 2025-10-18 06:23 /data/olist/processed/part-00004-c3f55f6a-900a-46aa-9be1-0677c5b48668-c000.snappy.parquet
-rw-r--r--   2 root hadoop      8.4 M 2025-10-18 06:23 /data/olist/processed/part-00005-c3f55f6a-900a-46aa-9be1-0677c5b48668-c000.snappy.parquet
-rw-r--r--   2 root hadoop   