# Olist E-Commerce Data

## **Setting up Spark Environment**

1\. Deploy a spark cluster (Dataproc, EMR, HDInsight, On-Premis)

2\. Download the data from Kaggle website [Link](https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce)

3\. Bash command to direct download to HDFS 
- Make sure to create a proper directory to put downloaded and to extract files into
- I created olist folder to download file
```
curl -L -o ~/olist/brazilian-ecommerce.zip\
  https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce
```
- I created data folder inside olist to extract files

```
unzip brazilian-ecommerce.zip -d ~/olist/data/
```

4\. Store Data in HDFS worker nodes
- I create folder to distribute files from hadoop master node to worker nodes
```
!hadoop fs -put ~/olist/data/*.csv /data/olist/
```
    
5\. Use Pyspark to intract with Data



# Data Ingestion

## **Downloaded and extracted data in hadoop master cluster**

In [21]:
ls /home/sekar_dhana8644/olist/

brazilian-ecommerce.zip  [0m[01;34mdata[0m/


In [20]:
ls /home/sekar_dhana8644/olist/data/

olist_customers_dataset.csv       olist_orders_dataset.csv
olist_geolocation_dataset.csv     olist_products_dataset.csv
olist_order_items_dataset.csv     olist_sellers_dataset.csv
olist_order_payments_dataset.csv  product_category_name_translation.csv
olist_order_reviews_dataset.csv


## **Moved data from Hadoop master cluster to worker nodes**

In [19]:
!hadoop fs -ls -h /data/olist/

Found 9 items
-rw-r--r--   2 sekar_dhana8644 hadoop      8.6 M 2025-08-30 19:17 /data/olist/olist_customers_dataset.csv
-rw-r--r--   2 sekar_dhana8644 hadoop     58.4 M 2025-08-30 19:17 /data/olist/olist_geolocation_dataset.csv
-rw-r--r--   2 sekar_dhana8644 hadoop     14.7 M 2025-08-30 19:17 /data/olist/olist_order_items_dataset.csv
-rw-r--r--   2 sekar_dhana8644 hadoop      5.5 M 2025-08-30 19:17 /data/olist/olist_order_payments_dataset.csv
-rw-r--r--   2 sekar_dhana8644 hadoop     13.8 M 2025-08-30 19:17 /data/olist/olist_order_reviews_dataset.csv
-rw-r--r--   2 sekar_dhana8644 hadoop     16.8 M 2025-08-30 19:17 /data/olist/olist_orders_dataset.csv
-rw-r--r--   2 sekar_dhana8644 hadoop      2.3 M 2025-08-30 19:17 /data/olist/olist_products_dataset.csv
-rw-r--r--   2 sekar_dhana8644 hadoop    170.6 K 2025-08-30 19:17 /data/olist/olist_sellers_dataset.csv
-rw-r--r--   2 sekar_dhana8644 hadoop      2.6 K 2025-08-30 19:17 /data/olist/product_category_name_translation.csv


# Data Exploration

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('OlistDataset').getOrCreate()

spark

25/09/04 18:10:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path = '/data/olist/'

In [3]:
customer_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv',header=True, inferSchema=True)
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv',header=True, inferSchema=True)
order_items_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv',header=True, inferSchema=True)
order_payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv',header=True, inferSchema=True)
order_reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv',header=True, inferSchema=True)
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv',header=True, inferSchema=True)
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv',header=True, inferSchema=True)
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv',header=True, inferSchema=True)
translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv',header=True, inferSchema=True)

                                                                                

In [5]:
customer_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [6]:
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [7]:
# Data Leakage or Drop

print(f'Customers df: {customer_df.count()} rows')
print(f'Orders df: {orders_df.count()} rows')
print(f'Order Items df: {order_items_df.count()} rows')
print(f'Order Payments df: {order_payments_df.count()} rows')
print(f'Order Reviews df: {order_reviews_df.count()} rows')
print(f'Products df: {products_df.count()} rows')
print(f'Sellers df: {sellers_df.count()} rows')
print(f'Product Category Name Translation df: {translation_df.count()} rows')

Customers df: 99441 rows
Orders df: 99441 rows
Order Items df: 112650 rows
Order Payments df: 103886 rows
Order Reviews df: 104162 rows
Products df: 32951 rows
Sellers df: 3095 rows
Product Category Name Translation df: 71 rows


In [4]:
# Null Values

from pyspark.sql.functions import *

customer_df.select([count(when(col(c).isNull(),1)).alias(c) for c in customer_df.columns]).show()

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [9]:
# Duplicate Values

customer_df.groupBy('customer_id').count().filter('count>1').show()

[Stage 46:>                                                         (0 + 2) / 2]

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



                                                                                

In [10]:
# Customer Distribution by State

customer_df.groupBy('customer_state').count().orderBy(col('count').desc()).show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



In [11]:
# Customer Distribution by City

customer_df.groupBy('customer_city').count().orderBy('count', ascending=False).show()

+--------------------+-----+
|       customer_city|count|
+--------------------+-----+
|           sao paulo|15540|
|      rio de janeiro| 6882|
|      belo horizonte| 2773|
|            brasilia| 2131|
|            curitiba| 1521|
|            campinas| 1444|
|        porto alegre| 1379|
|            salvador| 1245|
|           guarulhos| 1189|
|sao bernardo do c...|  938|
|             niteroi|  849|
|         santo andre|  797|
|              osasco|  746|
|              santos|  713|
|             goiania|  692|
| sao jose dos campos|  691|
|           fortaleza|  654|
|            sorocaba|  633|
|              recife|  613|
|       florianopolis|  570|
+--------------------+-----+
only showing top 20 rows



In [12]:
orders_df.columns

['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date']

In [13]:
# Order Distribution by Order Status

orders_df.groupBy('order_status').count().orderBy('count', ascending=False).show()

+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



In [14]:
delivery_df = orders_df.select('order_id', 'order_purchase_timestamp', 'order_delivered_customer_date')
delivery_df.show(5)

+--------------------+------------------------+-----------------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|
+--------------------+------------------------+-----------------------------+
|e481f51cbdc54678b...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|
|53cdb2fc8bc7dce0b...|     2018-07-24 20:41:37|          2018-08-07 15:27:45|
|47770eb9100c2d0c4...|     2018-08-08 08:38:49|          2018-08-17 18:06:29|
|949d5b44dbf5de918...|     2017-11-18 19:28:06|          2017-12-02 00:28:42|
|ad21c59c0840e6cb8...|     2018-02-13 21:18:39|          2018-02-16 18:17:02|
+--------------------+------------------------+-----------------------------+
only showing top 5 rows



In [15]:
delivery_time = delivery_df.withColumn('delivery_time', datediff(col('order_delivered_customer_date'), col('order_purchase_timestamp')))
delivery_time.show(5)

+--------------------+------------------------+-----------------------------+-------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|delivery_time|
+--------------------+------------------------+-----------------------------+-------------+
|e481f51cbdc54678b...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|            8|
|53cdb2fc8bc7dce0b...|     2018-07-24 20:41:37|          2018-08-07 15:27:45|           14|
|47770eb9100c2d0c4...|     2018-08-08 08:38:49|          2018-08-17 18:06:29|            9|
|949d5b44dbf5de918...|     2017-11-18 19:28:06|          2017-12-02 00:28:42|           14|
|ad21c59c0840e6cb8...|     2018-02-13 21:18:39|          2018-02-16 18:17:02|            3|
+--------------------+------------------------+-----------------------------+-------------+
only showing top 5 rows



In [16]:
# Average Order Delivery Time

delivery_time.orderBy('delivery_time', ascending=False).show(5)

+--------------------+------------------------+-----------------------------+-------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|delivery_time|
+--------------------+------------------------+-----------------------------+-------------+
|ca07593549f1816d2...|     2017-02-21 23:31:27|          2017-09-19 14:36:39|          210|
|1b3190b2dfa9d789e...|     2018-02-23 14:57:35|          2018-09-19 23:24:07|          208|
|440d0d17af552815d...|     2017-03-07 23:59:51|          2017-09-19 15:12:50|          196|
|2fb597c2f772eca01...|     2017-03-08 18:09:02|          2017-09-19 14:33:17|          195|
|285ab9426d6982034...|     2017-03-08 22:47:40|          2017-09-19 14:00:04|          195|
+--------------------+------------------------+-----------------------------+-------------+
only showing top 5 rows



In [17]:
order_payments_df.columns

['order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value']

In [18]:
# Payment Distribution by Payment Method

order_payments_df.groupBy('payment_type').count().orderBy('count', ascending=False).show()

+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



In [19]:
order_items_df.columns

['order_id',
 'order_item_id',
 'product_id',
 'seller_id',
 'shipping_limit_date',
 'price',
 'freight_value']

In [20]:
# Top products by sales amount

top_product = order_items_df.groupBy('product_id').agg(sum('price').alias('total_sales'), count('product_id').alias('count'))
top_product.orderBy('total_sales', ascending=False).show()

+--------------------+------------------+-----+
|          product_id|       total_sales|count|
+--------------------+------------------+-----+
|bb50f2e236e5eea01...|           63885.0|  195|
|6cdd53843498f9289...| 54730.20000000005|  156|
|d6160fb7873f18409...|48899.340000000004|   35|
|d1c427060a0f73f6b...| 47214.51000000006|  343|
|99a4788cb24856965...|43025.560000000085|  488|
|3dd2a17168ec895c7...| 41082.60000000005|  274|
|25c38557cf793876c...| 38907.32000000001|   38|
|5f504b3a1c75b73d6...|37733.899999999994|   63|
|53b36df67ebb7c415...| 37683.42000000001|  323|
|aca2eb7d00ea1a7b8...| 37608.90000000007|  527|
|e0d64dcfaa3b6db5c...|          31786.82|  194|
|d285360f29ac7fd97...|31623.809999999983|  123|
|7a10781637204d8d1...|           30467.5|  143|
|f1c7f353075ce59d8...|          29997.36|  154|
|f819f0c84a64f02d3...|29024.479999999996|   45|
|588531f8ec37e7d5f...|28291.989999999998|   20|
|422879e10f4668299...|26577.219999999972|  484|
|16c4e87b98a9370a9...|           25034.0

# Data Cleaning and Transformation

## **Identifing Missing Values**

In [21]:
def missing_values(df, df_name):
    print(f'Missing Values in: {df_name}')
    df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]).show()

In [22]:
df_names = ['geolocation_df', 'products_df', 'sellers_df', 'translation_df', 'customer_df', 'order_items_df', 'order_payments_df', 'order_reviews_df', 'orders_df']
for name in df_names:
    missing_values(globals()[name], name)

Missing Values in: geolocation_df


                                                                                

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                          0|              0|              0|               0|                0|
+---------------------------+---------------+---------------+----------------+-----------------+

Missing Values in: products_df
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+------

## **Handling Missing Values**

1\. Drop missing values - non critical values

2\. Fill missing values - numerical values

3\. Impute missing values - for continous data

In [23]:
orders_df.columns

['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date']

**Dropping Missing Values**

In [24]:
order_df_cleaned = orders_df.na.drop(subset=['order_id', 'customer_id', 'order_status'])

In [25]:
missing_values(order_df_cleaned, 'Order Cleaned')

Missing Values in: Order Cleaned
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



**Filling Missing Values**

In [26]:
order_df_cleaned = order_df_cleaned.fillna({'order_delivered_customer_date':'9999-12-31', 'order_approved_at':'9999-12-31', 'order_delivered_carrier_date':'9999-12-31'})

In [27]:
missing_values(order_df_cleaned, 'Orders Cleaned')

Missing Values in: Orders Cleaned
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|                0|                           0|                            0|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



**Imputing Missing Values**

In [28]:
from pyspark.ml.feature import Imputer

In [29]:
order_payments_df.columns

['order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value']

In [30]:
imputer = Imputer(inputCols = ['payment_value'], outputCols=['payment_value_imputed']).setStrategy('mean')
payments_df_cleaned = imputer.fit(order_payments_df).transform(order_payments_df)

In [31]:
missing_values(order_payments_df, 'Order Payment')

Missing Values in: Order Payment
+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



## **Standardizing Data Types**

In [32]:
for name in df_names:
    print(f'\n Schema of {name}:')
    globals()[name].printSchema()


 Schema of geolocation_df:
root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)


 Schema of products_df:
