In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName('OlistDate')\
        .getOrCreate()

26/01/23 10:55:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path = '/data/olist/'

In [3]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv', header=True, inferSchema=True)
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv', header=True, inferSchema=True)
order_item_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv', header=True, inferSchema=True)
payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv', header=True, inferSchema=True)
reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv', header=True, inferSchema=True)
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv', header=True, inferSchema=True)
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv', header=True, inferSchema=True)
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv', header=True, inferSchema=True)
category_translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv', header=True, inferSchema=True)

                                                                                

In [4]:
# Cache Frequently used Data for Performance

orders_df.cache()
customers_df.cache()
order_item_df.cache()

DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

In [5]:
order_items_joined_df = orders_df.join(order_item_df, 'order_id', 'inner')

In [6]:
order_items_products_df = order_items_joined_df.join(products_df, 'product_id', 'inner')

In [7]:
order_items_products_sellers_df = order_items_products_df.join(sellers_df, 'seller_id', 'inner')

In [8]:
full_orders_join = order_items_products_sellers_df.join(customers_df, 'customer_id', 'inner')

In [10]:
#Geolocation Data
geolocation_df.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [12]:
full_orders_join = full_orders_join.join(geolocation_df, full_orders_join.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix, 'left')

In [13]:
full_orders_join = full_orders_join.join(reviews_df,'order_id','left')

In [14]:
full_orders_df = full_orders_join.join(payments_df,'order_id','left')

In [15]:
full_orders_df.cache()

26/01/23 10:13:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [17]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [19]:
# Total Revenues Per Seller
from pyspark.sql.functions import *
seller_revenue_df = full_orders_df.groupBy('seller_id').agg(sum('price').alias('total_revenue'))

In [20]:
seller_revenue_df.show(5)



+--------------------+------------------+
|           seller_id|     total_revenue|
+--------------------+------------------+
|d650b663c3b5f6fb3...|         2253869.1|
|cd06602b43d8800bd...|353150.98000000033|
|3c487ae8f8d7542be...|1618845.7000000055|
|d354c38a7182125a7...| 318455.8700000011|
|e9779976487b77c6d...| 6293200.690000004|
+--------------------+------------------+
only showing top 5 rows



                                                                                

In [25]:
# Total Orders Per Customers
total_orders_per_customers = full_orders_df.groupBy('customer_id').agg(count('order_id').alias('total_orders'))
total_orders_per_customers.show(5)



+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|fc60d04fc5b40da6a...|         110|
|322491e830e0cc584...|         104|
|76dd2e33346ff360e...|         104|
|cb567aae66e06084c...|          52|
|819f448337cc56960...|          52|
+--------------------+------------+
only showing top 5 rows



                                                                                

In [27]:
# Average Review Score Per Seller
average_review_score = full_orders_df.groupBy('seller_id').agg(avg('review_score').alias('avg_reviews'))
average_review_score.show(5)



+--------------------+------------------+
|           seller_id|       avg_reviews|
+--------------------+------------------+
|3d5d0dc7073a299e3...|3.6020916334661353|
|2138ccb85b11a4ec1...| 3.910233200368211|
|33ac3e28642ab8bda...| 4.348423605497171|
|cc419e0650a3c5ba7...| 4.072278928275249|
|8e6cc767478edae94...|  3.89280737485261|
+--------------------+------------------+
only showing top 5 rows



                                                                                

In [28]:
# Most Sold Products (Top 10)
most_sold_products = full_orders_df.groupBy('product_id').agg(count('order_id').alias('top selling')).orderBy(col('top selling').desc())
most_sold_products.show(10)



+--------------------+-----+
|          product_id|count|
+--------------------+-----+
|aca2eb7d00ea1a7b8...|86740|
|422879e10f4668299...|81110|
|99a4788cb24856965...|78775|
|389d119b48cf3043d...|60248|
|d1c427060a0f73f6b...|59274|
|368c6c730842d7801...|58358|
|53759a2ecddad2bb8...|52654|
|53b36df67ebb7c415...|52105|
|154e7e31ebfa09220...|42700|
|3dd2a17168ec895c7...|40787|
+--------------------+-----+
only showing top 10 rows



                                                                                

