In [1]:
!hdfs dfs -ls /data/project/

Found 3 items
drwxr-xr-x   - pc   hadoop          0 2025-11-11 16:28 /data/project/archive
drwxr-xr-x   - root hadoop          0 2025-11-11 16:34 /data/project/cleaned_order_details.parquet
-rw-r--r--   2 pc   hadoop  284122744 2025-11-11 16:26 /data/project/orders.csv


**Setting up spark environment**

In [1]:
from pyspark.sql import SparkSession

spark=SparkSession.builder\
.appName("Olist data")\
.master("yarn")\
.getOrCreate()

25/11/14 09:28:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


**reading data files into respective spark data frames**

In [2]:
hdfs_path='/data/project/archive/'

In [3]:
customer_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+'olist_customers_dataset.csv')

                                                                                

In [4]:
customer_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [5]:
customer_df.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [6]:
geolocation_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+"olist_geolocation_dataset.csv")

geolocation_df.printSchema()
geolocation_df.show()

                                                                                

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|       

In [7]:
order_items_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+"olist_order_items_dataset.csv")

order_items_df.printSchema()
order_items_df.show()

                                                                                

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 

In [8]:
order_payments_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+"olist_order_payments_dataset.csv")

order_payments_df.printSchema()
order_payments_df.show()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1|

In [9]:
order_reviews_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+"olist_order_reviews_dataset.csv")

order_reviews_df.printSchema()
order_reviews_df.show()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|7bc2406110b926393...|73fc7af87114b3971...|           4|                NULL|                  NULL| 2018-01-18 00:00:00|    2018-01-18 21:46:59|
|80e641a11e56f04c1...|a548910a1c6147796...|           5|                

In [10]:
orders_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+"olist_orders_dataset.csv")

orders_df.printSchema()
orders_df.show()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------

In [11]:
products_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+"olist_products_dataset.csv")

products_df.printSchema()
products_df.show()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+--------------

In [12]:
sellers_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+"olist_sellers_dataset.csv")

sellers_df.printSchema()
sellers_df.show()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
|c240c4061717ac180...|                 20920|   rio de janeiro|          RJ|
|e49c26c3edfa46d22...|                 55325|           brejao|          PE|
|1b938a7ec6ac5061a...|                 16

In [13]:
product_translation_df=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load(hdfs_path+"product_category_name_translation.csv")

product_translation_df.printSchema()
product_translation_df.show()

root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)

+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|         beleza_saude|                health_beauty|
| informatica_acess...|         computers_accesso...|
|           automotivo|                         auto|
|      cama_mesa_banho|               bed_bath_table|
|     moveis_decoracao|              furniture_decor|
|        esporte_lazer|               sports_leisure|
|           perfumaria|                    perfumery|
| utilidades_domest...|                   housewares|
|            telefonia|                    telephony|
|   relogios_presentes|                watches_gifts|
|    alimentos_bebidas|                   food_drink|
|                bebes|                         baby|
|            papelaria|                   stationery|
| tablets_impres

In [14]:
%whos DataFrame


Variable                 Type         Data/Info
-----------------------------------------------
customer_df              DataFrame    DataFrame[customer_id: st<...>, customer_state: string]
geolocation_df           DataFrame    DataFrame[geolocation_zip<...>eolocation_state: string]
order_items_df           DataFrame    DataFrame[order_id: strin<...>e, freight_value: double]
order_payments_df        DataFrame    DataFrame[order_id: strin<...>t, payment_value: double]
order_reviews_df         DataFrame    DataFrame[review_id: stri<...>answer_timestamp: string]
orders_df                DataFrame    DataFrame[order_id: strin<...>delivery_date: timestamp]
product_translation_df   DataFrame    DataFrame[product_categor<...>ory_name_english: string]
products_df              DataFrame    DataFrame[product_id: str<...>t, product_width_cm: int]
sellers_df               DataFrame    DataFrame[seller_id: stri<...>ng, seller_state: string]


**validating correct data loading**

In [15]:
print(f"customer_df rows: {customer_df.count()}\
\ngeolocation_df rows: {geolocation_df.count()}\
\norder_items_df rows: {order_items_df.count()}\
\norder_payments_df rows: {order_payments_df.count()}\
\norder_reviews_df rows: {order_reviews_df.count()}\
\norders_df rows: {orders_df.count()}\
\nproduct_translation_df rows: {product_translation_df.count()}\
\nproducts_df rows: {products_df.count()}\
\nsellers_df rows: {sellers_df.count()}")

customer_df rows: 99441
geolocation_df rows: 1000163
order_items_df rows: 112650
order_payments_df rows: 103886
order_reviews_df rows: 104162
orders_df rows: 99441
product_translation_df rows: 71
products_df rows: 32951
sellers_df rows: 3095


**NULL/DUPLICATE values check**

In [16]:
from pyspark.sql.functions import *

def check_missing_values(df,df_name):
    print(f"missing values in {df_name} dataframe\n")
    df.select([sum(when(col(c).isNull(),1).otherwise(0)).alias(c) for c in df.columns]).show()

In [17]:
check_missing_values(customer_df,"customers")
check_missing_values(geolocation_df,"geolocation")
check_missing_values(order_items_df,"order items")
check_missing_values(order_payments_df,"order payments")
check_missing_values(order_reviews_df,"order reviews")
check_missing_values(orders_df,"orders")
check_missing_values(sellers_df,"sellers")
check_missing_values(products_df,"products")
check_missing_values(product_translation_df,"product translation")


missing values in customers dataframe

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+

missing values in geolocation dataframe



                                                                                

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                          0|              0|              0|               0|                0|
+---------------------------+---------------+---------------+----------------+-----------------+

missing values in order items dataframe

+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+

missing values in order payments dataframe

**so products\_df, orders\_df, order\_reviews\_df have null values in their cols**

In [18]:
#handle missing values
order_reviews_df=order_reviews_df.fillna({'review_id':99999}) 
order_reviews_df=order_reviews_df.dropna(subset=['order_id']) 


In [19]:
check_missing_values(order_reviews_df,"order reviews")

missing values in order reviews dataframe

+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|        0|       0|         144|               89922|                 60843|                6528|                   6549|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+



**now lets check for duplicates**

In [20]:
def check_duplicates(df,key,df_name):
    print(f"duplicates in {df_name} dataframe\n")
    df.groupBy(key).count().filter(col('count')>1).show()

In [21]:
check_duplicates(products_df,['product_id'],'products')
check_duplicates(order_items_df,['order_id','product_id','seller_id'],'order items')
check_duplicates(customer_df,['customer_id'],'customers')
check_duplicates(orders_df,['order_id'],'orders')
check_duplicates(order_payments_df,['order_id'],'order_payments')
check_duplicates(order_reviews_df,['order_id','review_id'],'order_reviews')
check_duplicates(geolocation_df,['geolocation_zip_code_prefix'],'geolocation')
check_duplicates(sellers_df,['seller_id'],'sellers')


duplicates in products dataframe



                                                                                

+----------+-----+
|product_id|count|
+----------+-----+
+----------+-----+

duplicates in order items dataframe



                                                                                

+--------------------+--------------------+--------------------+-----+
|            order_id|          product_id|           seller_id|count|
+--------------------+--------------------+--------------------+-----+
|2059a6cad3cfd50e2...|e48406fb377b0bf86...|9803a40e82e45418a...|    2|
|73cbd9a357e885c90...|a5f25345083fc9e53...|83b08de9698075722...|    3|
|96519c210f6dfe22f...|027cdd14a677a5834...|81f89e42267213cb9...|    2|
|095234196e1f3c4fe...|368c6c730842d7801...|1f50f920176fa81da...|    3|
|16febac56c216c1cc...|64ea282a67530101b...|4a3ca9315b744ce9f...|    4|
|2fb7dfe5fec51c8a9...|389d119b48cf3043d...|1f50f920176fa81da...|    2|
|66b9c991ee308f934...|19936fa4f614ee059...|b37c4c02bda3161a7...|    2|
|9883744c6400cfad8...|595fac2a385ac33a8...|ef0ace09169ac0905...|    2|
|2dd1b3fcb05d15852...|3609dcfca328f686e...|516e7738bd8f735ac...|    2|
|3fef122997c747784...|8b45810da2ef98604...|1900267e848ceeba8...|    2|
|468c8b5d4cee2be21...|584d0486add75aa0f...|e9d99831abad74458...|    6|
|35ebb

                                                                                