root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)


 Schema of sellers_df:
root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)


 Schema of tran

In [33]:
customer_df_cleaned = customer_df.withColumn('customer_zip_code_prefix', col('customer_zip_code_prefix').cast('string'))

In [34]:
customer_df_cleaned.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



## **Remove Duplicate Records**

In [35]:
df_names

['geolocation_df',
 'products_df',
 'sellers_df',
 'translation_df',
 'customer_df',
 'order_items_df',
 'order_payments_df',
 'order_reviews_df',
 'orders_df']

In [36]:
orders_df.groupBy('order_id').count().filter('count>1').show()

+--------+-----+
|order_id|count|
+--------+-----+
+--------+-----+



In [37]:
order_df_cleaned = orders_df.dropDuplicates(['order_id'])

In [38]:
order_df_cleaned.groupBy('order_id').count().filter('count>1').show()

+--------+-----+
|order_id|count|
+--------+-----+
+--------+-----+



## **Normalizing Column Values**

In [39]:
payments_df_cleaned = payments_df_cleaned\
    .withColumn('payment_type', when(col('payment_type')=='boleto', 'Bank Transfer')\
                .otherwise(expr("concat_ws(' ', transform(split(payment_type, '_'), x -> initcap(x)))")))


In [40]:
payments_df_cleaned.show(10)