In [29]:
# Top Customers By Spending
top_customers = full_orders_df.groupBy('customer_id').agg(sum('price').alias('total_spent')).orderBy(col('total_spent').desc())
top_customers.show(10)



+--------------------+------------------+
|         customer_id|       total_spent|
+--------------------+------------------+
|d3e82ccec3cb5f956...|         6662844.0|
|df55c14d1476a9a34...|         3565657.0|
|fe5113a38e3575c04...|         3293604.0|
|ec5b2ba62e5743423...|         2556120.0|
|63b964e79dee32a35...|         2501664.0|
|46bb3c0b1a65c8399...|         2336752.0|
|05455dfa7cd02f13d...| 2160194.400000087|
|3690e975641f01bd0...|         2124498.0|
|349509b216bd5ec11...|         1923627.0|
|695476b5848d64ba0...|1820543.1299999943|
+--------------------+------------------+
only showing top 10 rows



                                                                                

Optimized Joins

In [5]:
from pyspark.sql.functions import *

In [6]:
order_items_joined_df = orders_df.join(order_item_df, 'order_id', 'inner')

In [7]:
order_items_products_df = order_items_joined_df.join(products_df, 'product_id', 'inner')

In [8]:
order_items_products_sellers_df = order_items_products_df.join(broadcast(sellers_df), 'seller_id', 'inner')

In [9]:
full_orders_join = order_items_products_sellers_df.join(customers_df, 'customer_id', 'inner')

In [10]:
full_orders_join = full_orders_join.join(broadcast(geolocation_df), full_orders_join.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix, 'left')

In [11]:
full_orders_join = full_orders_join.join(broadcast(reviews_df),'order_id','left')

In [12]:
full_orders_df = full_orders_join.join(payments_df,'order_id','left')

In [13]:
full_orders_df.cache()

26/01/23 10:56:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [14]:
# Total Orders Per Customers
total_orders_per_customers = full_orders_df.groupBy('customer_id').agg(count('order_id').alias('total_orders'))
total_orders_per_customers.show(5)



+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|8f4758b55faa41da5...|          46|
|bc12c0a740d705dbe...|          92|
|d9f57c5a009cd22a4...|         194|
|aa62a16b10e3fb24e...|         127|
|3a832bedadec4eeb6...|         221|
+--------------------+------------+
only showing top 5 rows



                                                                                

In [15]:
# Most Sold Products (Top 10)
most_sold_products = full_orders_df.groupBy('product_id').agg(count('order_id').alias('top selling')).orderBy(col('top selling').desc())
most_sold_products.show(10)



+--------------------+-----------+
|          product_id|top selling|
+--------------------+-----------+
|aca2eb7d00ea1a7b8...|      86740|
|422879e10f4668299...|      81110|
|99a4788cb24856965...|      78775|
|389d119b48cf3043d...|      60248|
|d1c427060a0f73f6b...|      59274|
|368c6c730842d7801...|      58358|
|53759a2ecddad2bb8...|      52654|
|53b36df67ebb7c415...|      52105|
|154e7e31ebfa09220...|      42700|
|3dd2a17168ec895c7...|      40787|
+--------------------+-----------+
only showing top 10 rows



                                                                                

#Window Function and Ranking

In [16]:
#Rank Top Selling Products per Seller

In [17]:
from pyspark.sql.window import Window
window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))


In [20]:
top_seller_products_df = full_orders_df.withColumn('rank', rank().over(window_spec)).filter(col('rank') <=5)

In [22]:
top_seller_products_df.select('seller_id','price','rank').show()

[Stage 40:>                                                         (0 + 1) / 1]

+--------------------+-----+----+
|           seller_id|price|rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 20 rows



                                                                                

In [None]:
#Dense Rank for Sellers Based on Revenue

In [24]:
window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))