+--------------------+--------------------+-----+
|            order_id|           review_id|count|
+--------------------+--------------------+-----+
|infelizmente não ...|Era pra dar de pr...|    2|
| 2017-09-14 00:00:00|Fizemos uma notif...|    2|
| 2018-03-20 00:00:00|eu super recomend...|    2|
| atrás dela é tip...|Também a ponta da...|    2|
| 2016-10-27 00:00:00|           Obrigada"|    2|
| 2018-02-24 00:00:00|Não recebi o prod...|    2|
| 2018-05-23 00:00:00|                   "|    2|
| 2017-09-15 00:00:00|          Obrigada "|    2|
| 2017-11-22 00:00:00|A única coisa que...|    2|
| 2018-03-16 00:00:00|muito feliz pelo ...|    2|
| 2018-03-21 00:00:00|A resolução das i...|    2|
| 2018-08-18 00:00:00|quero meu dinheir...|    3|
| 2017-08-24 00:00:00|O amarelo não rec...|    2|
| 2018-08-02 00:00:00|Foram entregues l...|    2|
| 2018-03-03 00:00:00|Qto ao produto ai...|    2|
| 2016-10-29 00:00:00| Vendedor nota 1000"|    2|
| 2018-08-15 00:00:00|MANDAMOS E-MAIL F...|    2|


                                                                                

+---------------------------+-----+
|geolocation_zip_code_prefix|count|
+---------------------------+-----+
|                       2122|   33|
|                       2366|   33|
|                       3918|   50|
|                       4101|   72|
|                       9852|  107|
|                      13289|   61|
|                      26087|  111|
|                      28024|   95|
|                       3226|   24|
|                       4190|   52|
|                      18201|   69|
|                      20396|   11|
|                       1303|  166|
|                       6825|   22|
|                       8257|   60|
|                      13483|   83|
|                      24855|   41|
|                      25638|    2|
|                       2443|   80|
|                       2721|  149|
+---------------------------+-----+
only showing top 20 rows

duplicates in sellers dataframe

+---------+-----+
|seller_id|count|
+---------+-----+
+---------+-----+



**customer dist by state**

In [22]:
customer_df.groupBy('customer_state').count().orderBy(col('count').desc()).show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



**order status dist**

In [23]:
orders_df.groupBy('order_status').count().orderBy(col('count').desc()).show()

+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



**payments**

In [24]:
order_payments_df.show()

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|
|771ee386b001f0620...|                 1| credit_card|                   1|        81.16|
|3d7239c394a212faa...|                 1| credit_card|                   3|        51.84|
|1f78449c8

In [25]:
order_payments_df.groupBy('payment_type').count().orderBy(col('count').desc()).show()

+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



**order items**

In [26]:
top_products=order_items_df.groupBy('product_id').agg(sum('price').alias('total_sales')).orderBy(col('total_sales').desc())
top_products.show()

+--------------------+------------------+
|          product_id|       total_sales|
+--------------------+------------------+
|bb50f2e236e5eea01...|           63885.0|
|6cdd53843498f9289...| 54730.20000000005|
|d6160fb7873f18409...|48899.340000000004|
|d1c427060a0f73f6b...| 47214.51000000006|
|99a4788cb24856965...|43025.560000000085|
|3dd2a17168ec895c7...| 41082.60000000005|
|25c38557cf793876c...| 38907.32000000001|
|5f504b3a1c75b73d6...|37733.899999999994|
|53b36df67ebb7c415...| 37683.42000000001|
|aca2eb7d00ea1a7b8...| 37608.90000000007|
|e0d64dcfaa3b6db5c...|          31786.82|
|d285360f29ac7fd97...|31623.809999999983|
|7a10781637204d8d1...|           30467.5|
|f1c7f353075ce59d8...|          29997.36|
|f819f0c84a64f02d3...|29024.479999999996|
|588531f8ec37e7d5f...|28291.989999999998|
|422879e10f4668299...|26577.219999999972|
|16c4e87b98a9370a9...|           25034.0|
|5a848e4ab52fd5445...|24229.029999999962|
|a62e25e09e05e6faf...|           24051.0|
+--------------------+------------

**avg delivery time analysis**

In [27]:
orders_df.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [28]:
order_delivery_times=orders_df.select('order_id','order_purchase_timestamp','order_delivered_customer_date')\
.withColumn('delivery_time_taken',datediff(orders_df['order_delivered_customer_date'],orders_df['order_purchase_timestamp'])).orderBy(col('delivery_time_taken').desc())

order_delivery_times.show()

+--------------------+------------------------+-----------------------------+-------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|delivery_time_taken|
+--------------------+------------------------+-----------------------------+-------------------+
|ca07593549f1816d2...|     2017-02-21 23:31:27|          2017-09-19 14:36:39|                210|
|1b3190b2dfa9d789e...|     2018-02-23 14:57:35|          2018-09-19 23:24:07|                208|
|440d0d17af552815d...|     2017-03-07 23:59:51|          2017-09-19 15:12:50|                196|
|2fb597c2f772eca01...|     2017-03-08 18:09:02|          2017-09-19 14:33:17|                195|
|285ab9426d6982034...|     2017-03-08 22:47:40|          2017-09-19 14:00:04|                195|
|0f4519c5f1c541dde...|     2017-03-09 13:26:57|          2017-09-19 14:38:21|                194|
|47b40429ed8cce3ae...|     2018-01-03 09:44:01|          2018-07-13 20:51:31|                191|
|2fe324febf907e3ea..

**standardizing format**

In [29]:
def print_schema(df,df_name):
    print(f"schema of {df_name}")
    df.printSchema()

In [39]:
print_schema(orders_df,'orders')

schema of orders
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [40]:
print_schema(customer_df,'customers')

schema of customers
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [41]:
print_schema(order_payments_df,'orders payments')

schema of orders payments
root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [42]:
orders_df_cleaned=orders_df.withColumn('order_purchase_timestamp',to_date('order_purchase_timestamp'))

In [43]:
orders_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|              2017-10-02|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|              2018-07-24|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [44]:
orders_payments_cleaned=order_payments_df.withColumn('payment_type',when(col('payment_type')=='boleto','bank transfer').when(col('payment_type')=='debit_card','debit transfer')\
                                               .when(col('payment_type')=='credit_card','credit transfer').otherwise('other transfer'))
orders_payments_cleaned.show()

+--------------------+------------------+---------------+--------------------+-------------+
|            order_id|payment_sequential|   payment_type|payment_installments|payment_value|
+--------------------+------------------+---------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1|credit transfer|                   8|        99.33|
|a9810da82917af2d9...|                 1|credit transfer|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1|credit transfer|                   1|        65.71|
|ba78997921bbcdc13...|                 1|credit transfer|                   8|       107.78|
|42fdf880ba16b47b5...|                 1|credit transfer|                   2|       128.45|
|298fcdf1f73eb413e...|                 1|credit transfer|                   2|        96.12|
|771ee386b001f0620...|                 1|credit transfer|                   1|        81.16|
|3d7239c394a212faa...|                 1|credit transfer|             

In [45]:
customer_df_cleaned=customer_df.withColumn('customer_zip_code_prefix',col('customer_zip_code_prefix').cast("string"))
customer_df_cleaned.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [46]:
customer_df_cleaned=customer_df_cleaned.dropDuplicates(['customer_id'])

In [47]:
customer_df_cleaned.show()



