In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("OlisData").getOrCreate()

In [4]:
hdfs_path = "/content/brazilian-ecommerce"

In [5]:
customer_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/olist_customers_dataset.csv")
geolocation_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/olist_geolocation_dataset.csv")
order_items_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/olist_order_items_dataset.csv")
order_payments_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/olist_order_payments_dataset.csv")
order_reviews_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/olist_order_reviews_dataset.csv")
orders_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/olist_orders_dataset.csv")
products_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/olist_products_dataset.csv")
sellers_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/olist_sellers_dataset.csv")
product_category_name_translation = spark.read.format("csv").option("header","true").option("inferSchema","true").load(hdfs_path+"/product_category_name_translation.csv")


In [6]:
orders_df.cache()
customer_df.cache()
order_items_df.cache()

DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

In [8]:
orders_items_joined_df = orders_df.join(order_items_df,'order_id','inner')

In [9]:
orders_items_products_df = orders_items_joined_df.join(products_df,'product_id','inner')

In [10]:
orders_items_products_sellers_df = orders_items_products_df.join(sellers_df,'seller_id','inner')

In [11]:
full_orders_df = orders_items_products_sellers_df.join(customer_df,'customer_id','inner')

#Geolocation Data

In [12]:
geolocation_df.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [6]:
full_orders_df = full_orders_df.join(geolocation_df,geolocation_df['geolocation_zip_code_prefix'] == full_orders_df['customer_zip_code_prefix'] ,'left')

In [16]:
full_orders_df = full_orders_df.join(order_reviews_df,'order_id','left')

In [17]:
full_orders_df = full_orders_df.join(order_payments_df,'order_id','left')

In [18]:
full_orders_df.cache()

DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [8]:
from pyspark.sql.functions import *

#Total Revenues Per Seller

In [21]:
seller_revenus_df = full_orders_df.groupBy('seller_id').agg(sum('price').alias('total_revenue')).orderBy('total_revenue',ascending=False)

In [22]:
seller_revenus_df.show()

+--------------------+--------------------+
|           seller_id|       total_revenue|
+--------------------+--------------------+
|4869f7a5dfa277a7d...| 3.613871732000001E7|
|53243585a1d6dc264...| 3.429159294999997E7|
|4a3ca9315b744ce9f...| 3.375957083999998E7|
|7c67e1448b00f6e96...|3.2282321789999593E7|
|fa1c13f2614d7b5c4...|3.0139386310000043E7|
|da8622b14eb17ae28...|2.9857669729999945E7|
|7e93a43ef30c4f03f...| 2.631570629999995E7|
|1025f0e2d44d7041d...|2.2937518519999865E7|
|46dc3b2cc0980fb8e...| 2.179177328999988E7|
|955fee9216a65b617...|2.0964410669999987E7|
|7a67c85e85bb2ce85...|       2.031279489E7|
|620c87c171fb2a6dd...| 2.011983959999985E7|
|7d13fca1522535862...|1.8156881909999993E7|
|a1043bafd471dff53...|1.7662675979999986E7|
|6560211a19b47992c...|1.7315932900000006E7|
|edb1ef5e36e0c8cd8...| 1.662483515000007E7|
|1f50f920176fa81da...|1.6497454439999709E7|
|5dceca129747e92ff...| 1.491054833999999E7|
|cc419e0650a3c5ba7...|1.4751464500000088E7|
|3d871de0142ce09b7...|1.41845252

# Join Optimization

In [7]:
orders_items_joined_df = orders_df.join(order_items_df,'order_id','inner')

In [9]:
orders_items_products_df = orders_items_joined_df.join(products_df,'product_id','inner')

In [10]:
orders_items_products_sellers_df = orders_items_products_df.join(broadcast(sellers_df),'seller_id','inner')

In [11]:
full_orders_df = orders_items_products_sellers_df.join(customer_df,'customer_id','inner')

In [12]:
full_orders_df = full_orders_df.join(broadcast(geolocation_df),geolocation_df['geolocation_zip_code_prefix'] == full_orders_df['customer_zip_code_prefix'] ,'left')