+--------------------+------------------+-------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential| payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+-------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1|  Credit Card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1|  Credit Card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1|  Credit Card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1|  Credit Card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1|  Credit Card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1|  Credit Card|                   2|        96.12|      

## **Joining DataFrames**

In [43]:
order_with_details = order_df_cleaned.join(order_items_df, 'order_id', 'left').join(payments_df_cleaned, 'order_id', 'left').join(customer_df_cleaned, 'customer_id', 'left')
order_with_details.limit(4).toPandas()

                                                                                

Unnamed: 0,customer_id,order_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,order_item_id,product_id,...,freight_value,payment_sequential,payment_type,payment_installments,payment_value,payment_value_imputed,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,6489ae5e4333f3693df5ad4372dab6d3,000229ec398224ef6ca0657da4fc703e,delivered,2018-01-14 14:33:31,2018-01-14 14:48:30,2018-01-16 12:36:48,2018-01-22 13:19:16,2018-02-05,1,c777355d18b72b67abbeef9df44fd0fd,...,17.87,1,Credit Card,5,216.87,216.87,3818d81c6709e39d06b2738a8d3a2474,35661,para de minas,MG
1,32e2e6ab09e778d99bf2e0ecd4898718,00054e8431b9d7675808bcb819fb4a32,delivered,2017-12-10 11:53:48,2017-12-10 12:10:31,2017-12-12 01:07:48,2017-12-18 22:03:38,2018-01-04,1,8d4f2bb7e93e6710a28f34fa83ee7d28,...,11.85,1,Credit Card,1,31.75,31.75,635d9ac1680f03288e72ada3a1035803,16700,guararapes,SP
2,9ed5e522dd9dd85b4af4a077526d8117,000576fe39319847cbb9d288c5617fa6,delivered,2018-07-04 12:08:27,2018-07-05 16:35:48,2018-07-05 12:15:00,2018-07-09 14:04:07,2018-07-25,1,557d850972a7d6f792fd18ae1400d9b6,...,70.75,1,Credit Card,10,880.75,880.75,fda4476abb6307ab3c415b7e6d026526,11702,praia grande,SP
3,16150771dfd4776261284213b89c304e,0005a1a1728c9d785b8e2b08b904576c,delivered,2018-03-19 18:40:33,2018-03-20 18:35:21,2018-03-28 00:37:42,2018-03-29 18:17:31,2018-03-29,1,310ae3c140ff94b03219ad0adc3c778f,...,11.65,1,Credit Card,3,157.6,157.6,639d23421f5517f69d0c3d6e6564cf0e,11075,santos,SP


## **Advanced Transformation or Feature Engineering**

In [45]:
order_items_df.select('price').summary().show()

[Stage 134:>                                                        (0 + 2) / 2]

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            112650|
|   mean|120.65373901471354|
| stddev|183.63392805026012|
|    min|              0.85|
|    25%|              39.9|
|    50%|             74.99|
|    75%|             134.9|
|    max|            6735.0|
+-------+------------------+



                                                                                

In [44]:
quantiles = order_items_df.approxQuantile('price', [.01, .99], 0.0)
low_cutoff, high_cutoff = quantiles[0], quantiles[1]
low_cutoff, high_cutoff

(9.99, 890.0)

In [46]:
order_item_df_cleaned = order_items_df.filter((col('price') >= low_cutoff) & (col('price') <= high_cutoff))

In [47]:
order_item_df_cleaned.select('price').summary().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            110453|
|   mean|108.49213068006871|
| stddev|112.87303173792675|
|    min|              9.99|
|    25%|             39.99|
|    50%|              74.9|
|    75%|             130.0|
|    max|             890.0|
+-------+------------------+



In [50]:
products_df_cleaned = products_df.withColumn('product_size_category', when(col('product_weight_g') < 500, 'Small')\
                                             .when(col('product_weight_g').between(500, 2000), 'Medium').otherwise('Large'))

In [51]:
products_df_cleaned.select('product_weight_g', 'product_size_category').show()

+----------------+---------------------+
|product_weight_g|product_size_category|
+----------------+---------------------+
|             225|                Small|
|            1000|               Medium|
|             154|                Small|
|             371|                Small|
|             625|               Medium|
|             200|                Small|
|           18350|                Large|
|             900|               Medium|
|             400|                Small|
|             600|               Medium|
|            1100|               Medium|
|            7150|                Large|
|             250|                Small|
|             600|               Medium|
|             200|                Small|
|             800|               Medium|
|             400|                Small|
|             900|               Medium|
|            1700|               Medium|
|             500|               Medium|
+----------------+---------------------+
only showing top