In [26]:
sellers_per_revenue = full_orders_df.withColumn('dense rank', dense_rank().over(window_spec)).filter(col('dense rank') <=5)

In [29]:
sellers_per_revenue.select('seller_id','price','dense rank').show()

[Stage 46:>                                                         (0 + 1) / 1]

+--------------------+-----+----------+
|           seller_id|price|dense rank|
+--------------------+-----+----------+
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
+--------------------+-----+----------+
only showing top 20 rows



                                                                                

#Advance Aggregations And Enrichment

In [30]:
# Total Revenue and Average Order Value (AOV) per Customer

In [31]:
customer_spending_df = full_orders_df.groupBy('customer_id')\
    .agg(
        count('order_id').alias('total_orders'),
        sum('price').alias('total_spent'),
        round(avg('price'),2).alias('AOV')
    )\
    .orderBy(desc('total_spent'))

customer_spending_df.show()



+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

                                                                                

In [41]:
#Select Performance Metrics (Revenue, Average Review, Order Count)

seller_performance_df = full_orders_df.groupBy('seller_id')\
                .agg(
                    sum('price').alias('total_revenue'),
                    count('order_id').alias('total_orders'),
                    round(avg('review_score'),2).alias('average_review'),
                    round(stddev('price'),2).alias('price_varibility')
        )\
        .orderBy(desc('total_revenue'))

In [42]:
seller_performance_df.show()



+--------------------+--------------------+------------+--------------+----------------+
|           seller_id|       total_revenue|total_orders|average_review|price_varibility|
+--------------------+--------------------+------------+--------------+----------------+
|4869f7a5dfa277a7d...| 3.613871731999314E7|      184587|          4.09|          111.65|
|53243585a1d6dc264...|3.4291592950000696E7|       54514|          4.12|          499.65|
|4a3ca9315b744ce9f...| 3.375957084001202E7|      330661|          3.77|           59.37|
|7c67e1448b00f6e96...|3.2282321790021457E7|      233306|          3.42|           50.39|
|fa1c13f2614d7b5c4...|3.0139386310006626E7|       87686|          4.38|           307.7|
|da8622b14eb17ae28...| 2.985766973003611E7|      264433|          3.98|           72.92|
|7e93a43ef30c4f03f...| 2.631570630000355E7|       50226|          4.15|          377.24|
|1025f0e2d44d7041d...|2.2937518520012792E7|      229587|          3.89|            84.3|
|46dc3b2cc0980fb8e...

                                                                                

In [43]:
# Product Popularity Matrix

In [44]:
products_metrics_df = full_orders_df.groupBy('product_id')\
            .agg(
                count('order_id').alias('total_sales'),
                sum('price').alias('total_revenue'),
                round(avg('price'),2).alias('avg_price'),
                round(stddev('price'),2).alias('price_volatility'),
                collect_list('seller_id').alias('unique_sellers')
        )\
        .orderBy(desc('total_sales'))

In [45]:
products_metrics_df.show()

                                                                                