In [13]:
full_orders_df = full_orders_df.join(broadcast(order_reviews_df),'order_id','left')

In [14]:
full_orders_df = full_orders_df.join(order_payments_df,'order_id','left')

In [15]:
full_orders_df.cache()

DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

#Aggregation

## Total Orders Per Customer

In [16]:
order_per_customer_df = full_orders_df.groupBy('customer_id').agg(count('order_id').alias('total_orders')).orderBy('total_orders',ascending=False)
order_per_customer_df.show()

+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|351e40989da90e704...|       11427|
|50920f8cd0681fd86...|       10752|
|9b43e2a62de9bab3a...|        8556|
|270c23a11d024a44c...|        8001|
|5c87184371002d49e...|        6876|
|d3e82ccec3cb5f956...|        6876|
|d5f2b3f597c7ccafb...|        6706|
|c2f18647725395af4...|        6612|
|24e7dc2ff8c071263...|        6597|
|7bb57d182bdc11653...|        6258|
|63b964e79dee32a35...|        6072|
|d22f25a9fadfb1abb...|        6072|
|1ff773612ab8934db...|        5820|
|13aa59158da63ba0e...|        5206|
|78fc46047c4a639e8...|        5200|
|dd3f1762eb601f41c...|        4992|
|a193aa8d905b8e246...|        4896|
|9eb3d566e87289dcb...|        4872|
|2ba91e12e5e4c9f56...|        4752|
|1b2ab6eda1946a6ff...|        4728|
+--------------------+------------+
only showing top 20 rows


#Average Review Score Per Seller

In [17]:
average_review_per_seller_df = full_orders_df.groupBy('seller_id').agg(avg('review_score').alias('average_review_score')).orderBy('average_review_score',ascending=False)
average_review_per_seller_df.show()

+--------------------+--------------------+
|           seller_id|average_review_score|
+--------------------+--------------------+
|33ab10be054370c25...|                 5.0|
|8c91bfea4263b8fba...|                 5.0|
|bb7ad8a45c027be8a...|                 5.0|
|cac4e0bc1a3269fa2...|                 5.0|
|2d20f1cd18725c911...|                 5.0|
|98dddbc4601dd4443...|                 5.0|
|e439f7176d763f92d...|                 5.0|
|71dc18fd8cbf5fd77...|                 5.0|
|044668ccd5316b12a...|                 5.0|
|c3f5668699a1b04c0...|                 5.0|
|2bd05d410a8fd26dc...|                 5.0|
|7ab0dd5487bab2dc8...|                 5.0|
|a81466620e46a6970...|                 5.0|
|05730013efda59630...|                 5.0|
|4bde6149c15cf7e17...|                 5.0|
|b5f49fe968dff6b11...|                 5.0|
|1de62b6f2fd962276...|                 5.0|
|b2f4d63c7203f539a...|                 5.0|
|d2572f31e9023e985...|                 5.0|
|a7ecf689b40a44361...|          

# Top 10 Most sold Products

In [18]:
top_10_products_df = full_orders_df.groupBy('product_id').agg(count('order_id').alias('total_sold')).orderBy('total_sold',ascending=False).limit(10)
top_10_products_df.show()


+--------------------+----------+
|          product_id|total_sold|
+--------------------+----------+
|aca2eb7d00ea1a7b8...|     86740|
|422879e10f4668299...|     81110|
|99a4788cb24856965...|     78775|
|389d119b48cf3043d...|     60248|
|d1c427060a0f73f6b...|     59274|
|368c6c730842d7801...|     58358|
|53759a2ecddad2bb8...|     52654|
|53b36df67ebb7c415...|     52105|
|154e7e31ebfa09220...|     42700|
|3dd2a17168ec895c7...|     40787|
+--------------------+----------+



#Window Function and Ranking

#Rank top selling products Per Seller


In [19]:
from pyspark.sql.window import Window

In [20]:
window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))