+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|00012a2ce6f8dcda2...|248ffe10d632bebe4...|                    6273|              osasco|            SP|
|000161a058600d590...|b0015e09bb4b6e47c...|                   35550|         itapecerica|            MG|
|000379cdec6255224...|0b83f73b19c2019e1...|                    4841|           sao paulo|            SP|
|0004164d20a9e969a...|104bdb7e6a6cdceaa...|                   13272|            valinhos|            SP|
|000419c5494106c30...|14843983d4a159080...|                   24220|             niteroi|            RJ|
|00050bf6e01e69d5c...|e3cf594a99e810f58...|                   98700|                ijui|            RS|
|00072d033fe2e5906...|b7c13491fd2aecd93...|            

                                                                                

In [30]:
order_with_totalValues=orders_details.groupBy('order_id').agg(sum(col('payment_value')).alias('total_value')).orderBy(col('total_value').desc())

NameError: name 'orders_details' is not defined

In [None]:
order_with_totalValues.show()

**removing outliers**

In [51]:
quantiles=order_items_df.approxQuantile("price",[0.01,0.99],0.0)
low_cutoff,high_cutoff=quantiles[0],quantiles[1]

                                                                                

In [52]:
print(low_cutoff)
print(high_cutoff)


9.99
890.0


In [53]:
order_items_df=order_items_df.filter((col('price')>low_cutoff) & (col('price')<high_cutoff))

In [54]:
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [55]:
products_df=products_df.withColumn('product_size_category',when(col('product_weight_g')<500,'small').when(col('product_weight_g').between(500,2000),'medium').otherwise('large'))

In [56]:
products_df.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_size_category|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|                small|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|        

In [57]:
#revenue per seller

sellers_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [58]:
order_payments_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [59]:
order_items_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [60]:
revenue_per_seller=order_items_df.groupBy('seller_id').agg(sum(col('price')).alias('revenue_generated')).orderBy(col('revenue_generated').desc())

In [61]:
revenue_per_seller.show()

+--------------------+------------------+
|           seller_id| revenue_generated|
+--------------------+------------------+
|4869f7a5dfa277a7d...|226792.63999999908|
|4a3ca9315b744ce9f...| 200472.9199999981|
|7c67e1448b00f6e96...|187923.89000000118|
|fa1c13f2614d7b5c4...| 162793.4399999999|
|da8622b14eb17ae28...| 160196.9699999994|
|7a67c85e85bb2ce85...|141645.63000000076|
|1025f0e2d44d7041d...|138968.55000000005|
|955fee9216a65b617...|135171.70000000007|
|6560211a19b47992c...|123304.83000000002|
|7e93a43ef30c4f03f...|121237.23000000011|
|46dc3b2cc0980fb8e...|120611.24000000014|
|620c87c171fb2a6dd...|114774.50000000048|
|7d13fca1522535862...|113628.97000000006|
|1f50f920176fa81da...|106939.21000000078|
|cc419e0650a3c5ba7...|104053.41999999946|
|a1043bafd471dff53...|101901.15999999992|
|53243585a1d6dc264...| 95545.23000000005|
|3d871de0142ce09b7...| 94914.20000000022|
|5dceca129747e92ff...| 70817.41000000013|
|ccc4bbb5f32a6ab2b...| 67885.54000000007|
+--------------------+------------

In [65]:
orders_details.write.mode('overwrite') \
    .option('path', '/data/project/cleaned_order_details.parquet') \
    .saveAsTable('order_details')


ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
25/11/11 16:34:06 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


In [67]:
spark.sql('describe formatted order_details').show(truncate=False)

+-----------------------------+---------+-------+
|col_name                     |data_type|comment|
+-----------------------------+---------+-------+
|customer_id                  |string   |NULL   |
|order_id                     |string   |NULL   |
|order_status                 |string   |NULL   |
|order_purchase_timestamp     |date     |NULL   |
|order_approved_at            |timestamp|NULL   |
|order_delivered_carrier_date |timestamp|NULL   |
|order_delivered_customer_date|timestamp|NULL   |
|order_estimated_delivery_date|timestamp|NULL   |
|order_item_id                |int      |NULL   |
|product_id                   |string   |NULL   |
|seller_id                    |string   |NULL   |
|shipping_limit_date          |timestamp|NULL   |
|price                        |double   |NULL   |
|freight_value                |double   |NULL   |
|payment_sequential           |int      |NULL   |
|payment_type                 |string   |NULL   |
|payment_installments         |int      |NULL   |


In [68]:
spark.sql("DESCRIBE FORMATTED order_details").show(100, truncate=False)


+-----------------------------+----------------------------------------------------------------+-------+
|col_name                     |data_type                                                       |comment|
+-----------------------------+----------------------------------------------------------------+-------+
|customer_id                  |string                                                          |NULL   |
|order_id                     |string                                                          |NULL   |
|order_status                 |string                                                          |NULL   |
|order_purchase_timestamp     |date                                                            |NULL   |
|order_approved_at            |timestamp                                                       |NULL   |
|order_delivered_carrier_date |timestamp                                                       |NULL   |
|order_delivered_customer_date|timestamp               

In [15]:
orders_df.cache()
customer_df.cache()
order_items_df.cache()

DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

In [22]:
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [30]:
from pyspark.sql.functions import *

order_details=orders_df.join(order_items_df,'order_id','inner')\
.join(customer_df,'customer_id','inner')\
.join(products_df,'product_id','inner')\
.join(sellers_df,'seller_id','inner')\
.join(geolocation_df,col('customer_zip_code_prefix')==col('geolocation_zip_code_prefix'),'left')\
.join(order_reviews_df,'order_id','left')\
.join(order_payments_df,'order_id','left')

In [31]:
order_details.show()