+--------------------+-----------+------------------+---------+----------------+--------------------+
|          product_id|total_sales|     total_revenue|avg_price|price_volatility|      unique_sellers|
+--------------------+-----------+------------------+---------+----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740| 6164630.299996213|    71.07|            3.17|[955fee9216a65b61...|
|422879e10f4668299...|      81110| 4442791.509997526|    54.77|            4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775| 6921762.709996367|    87.87|            4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248|3280533.1299988106|    54.45|            4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274| 8220103.330002412|   138.68|           16.58|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358| 3181698.899999002|    54.52|            4.59|[1f50f920176fa81d...|
|53759a2ecddad2bb8...|      52654|2893017.4999994747|    54.94|            4.52|[1

In [56]:
# Monthly Revenue and Order Count Trend 

monthly_trend = full_orders_df.groupBy(month('order_purchase_timestamp').alias('Month'))\
                    .agg(
                        count('order_id').alias('total_orders'),
                        sum('price').alias('total_revenue'),
                        round(avg('price'),2).alias('avg_order_val'),
                        round(min('price'),2).alias('min_order_val'),
                        round(max('price'),2).alias('max_order_val')
            )\
            .orderBy(asc('Month'))

In [57]:
monthly_trend.show()



+-----+------------+--------------------+-------------+-------------+-------------+
|Month|total_orders|       total_revenue|avg_order_val|min_order_val|max_order_val|
+-----+------------+--------------------+-------------+-------------+-------------+
|    1|     1495580|1.7153290149996296E8|       114.69|          2.9|       3690.0|
|    2|     1551163|1.7878178407014424E8|       115.26|         2.99|       6735.0|
|    3|     1809467|2.1868116843072027E8|       120.85|          4.9|      4099.99|
|    4|     1693860|2.1715696913056687E8|        128.2|         0.85|       4799.0|
|    5|     1918571| 2.400611519711523E8|       125.12|          3.5|       6499.0|
|    6|     1701909| 2.102433234903824E8|       123.53|         3.49|       4590.0|
|    7|     1847639|2.2290885710064745E8|       120.65|          1.2|       6729.0|
|    8|     1914313| 2.319587078507358E8|       121.17|          2.2|      4399.87|
|    9|      763178|  9.59129888597319E7|       125.68|         2.29|       

                                                                                

In [53]:
# Customer Retention Analysis

In [75]:
cust_retention_df = (
    full_orders_df
    .groupBy("customer_unique_id")  
    .agg(
        min("order_purchase_timestamp").alias("first_order_date"),
        max("order_purchase_timestamp").alias("last_order_date"),
        countDistinct("order_id").alias("total_orders"),
        round(avg("price"), 2).alias("aov")
    )
    .orderBy(desc("total_orders"))
)

In [76]:
cust_retention_df.show()



+--------------------+-------------------+-------------------+------------+------+
|  customer_unique_id|   first_order_date|    last_order_date|total_orders|   aov|
+--------------------+-------------------+-------------------+------------+------+
|8d50f5eadf50201cc...|2017-05-15 23:30:03|2018-08-20 19:14:26|          16|  45.6|
|3e43e6105506432c9...|2017-09-18 18:53:15|2018-02-27 18:36:39|           9| 76.09|
|6469f99c1f9dfae77...|2017-09-19 01:02:44|2018-06-28 00:43:34|           7|  73.8|
|ca77025e7201e3b30...|2017-10-09 12:34:39|2018-06-01 11:38:29|           7| 67.22|
|1b6c7548a2a1f9037...|2017-11-13 16:44:41|2018-02-14 13:22:12|           7| 85.52|
|dc813062e0fc23409...|2017-07-01 04:22:21|2018-08-23 00:07:26|           6| 62.52|
|47c1a3033b8b77b3a...|2017-08-07 14:14:22|2018-01-24 15:15:26|           6|  88.7|
|f0e310a6839dce9de...|2017-05-20 08:53:30|2018-04-05 09:04:45|           6| 73.01|
|12f5d6e1cbf93dafd...|2017-01-05 14:18:03|2017-01-05 15:25:10|           6|  9.73|
|63c

                                                                                

#Entended Enrichment

In [84]:
full_orders_df = full_orders_df.withColumn('is_delivered', when(col('order_status') == 'delivered', lit(1)).otherwise(lit(0)))\
                                .withColumn('is_canceled', when(col('order_status') == 'canceled', lit(1)).otherwise(lit(0)))

In [87]:
full_orders_df.where(full_orders_df['order_status']=='canceled').select('order_status', 'is_delivered', 'is_canceled').show(10)

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
+------------+------------+-----------+
only showing top 10 rows



In [88]:
# Order Revenue Calculation

In [89]:
full_orders_df = full_orders_df.withColumn('order_revenue',col('price')+col('freight_value'))

In [90]:
full_orders_df.select('price', 'freight_value','order_revenue').show()

+-----+-------------+-------------+
|price|freight_value|order_revenue|
+-----+-------------+-------------+
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
| 58.9|        13.29|        72.19|
+-----+-------------+-------------+
only showing top 20 rows



In [91]:
# Customer Segmentation based on spending

In [92]:
customer_spending_df.show()



+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

                                                                                

In [96]:
customer_spending_df = customer_spending_df.withColumn(
                    'customer_segment', 
                    when(col('AOV') >= 1200, 'High-Value')\
                    .when(col('AOV').between(500,1200), 'Medium-Value')\
                    .otherwise('Low_Value'))

In [100]:
customer_spending_df.show()



+--------------------+------------+------------------+-------+----------------+
|         customer_id|total_orders|       total_spent|    AOV|customer_segment|
+--------------------+------------+------------------+-------+----------------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|    Medium-Value|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|      High-Value|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|      High-Value|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|      High-Value|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|       Low_Value|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|      High-Value|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|    Medium-Value|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|      High-Value|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|      High-Value|
|695476b5848d64ba0...|         687|18205

                                                                                

In [101]:
full_orders_df = full_orders_df.join(customer_spending_df.select('customer_id', 'customer_segment'), 'customer_id', how='left')

In [102]:
full_orders_df.select('customer_id', 'customer_segment').show()

                                                                                

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
|c08ef557085ca9fb0...|       Low_Value|
+--------------------+----------------+
only showing top 20 rows



In [None]:
# Hourly Order Distribution

In [103]:
full_orders_df = full_orders_df.withColumn('hour of day', hour('order_purchase_timestamp'))

In [104]:
full_orders_df.select('customer_id', 'order_purchase_timestamp', 'hour of day').show()

+--------------------+------------------------+-----------+
|         customer_id|order_purchase_timestamp|hour of day|
+--------------------+------------------------+-----------+
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08:59:02|          8|
|3ce436f183e68e078...|     2017-09-13 08

In [105]:
# WeekDay vs Weekend Order

In [106]:
full_orders_df = full_orders_df.withColumn('order_date_type', when(expr('dayofweek(order_purchase_timestamp) IN (1,7)'), lit('Weekend'))\
                                                                   .otherwise(lit('Weekday')))

In [109]:
full_orders_df.select('customer_id', 'order_purchase_timestamp', 'order_date_type').show()

+--------------------+------------------------+---------------+
|         customer_id|order_purchase_timestamp|order_date_type|
+--------------------+------------------------+---------------+
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08:59:02|        Weekday|
|3ce436f183e68e078...|     2017-09-13 08

In [108]:
full_orders_df.where(full_orders_df['order_date_type']== 'Weekend').select('customer_id', 'order_purchase_timestamp', 'order_date_type').show()

+--------------------+------------------------+---------------+
|         customer_id|order_purchase_timestamp|order_date_type|
+--------------------+------------------------+---------------+
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14:33:31|        Weekend|
|6489ae5e4333f3693...|     2018-01-14 14

In [110]:
full_orders_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [112]:
# Frieght Category

full_orders_df = full_orders_df.withColumn(
                    'frieght_category', 
                    when(col('freight_value') >= 10, 'High-Value')\
                    .when(col('freight_value').between(5,10), 'Medium-Value')\
                    .otherwise('Low_Value'))

In [113]:
full_orders_df.select('freight_value', 'frieght_category').show()

+-------------+----------------+
|freight_value|frieght_category|
+-------------+----------------+
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
|        13.29|      High-Value|
+-------------+----------------+
only showing top 20 rows



In [114]:
#
Order_volumne_per_state = full_orders_df.groupBy('geolocation_state')\
                            .agg(count('order_id').alias('total_orders'))\
                            .orderBy(desc('total_orders'))
Order_volumne_per_state.show(10)



+-----------------+------------+
|geolocation_state|total_orders|
+-----------------+------------+
|               SP|     6742207|
|               RJ|     3626836|
|               MG|     3433229|
|               RS|      971705|
|               PR|      746514|
|               SC|      644944|
|               BA|      443980|
|               ES|      367211|
|               GO|      162415|
|               MT|      155225|
+-----------------+------------+
only showing top 10 rows



                                                                                