## **Load the Processed Data**

In [52]:
!hadoop fs -mkdir /data/olist_processed_data

In [59]:
customer_df_cleaned.write.mode('overwrite').parquet('/data/olist_processed_data/customer_df_cleaned.parquet')

In [60]:
!hadoop fs -ls -h /data/olist_processed_data/

Found 1 items
drwxr-xr-x   - root hadoop          0 2025-09-01 19:37 /data/olist_processed_data/customer_df_cleaned.parquet


## **Hive Meta Data Store**

In [61]:
# CREATE EXTERNAL TABLE customer_df_cleaned(
#     customer_id STRING,
#     customer_unique_id STRING,
#     customer_zip_code_prefix STRING,
#     customer_city STRING,
#     customer_state STRING
# )
# STORED AS PARQUET
# LOCATION '/data/olist_processed_data/customer_df_cleaned.parquet';

# Data Integration and Aggregation

1\. Join Datasets Efficiently

2\. Optimize Joins

3\. Complex Aggerations

4\. Caching and Optimized Queries for Performance

## **Data Integration**

In [5]:
order_items_df_joined = orders_df.join(order_items_df, 'order_id', 'inner')

order_items_products_df_joined = order_items_df_joined.join(products_df, 'product_id', 'inner')

order_items_products_seller_df_joined = order_items_products_df_joined.join(sellers_df, 'seller_id', 'inner')

full_df_joined = order_items_products_seller_df_joined.join(customer_df, 'customer_id', 'inner')

# Geo Location Dataset - Data Enrichment - Left Join

full_df_joined = full_df_joined.join(geolocation_df, full_df_joined.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix, 'left')

# Order Review Dataset - Left Join

full_df_joined = full_df_joined.join(order_reviews_df, 'order_id', 'left')

# Order Payment Dataset - Left Join

full_df_joined = full_df_joined.join(order_payments_df, 'order_id', 'left')

In [6]:
full_df_joined.limit(5).toPandas()

25/09/02 18:18:47 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,order_id,customer_id,seller_id,product_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,...,review_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp,payment_sequential,payment_type,payment_installments,payment_value
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,3504c0cb71d7fa48d967e0e4c94d59d9,87285b34884572647811a353c7ac498a,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,...,a54f0611adc9ed256b57ede6b6eb5114,4,,"Não testei o produto ainda, mas ele veio corre...",2017-10-11 00:00:00,2017-10-12 03:43:48,2,voucher,1,18.59
1,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,3504c0cb71d7fa48d967e0e4c94d59d9,87285b34884572647811a353c7ac498a,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,...,a54f0611adc9ed256b57ede6b6eb5114,4,,"Não testei o produto ainda, mas ele veio corre...",2017-10-11 00:00:00,2017-10-12 03:43:48,3,voucher,1,2.0
2,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,3504c0cb71d7fa48d967e0e4c94d59d9,87285b34884572647811a353c7ac498a,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,...,a54f0611adc9ed256b57ede6b6eb5114,4,,"Não testei o produto ainda, mas ele veio corre...",2017-10-11 00:00:00,2017-10-12 03:43:48,1,credit_card,1,18.12
3,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,3504c0cb71d7fa48d967e0e4c94d59d9,87285b34884572647811a353c7ac498a,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,...,a54f0611adc9ed256b57ede6b6eb5114,4,,"Não testei o produto ainda, mas ele veio corre...",2017-10-11 00:00:00,2017-10-12 03:43:48,2,voucher,1,18.59
4,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,3504c0cb71d7fa48d967e0e4c94d59d9,87285b34884572647811a353c7ac498a,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,...,a54f0611adc9ed256b57ede6b6eb5114,4,,"Não testei o produto ainda, mas ele veio corre...",2017-10-11 00:00:00,2017-10-12 03:43:48,3,voucher,1,2.0


## **Data Aggregation**

In [7]:
full_df_joined.cache()

DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [67]:
# Revenue per Seller

full_df_joined.groupBy('seller_id').agg(sum('price')).show(10)



+--------------------+--------------------+
|           seller_id|          sum(price)|
+--------------------+--------------------+
|3d5d0dc7073a299e3...|            170639.6|
|2138ccb85b11a4ec1...|   1943866.069999999|
|b76dba6c951ab00dc...|  302582.65999999887|
|7a67c85e85bb2ce85...| 2.031279489000002E7|
|d2374cbcbb3ca4ab1...|   3375517.550000012|
|b9a03475e6447e631...|  270153.19999999995|
|e333046ce6517bd8b...|           1214200.0|
|cca3071e3e9bb7d12...|1.0115934269999994E7|
|2b3e4a2a3ea8e0193...|  2094077.9600000007|
|431af27f296bc6519...|  1812316.1000000064|
+--------------------+--------------------+
only showing top 10 rows



                                                                                

In [9]:
# Total Order per Customer

full_df_joined.groupBy('customer_id').count().orderBy('count', ascending=False).show()



+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|351e40989da90e704...|11427|
|50920f8cd0681fd86...|10752|
|9b43e2a62de9bab3a...| 8556|
|270c23a11d024a44c...| 8001|
|5c87184371002d49e...| 6876|
|d3e82ccec3cb5f956...| 6876|
|d5f2b3f597c7ccafb...| 6706|
|c2f18647725395af4...| 6612|
|24e7dc2ff8c071263...| 6597|
|7bb57d182bdc11653...| 6258|
|63b964e79dee32a35...| 6072|
|d22f25a9fadfb1abb...| 6072|
|1ff773612ab8934db...| 5820|
|13aa59158da63ba0e...| 5206|
|78fc46047c4a639e8...| 5200|
|dd3f1762eb601f41c...| 4992|
|a193aa8d905b8e246...| 4896|
|9eb3d566e87289dcb...| 4872|
|2ba91e12e5e4c9f56...| 4752|
|55e7cfd6e28d2fbfb...| 4728|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [18]:
from pyspark.sql.types import *

In [23]:
full_df_joined = full_df_joined.withColumn('review_score', col('review_score').cast('integer'))