In [21]:
top_seller_products = full_orders_df.withColumn('rank',rank().over(window_spec)).filter(col('rank')<=5)
top_seller_products.show()

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+-----------+------------+--------------------+------------------------+-------------+--------------+---------------------------+-------------------+-------------------+----------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+----+
|            order_id|         customer_id|           seller_id|          product_id|order_status|order_purchase_timestamp|  order_app

#Dense Rank for sellers based on Revenue

In [22]:
top_seller_products_dense_df = full_orders_df.withColumn('dense_rank',dense_rank().over(window_spec)).filter(col('dense_rank')<=5)
top_seller_products.show()

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+-----------+------------+--------------------+------------------------+-------------+--------------+---------------------------+-------------------+-------------------+----------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+----+
|            order_id|         customer_id|           seller_id|          product_id|order_status|order_purchase_timestamp|  order_app

#Advance Aggregation and Enrichment

#Total Revenue and Average Order Value (AOV) per customer

In [23]:
total_revenue_and_aov_df = full_orders_df.groupBy('customer_id')\
.agg(sum('price').alias('total_revenue_new'),count('order_id').alias('total_order'),round(avg('price').alias('AOV')))\
.orderBy(desc('total_revenue_new'))
total_revenue_and_aov_df.show()

+--------------------+------------------+-----------+---------------------------+
|         customer_id| total_revenue_new|total_order|round(avg(price) AS AOV, 0)|
+--------------------+------------------+-----------+---------------------------+
|d3e82ccec3cb5f956...|         6662844.0|       6876|                      969.0|
|df55c14d1476a9a34...|         3565657.0|        743|                     4799.0|
|fe5113a38e3575c04...|         3293604.0|       2292|                     1437.0|
|ec5b2ba62e5743423...|         2556120.0|       1428|                     1790.0|
|63b964e79dee32a35...|         2501664.0|       6072|                      412.0|
|46bb3c0b1a65c8399...|         2336752.0|        748|                     3124.0|
|05455dfa7cd02f13d...| 2160194.400000087|       2184|                      989.0|
|3690e975641f01bd0...|         2124498.0|        802|                     2649.0|
|349509b216bd5ec11...|         1923627.0|        743|                     2589.0|
|695476b5848d64b

#Seller Revenue Metrics (Revenue,Average Review,Order Count)

In [24]:
seller_performance_df = full_orders_df.groupBy('seller_id')\
.agg(
    count('order_id').alias('order_count'),
    avg('review_score').alias('average_review_score'),
    sum('price').alias('total_revenue')
)\
.orderBy(desc('total_revenue'))
seller_performance_df.show()

+--------------------+-----------+--------------------+--------------------+
|           seller_id|order_count|average_review_score|       total_revenue|
+--------------------+-----------+--------------------+--------------------+
|4869f7a5dfa277a7d...|     184587|   4.093344031789699| 3.613871732000001E7|
|53243585a1d6dc264...|      54514|   4.118026783071914| 3.429159294999997E7|
|4a3ca9315b744ce9f...|     330661|  3.7684451702537354| 3.375957083999998E7|
|7c67e1448b00f6e96...|     233306|  3.4183987368185553|3.2282321789999593E7|
|fa1c13f2614d7b5c4...|      87686|   4.378364698845075|3.0139386310000043E7|
|da8622b14eb17ae28...|     264433|  3.9793441825464995|2.9857669729999945E7|
|7e93a43ef30c4f03f...|      50226|   4.145269530238764| 2.631570629999995E7|
|1025f0e2d44d7041d...|     229587|   3.888338091111825|2.2937518519999865E7|
|46dc3b2cc0980fb8e...|      90426|   4.176218962002473| 2.179177328999988E7|
|955fee9216a65b617...|     232364|  4.0378493839381076|2.0964410669999987E7|

#Product Popularity Metrics

In [25]:

from pyspark.sql.functions import count, avg, sum, collect_set, desc, round

product_metrics_df = full_orders_df.groupBy('product_id') \
    .agg(
        count('order_id').alias('order_count'),
        round(avg('price'), 2).alias('avg_price'),
        sum('price').alias('total_revenue'),
        collect_set('seller_id').alias('unique_sellers')
    ) \
    .orderBy(desc('order_count'))