[Stage 67:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+--------------------+------------------------+-------------+--------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+-----------+------------+---------------------------+-------------------+------------------+----------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+
|            order_id|           seller_id|          product_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_

                                                                                

In [32]:
order_details.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true

In [41]:
order_details.groupBy('seller_id').agg(sum('price').alias('total_revenue')).orderBy(col('total_revenue').desc()).show()



+--------------------+--------------------+
|           seller_id|       total_revenue|
+--------------------+--------------------+
|4869f7a5dfa277a7d...| 3.613871731999314E7|
|53243585a1d6dc264...|3.4291592950000696E7|
|4a3ca9315b744ce9f...| 3.375957084001202E7|
|7c67e1448b00f6e96...|3.2282321790021457E7|
|fa1c13f2614d7b5c4...|3.0139386310006626E7|
|da8622b14eb17ae28...| 2.985766973003611E7|
|7e93a43ef30c4f03f...| 2.631570630000355E7|
|1025f0e2d44d7041d...|2.2937518520012792E7|
|46dc3b2cc0980fb8e...| 2.179177329001608E7|
|955fee9216a65b617...|2.0964410670017645E7|
|7a67c85e85bb2ce85...| 2.031279489002914E7|
|620c87c171fb2a6dd...| 2.011983960002505E7|
|7d13fca1522535862...|1.8156881910003982E7|
|a1043bafd471dff53...|1.7662675980010696E7|
|6560211a19b47992c...|1.7315932900000528E7|
|edb1ef5e36e0c8cd8...|1.6624835150007125E7|
|1f50f920176fa81da...|1.6497454440033637E7|
|5dceca129747e92ff...|1.4910548340003535E7|
|cc419e0650a3c5ba7...| 1.475146450003979E7|
|3d871de0142ce09b7...|1.41845253

                                                                                

In [46]:
order_details.select(['order_id']).filter(col('order_item_id')>2).show(truncate=False)

                                                                                

+--------------------------------+
|order_id                        |
+--------------------------------+
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
|00143d0f86d6fbd9f9b38ab440ac16f5|
+--------------------------------+
only showing top 20 rows



In [48]:
order_details.select('customer_id').filter(col('order_id')=='00143d0f86d6fbd9f9b38ab440ac16f5').show()

                                                                                

+--------------------+
|         customer_id|
+--------------------+
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
|2e45292979b9b2700...|
+--------------------+
only showing top 20 rows



In [51]:
#avg review score per seller

order_details.groupBy('seller_id').agg(avg(col('review_score')).alias('average_review')).orderBy(col('average_review').desc()).show()



+--------------------+--------------+
|           seller_id|average_review|
+--------------------+--------------+
|58c851d1a3c7cd3da...|           5.0|
|aa8af66c623d7d544...|           5.0|
|0f519b0d2e5eb2227...|           5.0|
|31e60bf8d103ce479...|           5.0|
|64c9a1db4e73e19aa...|           5.0|
|2b2fed75b8e5ea3a0...|           5.0|
|33ab10be054370c25...|           5.0|
|9d213f303afae4983...|           5.0|
|05a48cc8859962767...|           5.0|
|94d76e96eedd97625...|           5.0|
|e94b64dc6979b302a...|           5.0|
|4aba6a02a788d3ec8...|           5.0|
|fd312b6bf05efac6c...|           5.0|
|a61cc04793308395a...|           5.0|
|1a8e2d9c38b84a970...|           5.0|
|edb58a1390adf2738...|           5.0|
|c04d70d515d15a7e3...|           5.0|
|5f57db27027655e6c...|           5.0|
|c74f14c1e26cf1bd5...|           5.0|
|2c538755f1ca9540a...|           5.0|
+--------------------+--------------+
only showing top 20 rows



                                                                                

In [53]:
#top products sold by number
order_details.groupBy('product_id').agg(count('product_id').alias('number_sold')).orderBy(col('number_sold').desc()).show()



+--------------------+-----------+
|          product_id|number_sold|
+--------------------+-----------+
|aca2eb7d00ea1a7b8...|      86740|
|422879e10f4668299...|      81110|
|99a4788cb24856965...|      78775|
|389d119b48cf3043d...|      60248|
|d1c427060a0f73f6b...|      59274|
|368c6c730842d7801...|      58358|
|53759a2ecddad2bb8...|      52654|
|53b36df67ebb7c415...|      52105|
|154e7e31ebfa09220...|      42700|
|3dd2a17168ec895c7...|      40787|
|e53e557d5a159f5aa...|      39516|
|2b4609f8948be1887...|      36179|
|35afc973633aaeb6b...|      31206|
|e0d64dcfaa3b6db5c...|      31153|
|42a2c92a0979a949c...|      30486|
|7c1bd920dbdf22470...|      29018|
|a62e25e09e05e6faf...|      28898|
|5a848e4ab52fd5445...|      28737|
|c4baedd846ed09b85...|      28166|
|b532349fe46b38fbc...|      27176|
+--------------------+-----------+
only showing top 20 rows



                                                                                

In [55]:
#top customers by spending
order_details.groupBy('customer_id').agg(sum(col('price')).alias('amount_spent')).orderBy(col('amount_spent').desc()).show()



+--------------------+------------------+
|         customer_id|      amount_spent|
+--------------------+------------------+
|d3e82ccec3cb5f956...|         6662844.0|
|df55c14d1476a9a34...|         3565657.0|
|fe5113a38e3575c04...|         3293604.0|
|ec5b2ba62e5743423...|         2556120.0|
|63b964e79dee32a35...|         2501664.0|
|46bb3c0b1a65c8399...|         2336752.0|
|05455dfa7cd02f13d...| 2160194.400000087|
|3690e975641f01bd0...|         2124498.0|
|349509b216bd5ec11...|         1923627.0|
|695476b5848d64ba0...|1820543.1299999943|
|73236a0796f53d60d...|         1755520.0|
|cc803a2c412833101...|         1676400.0|
|1ff773612ab8934db...|1658641.7999999512|
|fced842c7dad61e8c...|         1654898.0|
|1ecb47d23dc8203cd...|1629588.3599999903|
|de832e8dbb1f588a4...|1584990.5999999817|
|803cd9b04f9cd252c...|         1512312.0|
|d72181923840c8895...|1488114.8999999566|
|06d478ba352a27a51...|         1461150.0|
|0049e8442c2a3e4a8...|         1444800.0|
+--------------------+------------

                                                                                

In [31]:
orders_df.cache()
order_items_df.cache()
customer_df.cache()

DataFrame[customer_id: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string]

In [32]:
geolocation_df=geolocation_df.dropDuplicates(['geolocation_zip_code_prefix'])

**ISSUE: We joined orders df with order items df, which was giving incorect results for stats like total orders per seller since order items df contains multiple rows with same order id 
but different order item id showing number of items purchased within each order. now we will make another col called total order value which will be calculated by summing up the price and freight value of each item.**

In [33]:
order_values=order_items_df.groupBy('order_id').agg(first(col('seller_id')).alias('seller_id'),first(col('product_id')).alias('product_id'),sum(col('price')).alias('total_price'),sum(col('freight_value')).alias('total_freight_value')).withColumn('total_value',round(col('total_price')+col('total_freight_value'),2))
order_values.show()



+--------------------+--------------------+--------------------+------------------+-------------------+-----------+
|            order_id|           seller_id|          product_id|       total_price|total_freight_value|total_value|
+--------------------+--------------------+--------------------+------------------+-------------------+-----------+
|000229ec398224ef6...|5b51032eddd242adc...|c777355d18b72b67a...|             199.0|              17.87|     216.87|
|00054e8431b9d7675...|7040e82f899a04d1b...|8d4f2bb7e93e6710a...|              19.9|              11.85|      31.75|
|000576fe39319847c...|5996cddab893a4652...|557d850972a7d6f79...|             810.0|              70.75|     880.75|
|0005a1a1728c9d785...|a416b6a846a117243...|310ae3c140ff94b03...|            145.95|              11.65|      157.6|
|0005f50442cb953dc...|ba143b05f0110f0dc...|4535b0e1091c278df...|             53.99|               11.4|      65.39|
|00061f2a7bc09da83...|cc419e0650a3c5ba7...|d63c1011f49d98b97...|        

                                                                                

In [56]:
from pyspark.sql.functions import *

order_details=orders_df.join(order_values,'order_id','inner')\
.join(customer_df,'customer_id','inner')\
.join(products_df,'product_id','inner')\
.join(broadcast(sellers_df),'seller_id','inner')\
.join(broadcast(geolocation_df),col('customer_zip_code_prefix')==col('geolocation_zip_code_prefix'),'left')\
.join(broadcast(order_reviews_df),'order_id','left')

In [50]:
order_payments_df.groupBy('order_id').count().orderBy(col('count').desc()).show(truncate=False)

+--------------------------------+-----+
|order_id                        |count|
+--------------------------------+-----+
|fa65dad1b0e818e3ccc5cb0e39231352|29   |
|ccf804e764ed5650cd8759557269dc13|26   |
|285c2e15bebd4ac83635ccc563dc71f4|22   |
|895ab968e7bb0d5659d16cd74cd1650c|21   |
|fedcd9f7ccdc8cba3a18defedd1a5547|19   |
|ee9ca989fc93ba09a6eddc250ce01742|19   |
|21577126c19bf11a0b91592e5844ba78|15   |
|4bfcba9e084f46c8e3cb49b0fa6e6159|15   |
|4689b1816de42507a7d63a4617383c59|14   |
|3c58bffb70dcf45f12bdf66a3c215905|14   |
|73df5d6adbeea12c8ae03df93f346e86|13   |
|cf101c3abd3c061ca9f78c1bbb1125af|13   |
|4fb76fa13b108a0d0478483421b0992c|13   |
|6d58638e32674bebee793a47ac4cbadc|12   |
|1d9a9731b9c10fc9cba74e6f74782e8b|12   |
|1a611328643ae11146ba09a4425d2e12|12   |
|d744783ed2ace06cac647a9e64dcbcfd|12   |
|465c2e1bee4561cb39e0db8c5993aafc|12   |
|c6492b842ac190db807c15aff21a7dd6|12   |
|67d83bd36ec2c7fb557742fb58837659|12   |
+--------------------------------+-----+
only showing top

In [48]:
order_values.count()

98666

In [54]:
order_payments_df.filter(col('order_id')=='fa65dad1b0e818e3ccc5cb0e39231352').show()

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|fa65dad1b0e818e3c...|                27|     voucher|                   1|        66.02|
|fa65dad1b0e818e3c...|                 4|     voucher|                   1|        29.16|
|fa65dad1b0e818e3c...|                 1|     voucher|                   1|         3.71|
|fa65dad1b0e818e3c...|                 9|     voucher|                   1|         1.08|
|fa65dad1b0e818e3c...|                10|     voucher|                   1|        12.86|
|fa65dad1b0e818e3c...|                 2|     voucher|                   1|         8.51|
|fa65dad1b0e818e3c...|                25|     voucher|                   1|         3.68|
|fa65dad1b0e818e3c...|                 5|     voucher|                   1|         0.66|
|fa65dad1b

In [126]:
order_items_df.select('order_id').distinct().count()

98666

In [57]:
order_details.count()

                                                                                

98666

In [131]:
products_df.count()

32951

In [128]:
orders_df.count()

99441

In [19]:
geolocation_df.count()

1000163

In [133]:
sellers_df.select('seller_id').distinct().count()

3095

In [135]:
geolocation_df.count()

                                                                                

19015

In [136]:
geolocation_df.select('geolocation_zip_code_prefix').distinct().count()

                                                                                

19015

In [45]:
order_reviews_df.count()

                                                                                

99742

In [139]:
order_reviews_df.select('order_id').distinct().count()

                                                                                

99743

In [37]:
order_reviews_df.groupBy('order_id').count().orderBy(col('count')).filter(col('count')>=2).show(200,truncate=False)

+---------------------------------------------------------------+-----+
|order_id                                                       |count|
+---------------------------------------------------------------+-----+
|e9e7ebde60b33ca371a24f94a52fb499                               |2    |
|059bbeb3477ed66fd7e670c3f879009a                               |2    |
|169ab175fb915582d84c0c5c95bb0fe3                               |2    |
|23afc00db0b1c005196306f02e1c1178                               |2    |
|b6e5aa946acc4e29e7069510f28a0bce                               |2    |
|64bbe3a7dae1ea0dae5ac315fe779596                               |2    |
|e00ed9d20c3479f9f0e9727ca9d60946                               |2    |
|2017-04-04 00:00:00                                            |2    |
| ok                                                            |2    |
|38e31ec9d0ed250702ede5bb610d963b                               |2    |
|c7e1e46af92704b1c2e1924de96e7479                               

In [41]:
order_reviews_df.filter(col('order_id')=='6f118f6a730ce35b230974f295f743f9').show()

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|2b33838f229aecc9c...|6f118f6a730ce35b2...|           5|                NULL|                  NULL| 2018-02-08 00:00:00|    2018-02-09 09:18:42|
|f51c9e02992e07691...|6f118f6a730ce35b2...|           5|                NULL|                  NULL| 2018-02-15 00:00:00|    2018-02-19 13:15:14|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+



In [148]:
order_reviews_df=order_reviews_df.dropna(subset=['order_id'])

In [149]:
order_reviews_df.groupBy('order_id').count().orderBy(col('count').desc()).show(truncate=False)

+-------------------+-----+
|order_id           |count|
+-------------------+-----+
|2017-12-19 00:00:00|16   |
|2018-05-15 00:00:00|16   |
|2018-02-24 00:00:00|15   |
|2017-12-16 00:00:00|15   |
|2018-04-11 00:00:00|14   |
|2017-12-06 00:00:00|14   |
|2018-08-18 00:00:00|14   |
|2018-08-28 00:00:00|13   |
|2018-08-17 00:00:00|12   |
|2018-03-30 00:00:00|12   |
|2018-08-04 00:00:00|11   |
|2018-03-21 00:00:00|11   |
|2018-05-10 00:00:00|11   |
|2018-08-15 00:00:00|11   |
|2018-06-21 00:00:00|11   |
|2018-04-13 00:00:00|11   |
|2018-01-27 00:00:00|11   |
|2017-12-13 00:00:00|11   |
|2018-05-23 00:00:00|11   |
|2018-08-10 00:00:00|11   |
+-------------------+-----+
only showing top 20 rows



In [42]:
order_reviews_df=order_reviews_df.dropDuplicates(subset=['order_id'])

In [164]:
from pyspark.sql.functions import col, trim

orders_reviews_df = orders_reviews_df.withColumn('order_id', trim(col('order_id')))


In [165]:
orders_reviews_df = orders_reviews_df.filter(col('order_id').rlike('^[a-f0-9]{32}$'))


In [161]:
orders_reviews_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [166]:
order_reviews_df.groupBy('order_id').count().orderBy(col('count').desc()).show(truncate=False)

+-------------------+-----+
|order_id           |count|
+-------------------+-----+
|2017-12-19 00:00:00|16   |
|2018-05-15 00:00:00|16   |
|2018-02-24 00:00:00|15   |
|2017-12-16 00:00:00|15   |
|2018-04-11 00:00:00|14   |
|2017-12-06 00:00:00|14   |
|2018-08-18 00:00:00|14   |
|2018-08-28 00:00:00|13   |
|2018-08-17 00:00:00|12   |
|2018-03-30 00:00:00|12   |
|2018-08-04 00:00:00|11   |
|2018-03-21 00:00:00|11   |
|2018-05-10 00:00:00|11   |
|2018-08-15 00:00:00|11   |
|2018-06-21 00:00:00|11   |
|2018-04-13 00:00:00|11   |
|2018-01-27 00:00:00|11   |
|2017-12-13 00:00:00|11   |
|2018-05-23 00:00:00|11   |
|2018-08-10 00:00:00|11   |
+-------------------+-----+
only showing top 20 rows



In [21]:
order_payments_df.count()

103886

In [22]:
order_reviews_df.count()

104162

In [24]:
from pyspark.sql.functions import *
customer_df.join(geolocation_df,col('customer_zip_code_prefix')==col('geolocation_zip_code_prefix'),'left').count()

                                                                                

15083733

In [25]:
geolocation_df.groupBy("geolocation_zip_code_prefix").count().filter(col("count") > 1).show()




+---------------------------+-----+
|geolocation_zip_code_prefix|count|
+---------------------------+-----+
|                       2122|   33|
|                       2366|   33|
|                       3918|   50|
|                       4101|   72|
|                       9852|  107|
|                      13289|   61|
|                      26087|  111|
|                      28024|   95|
|                       3226|   24|
|                       4190|   52|
|                      18201|   69|
|                      20396|   11|
|                       1303|  166|
|                       6825|   22|
|                       8257|   60|
|                      13483|   83|
|                      24855|   41|
|                      25638|    2|
|                       2443|   80|
|                       2721|  149|
+---------------------------+-----+
only showing top 20 rows



                                                                                

In [90]:
from pyspark.sql.window import Window
window_spec=Window.partitionBy('seller_id').orderBy(desc('price'))

In [91]:
top_seller_products_df=order_details.withColumn('rank',rank().over(window_spec)).filter(col('rank')<=5)
top_seller_products_df.select('seller_id','price','rank').orderBy(col('seller_id').desc()).show(100)



+--------------------+------+----+
|           seller_id| price|rank|
+--------------------+------+----+
|ffff564a4f9085cd2...| 366.5|   1|
|ffff564a4f9085cd2...| 114.5|   2|
|ffff564a4f9085cd2...| 109.2|   3|
|ffff564a4f9085cd2...|103.95|   4|
|ffff564a4f9085cd2...| 82.95|   5|
|fffd5413c0700ac82...| 636.0|   1|
|fffd5413c0700ac82...| 298.6|   2|
|fffd5413c0700ac82...| 298.6|   2|
|fffd5413c0700ac82...| 298.6|   2|
|fffd5413c0700ac82...| 298.6|   2|
|fffd5413c0700ac82...| 298.6|   2|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|149.99|   1|
|ffeee66ac5d5a62fe...|149.99|   1|
|ffeee66ac5d5a62fe...|149.99|   1|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffeee66ac5d5a62fe...|129.99|   4|
|ffdd9f82b9a447f6f...| 214.0|   1|
|ffdd9f82b9a447f6f..

                                                                                

In [92]:
#total revenue and avg order value

customer_sp=order_details.groupBy('customer_id').agg(\
                                                    count('order_id').alias('total_orders'),\
                                                    sum('price').alias('total_spent'),
                                                    round(avg('price'),2).alias('AOV')).orderBy(col('total_spent').desc())
customer_sp.show()



+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|1617b1357756262bf...|           8|           13440.0| 1680.0|
|9af2372a1e4934027...|          29|11383.949999999997| 392.55|
|de832e8dbb1f588a4...|          15|10856.099999999999| 723.74|
|63b964e79dee32a35...|          24|            9888.0|  412.0|
|6f241d5bbb142b6f7...|           7| 9520.140000000001|1360.02|
|926b6a6fb8b6081e0...|           2|            7998.0| 3999.0|
|f959b7bc834045511...|           6|            7799.4| 1299.9|
|eb7a157e8da9c488c...|           2|            7798.0| 3899.0|
|ec5b2ba62e5743423...|           4|            7160.0| 1790.0|
|3118aefef04e5e97d...|           7|            6909.0|  987.0|
|c6e2731c5b391845f...|           1|            6735.0| 6735.0|
|f48d464a0baaea338...|           1|            6729.0| 6729.0|
|3fd6777bbce08a352...|           1|            6499.0| 

                                                                                

In [39]:
#seller performance metrics (revenue,average review,order count)

seller_sp=order_details.groupBy('seller_id').agg(\
                                                 count('order_id').alias('total_orders'),\
                                                 sum('price').alias('total_revenue'),\
                                                 round(avg('review_score'),2).alias('avg_review_score'),\
                                                 round(stddev('price'),2).alias('price variability'))\
                                                 .orderBy(col('total_revenue').desc())
                                                 
seller_sp.show()



+--------------------+------------+--------------------+----------------+-----------------+
|           seller_id|total_orders|       total_revenue|avg_review_score|price variability|
+--------------------+------------+--------------------+----------------+-----------------+
|4869f7a5dfa277a7d...|      184587| 3.613871731999314E7|            4.09|           111.65|
|53243585a1d6dc264...|       54514|3.4291592950000696E7|            4.12|           499.65|
|4a3ca9315b744ce9f...|      330661| 3.375957084001202E7|            3.77|            59.37|
|7c67e1448b00f6e96...|      233306|3.2282321790021457E7|            3.42|            50.39|
|fa1c13f2614d7b5c4...|       87686|3.0139386310006626E7|            4.38|            307.7|
|da8622b14eb17ae28...|      264433| 2.985766973003611E7|            3.98|            72.92|
|7e93a43ef30c4f03f...|       50226| 2.631570630000355E7|            4.15|           377.24|
|1025f0e2d44d7041d...|      229587|2.2937518520012792E7|            3.89|       

                                                                                

In [40]:
#product popularity matrix

product_metrics=order_details.groupBy('product_id').agg(\
                                                        count('order_id').alias('total_sales'),\
                                                        sum('price').alias('total_revenue'),\
                                                        round(avg('price'),2).alias('avg_price'),\
                                                        round(stddev('price'),2).alias('price_volatility'),\
                                                        collect_list('seller_id').alias('unique_sellers')).\
orderBy(col('total_sales').desc())
product_metrics.show()
                                                              

                                                                                

+--------------------+-----------+------------------+---------+----------------+--------------------+
|          product_id|total_sales|     total_revenue|avg_price|price_volatility|      unique_sellers|
+--------------------+-----------+------------------+---------+----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740| 6164630.299996104|    71.07|            3.17|[955fee9216a65b61...|
|422879e10f4668299...|      81110| 4442791.509997985|    54.77|            4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775| 6921762.709996391|    87.87|            4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248| 3280533.129998789|    54.45|            4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274| 8220103.330003063|   138.68|           16.58|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358|3181698.8999990216|    54.52|            4.59|[1f50f920176fa81d...|
|53759a2ecddad2bb8...|      52654|2893017.4999995097|    54.94|            4.52|[1

In [None]:
#monthly revenue and order count trend

total orders
total revenue
avg order value 
min order value
max order value

In [59]:
#customer retention analysis -> first and last order

customer_re=order_details.groupBy('customer_id').agg(\
                                                     min('order_purchase_timestamp').alias('first_order_date'),\
                                                     max('order_purchase_timestamp').alias('last_order_date'),\
                                                     count('order_id').alias('total_orders_placed'),\
                                                     round(avg('price'),2).alias('avg_order_value')).\
orderBy(col('total_orders_placed').desc())
customer_re.show(truncate=False)
                                                     
            



+--------------------------------+-------------------+-------------------+-------------------+---------------+
|customer_id                     |first_order_date   |last_order_date    |total_orders_placed|avg_order_value|
+--------------------------------+-------------------+-------------------+-------------------+---------------+
|351e40989da90e70487765f6ea15d54b|2017-07-13 10:42:37|2017-07-13 10:42:37|11427              |85.99          |
|50920f8cd0681fd86ebe93670c8fe52e|2018-01-27 11:28:32|2018-01-27 11:28:32|10752              |43.82          |
|9b43e2a62de9bab3ad220e858e95ccbc|2017-05-25 22:27:50|2017-05-25 22:27:50|8556               |26.4           |
|270c23a11d024a44c896d1894b261a83|2017-08-08 20:26:31|2017-08-08 20:26:31|8001               |36.59          |
|5c87184371002d49e08b9ac10eb96647|2018-01-05 19:15:37|2018-01-05 19:15:37|6876               |12.49          |
|d3e82ccec3cb5f956a38d96c057ceaae|2017-03-18 14:28:34|2017-03-18 14:28:34|6876               |969.0          |
|

                                                                                

In [49]:
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [63]:
order_details.filter(col('customer_id') == 'f1e46939e6408b3e6fd46579a37c88b4').select('order_purchase_timestamp').show()


                                                                                

+------------------------+
|order_purchase_timestamp|
+------------------------+
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
|     2017-06-01 14:58:11|
+------------------------+
only showing top 20 rows



In [69]:
from pyspark.sql.functions import col

# assuming customer_re has first_order_date and last_order_date columns
customer_re.filter(col('first_order_date') != col('last_order_date')).show()




+-----------+----------------+---------------+-------------------+---------------+
|customer_id|first_order_date|last_order_date|total_orders_placed|avg_order_value|
+-----------+----------------+---------------+-------------------+---------------+
+-----------+----------------+---------------+-------------------+---------------+



                                                                                

In [74]:
orders_df.groupBy('customer_id').count().orderBy(col('count').desc()).show()

+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|f1e46939e6408b3e6...|    1|
|3391c4bc11a817e79...|    1|
|90d7075599361b694...|    1|
|5a58afc695ee03b9b...|    1|
|a340ce6c3570e68d4...|    1|
|9687241c8ed401845...|    1|
|b76530b7e66b27cd6...|    1|
|a3537e15f3e4f88e0...|    1|
|4d91a0aeb419c5f26...|    1|
|2774b6023c768c91e...|    1|
|2aec499f94f5e8278...|    1|
|5a6ae5b68fd27c687...|    1|
|791fffae1e2c66693...|    1|
|2b139a6842a26c357...|    1|
|0721e1c4b91bc6ded...|    1|
|9601347c41eb3f6dd...|    1|
|d480546bdc6b03fca...|    1|
|be170bc588de8c9e1...|    1|
|6f8b4eeaba59ef3fa...|    1|
|18dff7c2afb1ce5fb...|    1|
+--------------------+-----+
only showing top 20 rows



In [75]:
customer_df.count()

99441

In [76]:
orders_df.count()

99441

In [80]:
orders_df.join(customer_df,'customer_id','inner').count()

99441

In [87]:
geolocation_df.filter(col('geolocation_zip_code_prefix')=='2122').show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       2122|-23.513172191788303| -46.58263962184772|       sao paulo|               SP|
|                       2122|-23.512200548156255| -46.58175211601936|       sao paulo|               SP|
|                       2122|-23.511529487650744| -46.58759916694558|       sao paulo|               SP|
|                       2122|-23.512050000000002|-46.584090539961934|       sao paulo|               SP|
|                       2122|-23.513043970309834|-46.584710465173956|       são paulo|               SP|
|                       2122|-23.511865861503836| -46.58364351012754|       sao paulo|               SP|
|                       2122|         -23.512572| -46.5

In [93]:
geolocation_df.count()

                                                                                

19015

In [95]:
order_details.groupBy('customer_id').count().orderBy(col('count').desc()).show()



+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|270c23a11d024a44c...|   63|
|13aa59158da63ba0e...|   38|
|9af2372a1e4934027...|   29|
|92cd3ec6e2d643d4e...|   26|
|2ba91e12e5e4c9f56...|   24|
|63b964e79dee32a35...|   24|
|6ee2f17e3b6c33d6a...|   24|
|86cc80fef09f7f39d...|   24|
|d22f25a9fadfb1abb...|   24|
|be1c4e52bb71e0c54...|   22|
|b246eeed30b362c09...|   22|
|fc3d1daec319d62d4...|   21|
|5e0f7317756669ff7...|   21|
|50920f8cd0681fd86...|   21|
|23924b2105c5376cd...|   20|
|be1b70680b9f9694d...|   20|
|bd5d39761aa56689a...|   20|
|eed931d3a5222a9a5...|   19|
|1d48a5dcde374b9d6...|   16|
|1d4411212e90d6a24...|   16|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [99]:
order_items_df.groupBy('order_id').count().orderBy(col('count').desc()).show(truncate=False)

+--------------------------------+-----+
|order_id                        |count|
+--------------------------------+-----+
|8272b63d03f5f79c56e9e4120aec44ef|21   |
|1b15974a0141d54e36626dca3fdc731a|20   |
|ab14fdcfbe524636d65ee38360e22ce8|20   |
|9ef13efd6949e4573a18964dd1bbe7f5|15   |
|428a2f660dc84138d969ccd69a0ab6d5|15   |
|9bdc4d4c71aa1de4606060929dee888c|14   |
|73c8ab38f07dc94389065f7eba4f297a|14   |
|37ee401157a3a0b28c9c6d0ed8c3b24b|13   |
|af822dacd6f5cff7376413c03a388bb7|12   |
|2c2a19b5703863c908512d135aa6accc|12   |
|3a213fcdfe7d98be74ea0dc05a8b31ae|12   |
|c05d6a79e55da72ca780ce90364abed9|12   |
|637617b3ffe9e2f7a2411243829226d0|12   |
|71dab1155600756af6de79de92e712e3|11   |
|5a3b1c29a49756e75f1ef513383c0c12|11   |
|7f2c22c54cbae55091a09a9653fd2b8a|11   |
|6c355e2913545fa6f72c40cbca57729e|11   |
|9aec4e1ae90b23c7bf2d2b3bfafbd943|10   |
|9f5054bd9a3c71702aa0917a7da29193|10   |
|30bdf3d824d824610a49887486debcaf|10   |
+--------------------------------+-----+
only showing top

In [103]:
order_items_df.filter(col('order_id')=='8272b63d03f5f79c56e9e4120aec44ef').show()

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|8272b63d03f5f79c5...|            1|270516a3f41dc035a...|2709af9587499e95e...|2017-07-21 18:25:23|  1.2|         7.89|
|8272b63d03f5f79c5...|            2|05b515fdc76e888aa...|2709af9587499e95e...|2017-07-21 18:25:23|  1.2|         7.89|
|8272b63d03f5f79c5...|            3|05b515fdc76e888aa...|2709af9587499e95e...|2017-07-21 18:25:23|  1.2|         7.89|
|8272b63d03f5f79c5...|            4|05b515fdc76e888aa...|2709af9587499e95e...|2017-07-21 18:25:23|  1.2|         7.89|
|8272b63d03f5f79c5...|            5|05b515fdc76e888aa...|2709af9587499e95e...|2017-07-21 18:25:23|  1.2|         7.89|
|8272b63d03f5f79c5...|            6|05b515fdc76e

+--------------------+--------------------+------------------+-------------------+
|            order_id|           seller_id|       total_price|total_freight_value|
+--------------------+--------------------+------------------+-------------------+
|000229ec398224ef6...|5b51032eddd242adc...|             199.0|              17.87|
|00054e8431b9d7675...|7040e82f899a04d1b...|              19.9|              11.85|
|000576fe39319847c...|5996cddab893a4652...|             810.0|              70.75|
|0005a1a1728c9d785...|a416b6a846a117243...|            145.95|              11.65|
|0005f50442cb953dc...|ba143b05f0110f0dc...|             53.99|               11.4|
|00061f2a7bc09da83...|cc419e0650a3c5ba7...|             59.99|               8.88|
|0006ec9db01a64e59...|4a3ca9315b744ce9f...|              74.0|              23.32|
|0009792311464db53...|530ec6109d11eaaf8...|              99.9|              27.65|
|0009c9a17f916a706...|fcb5ace8bcc92f757...|             639.0|              11.34|
|000

**extended enrichment**

In [58]:
order_details.select('order_status').show()

+------------+
|order_status|
+------------+
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
|   delivered|
+------------+
only showing top 20 rows



In [66]:
order_details=order_details.withColumn('is_delivered',when(col('order_status')=='delivered',lit(1)).otherwise(lit(0)))\
                                       .withColumn('is_cancelled',when(col('order_status')=='canceled',lit(1)).otherwise(lit(0)))

In [67]:
order_details.filter(col('order_status')=='canceled').select('order_status','is_delivered','is_cancelled').show()

+------------+------------+------------+
|order_status|is_delivered|is_cancelled|
+------------+------------+------------+
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
|    canceled|           0|           1|
+------------+------------+------------+
only showing top

In [68]:
order_details.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- total_price: double (nullable = true)
 |-- total_freight_value: double (nullable = true)
 |-- total_value: double (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (n

In [69]:
#customer segmentation based on spending

customer_spending=order_details.groupBy('customer_id').agg(sum(col('total_value')).alias('total_spending')).orderBy(col('total_spending').desc())
customer_spending.show()



+--------------------+--------------+
|         customer_id|total_spending|
+--------------------+--------------+
|1617b1357756262bf...|      13664.08|
|ec5b2ba62e5743423...|       7274.88|
|c6e2731c5b391845f...|       6929.31|
|f48d464a0baaea338...|       6922.21|
|3fd6777bbce08a352...|       6726.66|
|05455dfa7cd02f13d...|       6081.54|
|df55c14d1476a9a34...|       4950.34|
|e0a2412720e9ea4f2...|       4809.44|
|24bbf5fd2f2e1b359...|       4764.34|
|3d979689f636322c6...|       4681.78|
|1afc82cd60e303ef0...|       4513.32|
|cc803a2c412833101...|        4445.5|
|926b6a6fb8b6081e0...|       4194.76|
|35a413c7ca3c69756...|       4175.26|
|e9b0d0eb3015ef1c9...|       4163.51|
|3be2c536886b2ea46...|       4042.74|
|eb7a157e8da9c488c...|       4034.44|
|c6695e3b1e48680db...|       4016.91|
|31e83c01fce824d0f...|       3979.55|
|addc91fdf9c2b3045...|        3826.8|
+--------------------+--------------+
only showing top 20 rows



                                                                                

In [70]:
customer_spending=customer_spending.withColumn('segment',when(col('total_spending')>=1200,'High value').when((col('total_spending')>=500) & (col('total_spending')<1200),'medium value').otherwise('low value'))
customer_spending.show()
                                        

[Stage 241:>                                                        (0 + 2) / 2]

+--------------------+--------------+----------+
|         customer_id|total_spending|   segment|
+--------------------+--------------+----------+
|1617b1357756262bf...|      13664.08|High value|
|ec5b2ba62e5743423...|       7274.88|High value|
|c6e2731c5b391845f...|       6929.31|High value|
|f48d464a0baaea338...|       6922.21|High value|
|3fd6777bbce08a352...|       6726.66|High value|
|05455dfa7cd02f13d...|       6081.54|High value|
|df55c14d1476a9a34...|       4950.34|High value|
|e0a2412720e9ea4f2...|       4809.44|High value|
|24bbf5fd2f2e1b359...|       4764.34|High value|
|3d979689f636322c6...|       4681.78|High value|
|1afc82cd60e303ef0...|       4513.32|High value|
|cc803a2c412833101...|        4445.5|High value|
|926b6a6fb8b6081e0...|       4194.76|High value|
|35a413c7ca3c69756...|       4175.26|High value|
|e9b0d0eb3015ef1c9...|       4163.51|High value|
|3be2c536886b2ea46...|       4042.74|High value|
|eb7a157e8da9c488c...|       4034.44|High value|
|c6695e3b1e48680db..

                                                                                

In [71]:
order_details=order_details.join(customer_spending,'customer_id','left')
order_details.show()

25/11/14 09:55:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-----------+-------------------+-----------+--------------------+------------------------+--------------------+--------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+--------------------+------------+---------------------------+-------------------+-------------------+--------------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------+------------+--------------+---------+
|         customer_id|            order_id|           seller_id|          product_id|order_status|order_purchase_timestamp|  order_approved_at|or

In [72]:
#hourly order distribution

order_details=order_details.withColumn('hour_of_day',hour(col('order_purchase_timestamp')))
order_details.count()

98666

In [74]:
order_details.select('hour_of_day').show()

+-----------+
|hour_of_day|
+-----------+
|         14|
|         11|
|         12|
|         18|
|         13|
|         22|
|         17|
|         20|
|          9|
|         20|
|         10|
|         11|
|         18|
|         10|
|         17|
|          0|
|         21|
|         21|
|         18|
|         15|
+-----------+
only showing top 20 rows



In [75]:
order_details=order_details.withColumn('isWeekend',when(dayofweek(col('order_purchase_timestamp'))==1,1).when(dayofweek(col('order_purchase_timestamp'))==7,1).otherwise(0))

In [76]:
order_details.select('isWeekend','order_purchase_timestamp').show()

+---------+------------------------+
|isWeekend|order_purchase_timestamp|
+---------+------------------------+
|        1|     2018-01-14 14:33:31|
|        1|     2017-12-10 11:53:48|
|        0|     2018-07-04 12:08:27|
|        0|     2018-03-19 18:40:33|
|        0|     2018-07-02 13:59:39|
|        1|     2018-03-24 22:16:10|
|        0|     2018-07-24 17:04:17|
|        0|     2018-08-14 20:43:09|
|        0|     2018-04-25 09:10:41|
|        0|     2018-05-11 20:33:38|
|        1|     2017-08-12 10:08:57|
|        0|     2018-02-22 11:54:42|
|        0|     2017-11-21 18:54:23|
|        0|     2018-03-07 10:33:13|
|        0|     2017-09-11 17:39:33|
|        1|     2017-08-06 00:42:49|
|        0|     2018-01-16 21:43:23|
|        0|     2018-06-11 21:17:00|
|        1|     2018-04-07 18:51:22|
|        0|     2017-10-16 15:29:43|
+---------+------------------------+
only showing top 20 rows



In [78]:
order_details.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- total_price: double (nullable = true)
 |-- total_freight_value: double (nullable = true)
 |-- total_value: double (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (n

In [81]:
#freight category


order_details=order_details.withColumn('freight_category',when(col('total_freight_value')>=30,'high_value').when((col('total_freight_value')>=20) & (col('total_freight_value')<30),'medium_value').otherwise('low_value'))
order_details.select('total_freight_value','freight_category').show()

+-------------------+----------------+
|total_freight_value|freight_category|
+-------------------+----------------+
|              17.87|       low_value|
|              11.85|       low_value|
|              70.75|      high_value|
|              11.65|       low_value|
|               11.4|       low_value|
|               8.88|       low_value|
|              23.32|    medium_value|
|              27.65|    medium_value|
|              11.34|       low_value|
|               8.77|       low_value|
|              13.71|       low_value|
|              16.11|       low_value|
|              11.85|       low_value|
|               44.4|      high_value|
|               16.6|       low_value|
|              16.98|       low_value|
|              26.33|    medium_value|
|               7.94|       low_value|
|              23.35|    medium_value|
|               45.3|      high_value|
+-------------------+----------------+
only showing top 20 rows



In [82]:
orders_by_state=order_details.groupBy(col('customer_state')).count().orderBy(col('count').desc())
orders_by_state.show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41375|
|            RJ|12762|
|            MG|11544|
|            RS| 5432|
|            PR| 4998|
|            SC| 3612|
|            BA| 3358|
|            DF| 2125|
|            ES| 2025|
|            GO| 2007|
|            PE| 1648|
|            CE| 1327|
|            PA|  970|
|            MT|  903|
|            MA|  740|
|            MS|  709|
|            PB|  532|
|            PI|  493|
|            RN|  482|
|            AL|  411|
+--------------+-----+
only showing top 20 rows



In [83]:
order_details.write.mode('overwrite').parquet('/data/project/archive/final_data_processed.parquet')

                                                                                

In [85]:
!hdfs dfs -ls /data/project/archive/

Found 10 items
drwxr-xr-x   - root hadoop          0 2025-11-14 10:34 /data/project/archive/final_data_processed.parquet
-rw-r--r--   2 pc   hadoop    9033957 2025-11-11 16:28 /data/project/archive/olist_customers_dataset.csv
-rw-r--r--   2 pc   hadoop   61273883 2025-11-11 16:28 /data/project/archive/olist_geolocation_dataset.csv
-rw-r--r--   2 pc   hadoop   15438671 2025-11-11 16:28 /data/project/archive/olist_order_items_dataset.csv
-rw-r--r--   2 pc   hadoop    5777138 2025-11-11 16:28 /data/project/archive/olist_order_payments_dataset.csv
-rw-r--r--   2 pc   hadoop   14451670 2025-11-11 16:28 /data/project/archive/olist_order_reviews_dataset.csv
-rw-r--r--   2 pc   hadoop   17654914 2025-11-11 16:28 /data/project/archive/olist_orders_dataset.csv
-rw-r--r--   2 pc   hadoop    2379446 2025-11-11 16:28 /data/project/archive/olist_products_dataset.csv
-rw-r--r--   2 pc   hadoop     174703 2025-11-11 16:28 /data/project/archive/olist_sellers_dataset.csv
-rw-r--r--   2 pc   hadoop      

In [87]:
!hdfs dfs -ls /data/project/archive/final_data_processed.parquet


Found 3 items
-rw-r--r--   2 root hadoop          0 2025-11-14 10:34 /data/project/archive/final_data_processed.parquet/_SUCCESS
-rw-r--r--   2 root hadoop   13083238 2025-11-14 10:34 /data/project/archive/final_data_processed.parquet/part-00000-032c4a00-8803-492e-a972-ed25dcaa820b-c000.snappy.parquet
-rw-r--r--   2 root hadoop   13102203 2025-11-14 10:34 /data/project/archive/final_data_processed.parquet/part-00001-032c4a00-8803-492e-a972-ed25dcaa820b-c000.snappy.parquet


In [88]:
!hdfs dfs -mkdir /data/processed

In [90]:
order_details.write.mode('overwrite').parquet('/data/processed/final_data_processed.parquet')

                                                                                

In [91]:
order_details.write.mode('overwrite').saveAsTable('full_order_detail')

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
25/11/14 11:04:28 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


In [93]:
spark.sql('show tables').show()

+---------+-----------------+-----------+
|namespace|        tableName|isTemporary|
+---------+-----------------+-----------+
|  default|     customers300|      false|
|  default|  customers_table|      false|
|  default|full_order_detail|      false|
|  default|    order_details|      false|
+---------+-----------------+-----------+