full_df_joined.select('review_score').printSchema()

root
 |-- review_score: integer (nullable = true)



In [24]:
# Average Review Score per Seller

full_df_joined.groupBy('seller_id').avg('review_score').orderBy('avg(review_score)', ascending=False).show()



+--------------------+-----------------+
|           seller_id|avg(review_score)|
+--------------------+-----------------+
|686886775c8f6b119...|              5.0|
|9c1c0c36cd23c2089...|              5.0|
|666658b8da8370f30...|              5.0|
|01266d4c46afa5196...|              5.0|
|c18309219e789960a...|              5.0|
|ebd1417732defaf66...|              5.0|
|740205fedfe4e90ca...|              5.0|
|e88f7a2995ba18471...|              5.0|
|2f1e7b90e01d9f086...|              5.0|
|cd233f8bfa30ebfd6...|              5.0|
|c3e2398fcc7e581cd...|              5.0|
|404e1ba01358af4cd...|              5.0|
|1de62b6f2fd962276...|              5.0|
|09bad886111255c5b...|              5.0|
|5721089ba9214e800...|              5.0|
|c8c1bea22194a4eef...|              5.0|
|05aebce0b5c8f0d41...|              5.0|
|bab0a8cd684e41ba6...|              5.0|
|4049512728d969bec...|              5.0|
|0a82f5fb06a4fe4ac...|              5.0|
+--------------------+-----------------+
only showing top

                                                                                

In [26]:
full_df_joined.columns

full_df_joined.select('price').printSchema()

root
 |-- price: double (nullable = true)



In [29]:
# Top Customer by Spending

full_df_joined.groupBy('customer_id').agg(sum('price')).withColumn('sum(price)', round('sum(price)', 1)).orderBy('sum(price)', ascending=False).show()



+--------------------+----------+
|         customer_id|sum(price)|
+--------------------+----------+
|d3e82ccec3cb5f956...| 6662844.0|
|df55c14d1476a9a34...| 3565657.0|
|fe5113a38e3575c04...| 3293604.0|
|ec5b2ba62e5743423...| 2556120.0|
|63b964e79dee32a35...| 2501664.0|
|46bb3c0b1a65c8399...| 2336752.0|
|05455dfa7cd02f13d...| 2160194.4|
|3690e975641f01bd0...| 2124498.0|
|349509b216bd5ec11...| 1923627.0|
|695476b5848d64ba0...| 1820543.1|
|73236a0796f53d60d...| 1755520.0|
|cc803a2c412833101...| 1676400.0|
|1ff773612ab8934db...| 1658641.8|
|fced842c7dad61e8c...| 1654898.0|
|1ecb47d23dc8203cd...| 1629588.4|
|de832e8dbb1f588a4...| 1584990.6|
|803cd9b04f9cd252c...| 1512312.0|
|d72181923840c8895...| 1488114.9|
|06d478ba352a27a51...| 1461150.0|
|0049e8442c2a3e4a8...| 1444800.0|
+--------------------+----------+
only showing top 20 rows



                                                                                

In [30]:
# Top 10 Most Sold Products

full_df_joined.groupBy('product_id').agg(count('order_id').alias('total_sold')).orderBy(desc('total_sold')).limit(10).show()



+--------------------+----------+
|          product_id|total_sold|
+--------------------+----------+
|aca2eb7d00ea1a7b8...|     86740|
|422879e10f4668299...|     81110|
|99a4788cb24856965...|     78775|
|389d119b48cf3043d...|     60248|
|d1c427060a0f73f6b...|     59274|
|368c6c730842d7801...|     58358|
|53759a2ecddad2bb8...|     52654|
|53b36df67ebb7c415...|     52105|
|154e7e31ebfa09220...|     42700|
|3dd2a17168ec895c7...|     40787|
+--------------------+----------+



                                                                                

## **Window Function and Ranking**

In [32]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))

In [34]:
# Rank Top Selling Products per Seller

top_seller_products_df = full_df_joined.withColumn('rank', rank().over(window_spec)).filter(col('rank') <= 5)
top_seller_products_df.select('seller_id', 'price', 'rank').show()

[Stage 91:>                                                         (0 + 1) / 1]

+--------------------+-----+----+
|           seller_id|price|rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 20 rows



                                                                                

In [39]:
# Dense Rank for Seller Based on Revenue

seller_revenue = full_df_joined.groupBy('seller_id').agg(sum('price').alias('total_revenue')).withColumn('total_revenue', round('total_revenue',1))

window_spec = Window.partitionBy('seller_id').orderBy(desc('total_revenue'))

dense_rank_seller_revenue = seller_revenue.withColumn('dense_rank', dense_rank().over(window_spec))
dense_rank_seller_revenue.show()



+--------------------+-------------+----------+
|           seller_id|total_revenue|dense_rank|
+--------------------+-------------+----------+
|0015a82c2db000af6...|     755380.0|         1|
|001cca7ae9ae17fb1...|    3888092.4|         1|
|002100f778ceb8431...|     181886.9|         1|
|00ab3eff1b5192e5f...|       6076.0|         1|
|01c97ebb5cdac5289...|      66335.5|         1|
|01cf7e3d21494c41f...|    1533269.4|         1|
|02ecc2a19303f05e5...|     973622.8|         1|
|02f623a8eb246f3c5...|     957411.0|         1|
|0307f7565ff85b299...|     487548.0|         1|
|038b75b729c8a9a04...|      17979.5|         1|
|039e6ad9dae796144...|    1297420.0|         1|
|0417b067eeab773d2...|     333797.6|         1|
|042573af89b6d931f...|     104089.5|         1|
|048c2757535328e0d...|    1107250.2|         1|
|0570350b23eda5444...|      69203.6|         1|
|05730013efda59630...|     310828.0|         1|
|058fd0aa2bfdb2274...|    1372803.2|         1|
|05a48cc8859962767...|       2316.6|    

                                                                                

## **Optimized Joins for Data Integration**

In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [5]:
# Broadcasting dimentional Datasets

order_items_df_joined = orders_df.join(order_items_df, 'order_id', 'inner')