product_metrics_df.show()


+--------------------+-----------+---------+------------------+--------------------+
|          product_id|order_count|avg_price|     total_revenue|      unique_sellers|
+--------------------+-----------+---------+------------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740|    71.07| 6164630.300000002|[955fee9216a65b61...|
|422879e10f4668299...|      81110|    54.77| 4442791.510000016|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775|    87.87|        6921762.71|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248|    54.45| 3280533.130000006|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274|   138.68| 8220103.329999998|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358|    54.52|3181698.9000000013|[1f50f920176fa81d...|
|53759a2ecddad2bb8...|      52654|    54.94|2893017.5000000023|[1f50f920176fa81d...|
|53b36df67ebb7c415...|      52105|   118.22| 6159887.409999995|[4869f7a5dfa277a7...|
|154e7e31ebfa09220...|      42700|    22.53| 962160.9999999969|[c

#Monthly Revenue and Order Count Trend

In [26]:
full_orders_df.printSchema()


root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [27]:
full_orders_df = full_orders_df.withColumn('order_purchase_month', month('order_purchase_timestamp'))


In [28]:
full_orders_df.show()

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+---------------+------------+--------------------+------------------------+-------------+--------------+---------------------------+-------------------+-------------------+----------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+--------------------+
|            order_id|         customer_id|           seller_id|          product_id|order_status|order_purchase_t

In [29]:
monthly_revenue_order_count = full_orders_df.groupBy('order_purchase_month')\
.agg(sum('price').alias('monthly_revenue'),
     count('order_id').alias('order_count'),
     round(avg('price'),2).alias('average_order_value'),
     max('price').alias('max_order_value'),
     min('price').alias('min_order_value')
     )\
     .orderBy('order_purchase_month')
monthly_revenue_order_count.show()

+--------------------+--------------------+-----------+-------------------+---------------+---------------+
|order_purchase_month|     monthly_revenue|order_count|average_order_value|max_order_value|min_order_value|
+--------------------+--------------------+-----------+-------------------+---------------+---------------+
|                   1|1.7153290150000247E8|    1495580|             114.69|         3690.0|            2.9|
|                   2|  1.78781784070002E8|    1551163|             115.26|         6735.0|           2.99|
|                   3| 2.186811684299993E8|    1809467|             120.85|        4099.99|            4.9|
|                   4|2.1715696913000065E8|    1693860|              128.2|         4799.0|           0.85|
|                   5| 2.400611519699952E8|    1918571|             125.12|         6499.0|            3.5|
|                   6|2.1024332348999768E8|    1701909|             123.53|         4590.0|           3.49|
|                   7|2.2290

#Customer Retention Analysis (First and Last Order)

In [30]:
customer_retention_df = full_orders_df.groupBy('customer_id')\
.agg(
    first('order_purchase_timestamp').alias('first_order_date'),
    last('order_purchase_timestamp').alias('last_order_date'),
    count('order_id').alias('total_orders'),
    round(avg('price'),2).alias('aov')
)\
.orderBy(desc('total_orders'))
customer_retention_df.show()

+--------------------+-------------------+-------------------+------------+------+
|         customer_id|   first_order_date|    last_order_date|total_orders|   aov|
+--------------------+-------------------+-------------------+------------+------+
|351e40989da90e704...|2017-07-13 10:42:37|2017-07-13 10:42:37|       11427| 85.99|
|50920f8cd0681fd86...|2018-01-27 11:28:32|2018-01-27 11:28:32|       10752| 43.82|
|9b43e2a62de9bab3a...|2017-05-25 22:27:50|2017-05-25 22:27:50|        8556|  26.4|
|270c23a11d024a44c...|2017-08-08 20:26:31|2017-08-08 20:26:31|        8001| 36.59|
|5c87184371002d49e...|2018-01-05 19:15:37|2018-01-05 19:15:37|        6876| 12.49|
|d3e82ccec3cb5f956...|2017-03-18 14:28:34|2017-03-18 14:28:34|        6876| 969.0|
|d5f2b3f597c7ccafb...|2017-12-13 14:21:15|2017-12-13 14:21:15|        6706|  59.0|
|c2f18647725395af4...|2018-03-06 19:21:47|2018-03-06 19:21:47|        6612|  34.9|
|24e7dc2ff8c071263...|2017-11-24 16:16:45|2017-11-24 16:16:45|        6597|  59.2|
|7bb

In [31]:
customer_retention_df = customer_retention_df.withColumn('last_order_date',to_date(col('last_order_date'),'yyyy-MM-dd'))

In [32]:
customer_retention_df.show()


+--------------------+-------------------+---------------+------------+------+
|         customer_id|   first_order_date|last_order_date|total_orders|   aov|
+--------------------+-------------------+---------------+------------+------+
|351e40989da90e704...|2017-07-13 10:42:37|     2017-07-13|       11427| 85.99|
|50920f8cd0681fd86...|2018-01-27 11:28:32|     2018-01-27|       10752| 43.82|
|9b43e2a62de9bab3a...|2017-05-25 22:27:50|     2017-05-25|        8556|  26.4|
|270c23a11d024a44c...|2017-08-08 20:26:31|     2017-08-08|        8001| 36.59|
|5c87184371002d49e...|2018-01-05 19:15:37|     2018-01-05|        6876| 12.49|
|d3e82ccec3cb5f956...|2017-03-18 14:28:34|     2017-03-18|        6876| 969.0|
|d5f2b3f597c7ccafb...|2017-12-13 14:21:15|     2017-12-13|        6706|  59.0|
|c2f18647725395af4...|2018-03-06 19:21:47|     2018-03-06|        6612|  34.9|
|24e7dc2ff8c071263...|2017-11-24 16:16:45|     2017-11-24|        6597|  59.2|
|7bb57d182bdc11653...|2018-04-02 17:11:30|     2018-

#Extended Enrichment

##Order Status Flags

In [33]:
full_orders_df = full_orders_df.withColumn('is_delivered',when(col('order_status') == 'delivered',lit(1)).otherwise(lit(0)))\
.withColumn('is_canceled',when(col('order_status') == 'canceled',lit(1)).otherwise(lit(0)))

In [28]:
full_orders_df.select('order_status','is_delivered','is_canceled').show()

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
+------------+------------+-----------+
only showing top 20 rows


##Order Revenue Calculation

In [34]:
full_orders_df = full_orders_df.withColumn('order_revenue',col('price')+col('freight_value'))


In [30]:
full_orders_df.select('price','freight_value','order_revenue').show()

+-----+-------------+------------------+
|price|freight_value|     order_revenue|
+-----+-------------+------------------+
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
| 27.9|         3.81|31.709999999999997|
+-----+-------------+------------------+
only showing top

##Hourly Order Distribution

In [35]:
full_orders_df = full_orders_df.withColumn('hour_of_day',expr('hour(order_purchase_timestamp)'))

In [32]:
full_orders_df.select('order_purchase_timestamp','hour_of_day').show()

+------------------------+-----------+
|order_purchase_timestamp|hour_of_day|
+------------------------+-----------+
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
|     2017-07-26 17:38:47|         17|
+------------------------+-----------+
only showing top 20 rows


##Weekely vs Weekend Order

In [36]:
full_orders_df = full_orders_df.withColumn('order_day_type',when(expr('dayofWeek(order_purchase_timestamp) IN (1,7)'),lit('Weekend')).otherwise(lit('Weekday')))

In [36]:
full_orders_df.select('order_purchase_timestamp','order_day_type').show()

+------------------------+--------------+
|order_purchase_timestamp|order_day_type|
+------------------------+--------------+
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
+------------------------+--------

In [37]:
full_orders_df.filter(col('order_day_type') == 'Weekday').select('order_purchase_timestamp','order_day_type').show()

+------------------------+--------------+
|order_purchase_timestamp|order_day_type|
+------------------------+--------------+
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
|     2017-07-26 17:38:47|       Weekday|
+------------------------+--------

In [37]:
full_orders_df.write.mode('overwrite').parquet("/content/full_orders_dataset")