order_items_products_df_joined = order_items_df_joined.join(products_df, 'product_id', 'inner')

order_items_products_seller_df_joined = order_items_products_df_joined.join(broadcast(sellers_df), 'seller_id', 'inner')

full_df_joined = order_items_products_seller_df_joined.join(customer_df, 'customer_id', 'inner')

full_df_joined = full_df_joined.join(broadcast(geolocation_df), full_df_joined.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix, 'left')

full_df_joined = full_df_joined.join(broadcast(order_reviews_df), 'order_id', 'left')

full_df_joined = full_df_joined.join(order_payments_df, 'order_id', 'left')

In [6]:
full_df_joined.cache()

25/09/04 18:12:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

## **Advanced Data Integration**

In [7]:
# Total Revenue and Average Order Value per Customer

customer_spending_df = full_df_joined.groupBy('customer_id')\
                .agg(count('order_id').alias('total_orders'), 
                     round(sum('price'), 1).alias('total_spent'),
                     round(avg('price'),1).alias('avg_order_value'))\
                .orderBy(desc('total_spent'))

customer_spending_df.show(10)



+--------------------+------------+-----------+---------------+
|         customer_id|total_orders|total_spent|avg_order_value|
+--------------------+------------+-----------+---------------+
|d3e82ccec3cb5f956...|        6876|  6662844.0|          969.0|
|df55c14d1476a9a34...|         743|  3565657.0|         4799.0|
|fe5113a38e3575c04...|        2292|  3293604.0|         1437.0|
|ec5b2ba62e5743423...|        1428|  2556120.0|         1790.0|
|63b964e79dee32a35...|        6072|  2501664.0|          412.0|
|46bb3c0b1a65c8399...|         748|  2336752.0|         3124.0|
|05455dfa7cd02f13d...|        2184|  2160194.4|          989.1|
|3690e975641f01bd0...|         802|  2124498.0|         2649.0|
|349509b216bd5ec11...|         743|  1923627.0|         2589.0|
|695476b5848d64ba0...|         687|  1820543.1|         2650.0|
+--------------------+------------+-----------+---------------+
only showing top 10 rows



                                                                                

In [12]:
# Seller Performance Metrics (Revenue, Average Review, Order Count)

seller_performance_df = full_df_joined.groupBy('seller_id')\
                        .agg(count('order_id').alias('total_orders'),
                             round(sum('price'), 1).alias('total_revenue'),
                             round(avg('review_score'), 1).alias('avg_review_score'),
                             round(stddev('price'), 1).alias('price_variability'))\
                        .orderBy(desc('total_revenue'))

seller_performance_df.show(10)



+--------------------+------------+-------------+----------------+-----------------+
|           seller_id|total_orders|total_revenue|avg_review_score|price_variability|
+--------------------+------------+-------------+----------------+-----------------+
|4869f7a5dfa277a7d...|      184587| 3.61387173E7|             4.1|            111.7|
|53243585a1d6dc264...|       54514|  3.4291593E7|             4.1|            499.7|
|4a3ca9315b744ce9f...|      330661| 3.37595708E7|             3.8|             59.4|
|7c67e1448b00f6e96...|      233306| 3.22823218E7|             3.4|             50.4|
|fa1c13f2614d7b5c4...|       87686| 3.01393863E7|             4.4|            307.7|
|da8622b14eb17ae28...|      264433| 2.98576697E7|             4.0|             72.9|
|7e93a43ef30c4f03f...|       50226| 2.63157063E7|             4.1|            377.2|
|1025f0e2d44d7041d...|      229587| 2.29375185E7|             3.9|             84.3|
|46dc3b2cc0980fb8e...|       90426| 2.17917733E7|             4.2

                                                                                

In [14]:
# Product Popularity Metrics

product_metrics_df = full_df_joined.groupBy('product_id')\
                    .agg(count('product_id').alias('total_sales'),
                         round(sum('price'), 1).alias('total_revenue'),
                         round(avg('price'), 1).alias('avg_price'),
                         round(stddev('price'), 1).alias('price_volatility'),
                         collect_set('seller_id').alias('unique_seller'))\
                    .orderBy(desc('total_sales'))
    
product_metrics_df.show(10)



+--------------------+-----------+-------------+---------+----------------+--------------------+
|          product_id|total_sales|total_revenue|avg_price|price_volatility|       unique_seller|
+--------------------+-----------+-------------+---------+----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740|    6164630.3|     71.1|             3.2|[955fee9216a65b61...|
|422879e10f4668299...|      81110|    4442791.5|     54.8|             4.5|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775|    6921762.7|     87.9|             4.1|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248|    3280533.1|     54.5|             4.4|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274|    8220103.3|    138.7|            16.6|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358|    3181698.9|     54.5|             4.6|[1f50f920176fa81d...|
|53759a2ecddad2bb8...|      52654|    2893017.5|     54.9|             4.5|[1f50f920176fa81d...|
|53b36df67ebb7c415...|      52

                                                                                

In [20]:
full_df_joined.select('order_purchase_timestamp').printSchema()

root
 |-- order_purchase_timestamp: timestamp (nullable = true)



In [21]:
full_df_joined = full_df_joined.withColumn('order_purchase_year', year(col('order_purchase_timestamp'))).withColumn('order_purchase_month', month(col('order_purchase_timestamp')))
full_df_joined.select('order_id', 'order_purchase_timestamp', 'order_purchase_year', 'order_purchase_month').show(5)

+--------------------+------------------------+-------------------+--------------------+
|            order_id|order_purchase_timestamp|order_purchase_year|order_purchase_month|
+--------------------+------------------------+-------------------+--------------------+
|e481f51cbdc54678b...|     2017-10-02 10:56:33|               2017|                  10|
|e481f51cbdc54678b...|     2017-10-02 10:56:33|               2017|                  10|
|e481f51cbdc54678b...|     2017-10-02 10:56:33|               2017|                  10|
|e481f51cbdc54678b...|     2017-10-02 10:56:33|               2017|                  10|
|e481f51cbdc54678b...|     2017-10-02 10:56:33|               2017|                  10|
+--------------------+------------------------+-------------------+--------------------+
only showing top 5 rows



In [23]:
# Monthly Revenue and Order Count Trend

monthly_order_df = full_df_joined.groupBy('order_purchase_month')\
                    .agg(count('order_id').alias('total_orders'),
                         round(sum('price'), 1).alias('total_revenue'),
                         round(avg('price'), 1).alias('avg_order_value'),
                         round(min('price'), 1).alias('min_order_value'),
                         round(max('price'), 1).alias('max_order_value'))\
                    .orderBy('order_purchase_month')
    
monthly_order_df.show()



+--------------------+------------+-------------+---------------+---------------+---------------+
|order_purchase_month|total_orders|total_revenue|avg_order_value|min_order_value|max_order_value|
+--------------------+------------+-------------+---------------+---------------+---------------+
|                   1|     1495580|1.715329015E8|          114.7|            2.9|         3690.0|
|                   2|     1551163|1.787817841E8|          115.3|            3.0|         6735.0|
|                   3|     1809467|2.186811684E8|          120.9|            4.9|         4100.0|
|                   4|     1693860|2.171569691E8|          128.2|            0.9|         4799.0|
|                   5|     1918571| 2.40061152E8|          125.1|            3.5|         6499.0|
|                   6|     1701909|2.102433235E8|          123.5|            3.5|         4590.0|
|                   7|     1847639|2.229088571E8|          120.6|            1.2|         6729.0|
|                   

                                                                                

In [24]:
# Yearly Revenue and Order Count Trend

yearly_order_df = full_df_joined.groupBy('order_purchase_year')\
                    .agg(count('order_id').alias('total_orders'),
                         round(sum('price'), 1).alias('total_revenue'),
                         round(avg('price'), 1).alias('avg_order_value'),
                         round(min('price'), 1).alias('min_order_value'),
                         round(max('price'), 1).alias('max_order_value'))\
                    .orderBy('order_purchase_year')
    
yearly_order_df.show()



+-------------------+------------+--------------+---------------+---------------+---------------+
|order_purchase_year|total_orders| total_revenue|avg_order_value|min_order_value|max_order_value|
+-------------------+------------+--------------+---------------+---------------+---------------+
|               2016|       64384|     8361960.0|          129.9|            6.0|         1399.0|
|               2017|     8280691|1.0041411242E9|          121.3|            1.2|         6735.0|
|               2018|     9719186|1.1803210943E9|          121.4|            0.9|         6729.0|
+-------------------+------------+--------------+---------------+---------------+---------------+



                                                                                

In [64]:
# Customer Retention Analysis

full_df_joined= full_df_joined.withColumn('order_purchase_date', to_date('order_purchase_timestamp'))

window_first = Window.partitionBy('customer_id').orderBy(asc('order_purchase_date'))
window_last = Window.partitionBy('customer_id').orderBy(desc('order_purchase_date'))

df_first = full_df_joined.withColumn('first', row_number().over(window_first)).filter(col('first') == 1)
df_last = full_df_joined.withColumn('last', row_number().over(window_last)).filter(col('last') == 1 )

df_agg = full_df_joined.groupBy('customer_id')\
            .agg(count('order_id').alias('total_orders'),
                 round(avg('price'), 1).alias('avg_order_value'))

customer_retention_df = df_agg.join(df_first.select('customer_id', col('order_purchase_date').alias('first_order_date')), on = 'customer_id')\
                              .join(df_last.select('customer_id', col('order_purchase_date').alias('last_order_date')), on = 'customer_id')

customer_retention_df.show()

                                                                                

+--------------------+------------+---------------+----------------+---------------+
|         customer_id|total_orders|avg_order_value|first_order_date|last_order_date|
+--------------------+------------+---------------+----------------+---------------+
|000161a058600d590...|         106|           54.9|      2017-07-16|     2017-07-16|
|000419c5494106c30...|        1146|           34.3|      2018-03-02|     2018-03-02|
|0015f7887e2fde13d...|          14|          168.0|      2017-07-31|     2017-07-31|
|0017a0b4c1f1bdb9c...|          88|           34.9|      2018-03-03|     2018-03-03|
|001c7f05398c45b42...|         176|           13.7|      2018-04-12|     2018-04-12|
|001f150aebb5d897f...|          25|          110.3|      2018-05-09|     2018-05-09|
|001f6f1a5e902ad14...|         561|           23.9|      2017-02-07|     2017-02-07|
|002905287304e28c0...|          45|           13.5|      2018-07-22|     2018-07-22|
|0029cdf064769cabd...|          62|          460.0|      2018-02-

In [66]:
customer_spending_df.join(df_first.select('customer_id', col('order_purchase_date').alias('first_order_date')), on = 'customer_id')\
                    .join(df_last.select('customer_id', col('order_purchase_date').alias('last_order_date')), on = 'customer_id').show()

                                                                                

+--------------------+------------+-----------+---------------+----------------+---------------+
|         customer_id|total_orders|total_spent|avg_order_value|first_order_date|last_order_date|
+--------------------+------------+-----------+---------------+----------------+---------------+
|000161a058600d590...|         106|     5819.4|           54.9|      2017-07-16|     2017-07-16|
|000419c5494106c30...|        1146|    39307.8|           34.3|      2018-03-02|     2018-03-02|
|0015f7887e2fde13d...|          14|     2351.9|          168.0|      2017-07-31|     2017-07-31|
|0017a0b4c1f1bdb9c...|          88|     3071.2|           34.9|      2018-03-03|     2018-03-03|
|001c7f05398c45b42...|         176|     2402.4|           13.7|      2018-04-12|     2018-04-12|
|001f150aebb5d897f...|          25|     2758.0|          110.3|      2018-05-09|     2018-05-09|
|001f6f1a5e902ad14...|         561|    13407.9|           23.9|      2017-02-07|     2017-02-07|
|002905287304e28c0...|        

In [67]:
full_df_joined.groupBy('customer_id')\
        .agg(countDistinct('order_purchase_date').alias('distinct_order_days'))\
        .filter(col('distinct_order_days') > 1)\
        .show()



+-----------+-------------------+
|customer_id|distinct_order_days|
+-----------+-------------------+
+-----------+-------------------+



                                                                                

In [68]:
full_df_joined.select(countDistinct('customer_id')).collect()[0][0]

                                                                                

98666

In [69]:
full_df_joined.count()

18064261

## **Advanced Enrichment**

In [72]:
full_df_joined.select('order_status').distinct().show()



+------------+
|order_status|
+------------+
|   delivered|
|    canceled|
|     shipped|
|    approved|
|  processing|
|    invoiced|
| unavailable|
+------------+



                                                                                

In [78]:
# Order Status Flag

full_df_joined = full_df_joined.withColumn('is_delivered', when(col('order_status') == 'delivered', lit(1)).otherwise(lit(0)))\
                               .withColumn('is_canceled', when(col('order_status') == 'canceled', lit(1)).otherwise(lit(0)))
                                           
full_df_joined.select('order_status', 'is_delivered', 'is_canceled').filter(col('order_status') == 'canceled').show(10)

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
+------------+------------+-----------+
only showing top 10 rows



In [79]:
customer_spending_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- total_orders: long (nullable = false)
 |-- total_spent: double (nullable = true)
 |-- avg_order_value: double (nullable = true)



In [88]:
# Customer Segmentation Based on Spending

customer_spending_df = customer_spending_df.withColumn('customer_segment', when(col('avg_order_value') >= 1200, 'High-value')\
                                                                           .when(col('avg_order_value').between(500, 1200), 'Medium-value')\
                                                                           .otherwise('Low_value'))

customer_spending_df.show(10)



+--------------------+------------+-----------+---------------+----------------+
|         customer_id|total_orders|total_spent|avg_order_value|customer_segment|
+--------------------+------------+-----------+---------------+----------------+
|d3e82ccec3cb5f956...|        6876|  6662844.0|          969.0|    Medium-value|
|df55c14d1476a9a34...|         743|  3565657.0|         4799.0|      High-value|
|fe5113a38e3575c04...|        2292|  3293604.0|         1437.0|      High-value|
|ec5b2ba62e5743423...|        1428|  2556120.0|         1790.0|      High-value|
|63b964e79dee32a35...|        6072|  2501664.0|          412.0|       Low_value|
|46bb3c0b1a65c8399...|         748|  2336752.0|         3124.0|      High-value|
|05455dfa7cd02f13d...|        2184|  2160194.4|          989.1|    Medium-value|
|3690e975641f01bd0...|         802|  2124498.0|         2649.0|      High-value|
|349509b216bd5ec11...|         743|  1923627.0|         2589.0|      High-value|
|695476b5848d64ba0...|      

                                                                                

In [90]:
# full_df_joined = full_df_joined.join(customer_spending_df.select('customer_id', 'customer_segment'), on = 'customer_id', how = 'left')

full_df_joined.select('customer_id', 'customer_segment').show()



+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
|9ef432eb625129730...|       Low_value|
+--------------------+----------------+
only showing top 20 rows



                                                                                

In [8]:
# Hourly Order Distributions

full_df_joined = full_df_joined.withColumn('hour_of_day', expr('hour(order_purchase_timestamp)'))

full_df_joined.select('customer_id', 'order_purchase_timestamp', 'hour_of_day').show(5)

+--------------------+------------------------+-----------+
|         customer_id|order_purchase_timestamp|hour_of_day|
+--------------------+------------------------+-----------+
|9ef432eb625129730...|     2017-10-02 10:56:33|         10|
|9ef432eb625129730...|     2017-10-02 10:56:33|         10|
|9ef432eb625129730...|     2017-10-02 10:56:33|         10|
|9ef432eb625129730...|     2017-10-02 10:56:33|         10|
|9ef432eb625129730...|     2017-10-02 10:56:33|         10|
+--------------------+------------------------+-----------+
only showing top 5 rows



In [17]:
# Weekend vs Weekday Orders

full_df_joined = full_df_joined.withColumn('order_day_type', when(dayofweek('order_purchase_timestamp').isin(1, 7), lit('Weekend')).otherwise(lit('Weekday')))
                                           
full_df_joined.select('price', 'order_status', 'order_day_type').filter(col('order_day_type') == 'Weekend').show(5)                                           

+-----+------------+--------------+
|price|order_status|order_day_type|
+-----+------------+--------------+
| 45.0|   delivered|       Weekend|
| 45.0|   delivered|       Weekend|
| 45.0|   delivered|       Weekend|
| 45.0|   delivered|       Weekend|
| 45.0|   delivered|       Weekend|
+-----+------------+--------------+
only showing top 5 rows



In [23]:
full_df_joined.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [24]:
full_df_joined.select('freight_value').summary().show()



+-------+------------------+
|summary|     freight_value|
+-------+------------------+
|  count|          18064261|
|   mean|19.602952943693275|
| stddev|14.953399394001034|
|    min|               0.0|
|    25%|             13.47|
|    50%|             16.25|
|    75%|             20.32|
|    max|            409.68|
+-------+------------------+



                                                                                

In [25]:
freight = full_df_joined.approxQuantile('freight_value', [.25, .8], 0.0)
freight_low, freight_high = freight[0], freight[1]
freight_low, freight_high

                                                                                

(13.47, 22.42)

In [33]:
# Freight Category

full_df_joined = full_df_joined.withColumn('freight_category', when(col('freight_value') > freight_high, lit('High'))\
                                           .when(col('freight_value').between(freight_low, freight_high), lit('Medium'))\
                                           .otherwise(lit('Low')))

full_df_joined.select('freight_category').distinct().show()

+----------------+
|freight_category|
+----------------+
|          Medium|
|             Low|
|            High|
+----------------+



In [36]:
# Order Volume by Customer State

order_value_state_df = full_df_joined.groupBy('customer_state').agg(count('order_id').alias('order_value')).orderBy(asc('customer_state'))

order_value_state_df.show(5)



+--------------+-----------+
|customer_state|order_value|
+--------------+-----------+
|            AC|       8686|
|            AL|      37742|
|            AM|       6488|
|            AP|       5698|
|            BA|     443992|
+--------------+-----------+
only showing top 5 rows



                                                                                