# **Setting Up the Spark Environment**

  

**1\. Deploy Spark Cluster** 

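The Apache Spark cluster was deployed on Dataproc, the managed Spark and Hadoop service on Google Cloud Platform (GCP), using the `gcloud` CLI from the Google Cloud SDK. The command below creates a cluster with one master node and two worker nodes and enables the Jupyter and Zeppelin components, giving a scalable, managed Spark environment suitable for processing large datasets.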

    gcloud dataproc clusters create my-cluster \
        --region=us-central1 \
        --zone=us-central1-a \
        --master-machine-type=e2-standard-4 \
        --master-boot-disk-size=50GB \
        --num-workers=2 \
        --worker-machine-type=e2-standard-4 \
        --worker-boot-disk-size=50GB \
        --image-version=2.1-debian11 \
        --enable-component-gateway \
        --optional-components=JUPYTER,ZEPPELIN \
        --properties="spark:spark.ui.port=0" \
        --metadata="PIP_PACKAGES=pandas numpy matplotlib seaborn scikit-learn" \
        --project=keen-truth-461516-a0


**2\. Data Extraction and Local Directory Setup**  

A new directory named `olist` was created to organize the project files for the data engineering workflow. The [**Olist Brazilian E-commerce dataset**](https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce) was downloaded directly from **Kaggle** using a `curl` command within a Bash script, executed through a remote SSH terminal.

  

    #!/bin/bash
    curl -L -o ~/olist/brazilian-ecommerce.zip https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce

  

After downloading the dataset:

*   The ZIP file was extracted using the standard `unzip` command (`unzip brazilian-ecommerce.zip -d ~/olist/dataset/`), where the `-d` option sets the target directory.

*   A new subdirectory named `dataset` was created inside the main `olist` directory.
    
*   All the extracted `.csv` files were moved into the `dataset` folder for structured data storage and easier access during the ETL process.
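
When the workflow is driven from a notebook rather than a shell, the same extraction step can be done in Python. The following is a minimal sketch using only the standard library; the function name `extract_csvs` and the paths in the usage comment are assumptions for illustration, not part of the original scripts.

```python
import zipfile
from pathlib import Path


def extract_csvs(zip_path, target_dir):
    """Extract only the .csv members of zip_path into target_dir.

    Returns the sorted list of extracted member names.
    """
    target = Path(target_dir).expanduser()
    target.mkdir(parents=True, exist_ok=True)  # same effect as `mkdir -p`

    extracted = []
    with zipfile.ZipFile(Path(zip_path).expanduser()) as archive:
        for member in archive.namelist():
            if member.lower().endswith('.csv'):
                archive.extract(member, target)
                extracted.append(member)
    return sorted(extracted)


# Hypothetical usage, mirroring the paths used in this project:
# extract_csvs('~/olist/brazilian-ecommerce.zip', '~/olist/dataset')
```

Filtering on the `.csv` suffix keeps any non-data files in the archive out of the `dataset` folder, so a later wildcard upload only picks up data files.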

**3\. Data Ingestion into the Hadoop Cluster**

The Olist dataset files were ingested into the Hadoop Distributed File System (HDFS) to enable distributed storage and parallel processing using the Hadoop ecosystem.

*  📁 HDFS Directory Creation:
A new directory was created in HDFS to store the CSV files related to the Olist project: `hadoop fs -mkdir -p /data/olist/`

* 📤 Uploading CSV Files to HDFS:
All CSV files from the local project directory (`~/olist/dataset`) were copied to the HDFS directory `/data/olist` using the `hadoop fs -put` command: `hadoop fs -put ~/olist/dataset/*.csv /data/olist/`
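
The two HDFS steps above could also be scripted so they are repeatable. The sketch below only assembles the same `hadoop fs -mkdir -p` and `hadoop fs -put` commands and, optionally, runs them with `subprocess`; the helper names `build_hdfs_upload_commands` and `run_commands` are hypothetical, and it assumes `hadoop` is on the `PATH` of the node where it runs.

```python
import glob
import os
import subprocess


def build_hdfs_upload_commands(local_dir, hdfs_dir):
    """Return the `hadoop fs` commands that create hdfs_dir and upload the CSVs."""
    base = glob.escape(os.path.expanduser(local_dir))
    csv_files = sorted(glob.glob(os.path.join(base, '*.csv')))
    commands = [['hadoop', 'fs', '-mkdir', '-p', hdfs_dir]]
    if csv_files:
        # -put accepts several local sources followed by one destination
        commands.append(['hadoop', 'fs', '-put', *csv_files, hdfs_dir])
    return commands


def run_commands(commands):
    """Run each command in order, stopping at the first failure."""
    for cmd in commands:
        subprocess.run(cmd, check=True)  # raises CalledProcessError on a non-zero exit


# Hypothetical usage on a cluster node:
# run_commands(build_hdfs_upload_commands('~/olist/dataset', '/data/olist/'))
```

Building the commands separately from running them makes it possible to print and review them before anything is written to HDFS.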

# Data Exploration

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName('OlistDataset') \
.getOrCreate()

spark

25/06/08 05:58:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
!hadoop fs -ls /data/olist/

Found 9 items
-rw-r--r--   2 dgclub21 hadoop    9033957 2025-06-07 12:21 /data/olist/olist_customers_dataset.csv
-rw-r--r--   2 dgclub21 hadoop   61273883 2025-06-07 12:21 /data/olist/olist_geolocation_dataset.csv
-rw-r--r--   2 dgclub21 hadoop   15438671 2025-06-07 12:21 /data/olist/olist_order_items_dataset.csv
-rw-r--r--   2 dgclub21 hadoop    5777138 2025-06-07 12:21 /data/olist/olist_order_payments_dataset.csv
-rw-r--r--   2 dgclub21 hadoop   14451670 2025-06-07 12:21 /data/olist/olist_order_reviews_dataset.csv
-rw-r--r--   2 dgclub21 hadoop   17654914 2025-06-07 12:21 /data/olist/olist_orders_dataset.csv
-rw-r--r--   2 dgclub21 hadoop    2379446 2025-06-07 12:21 /data/olist/olist_products_dataset.csv
-rw-r--r--   2 dgclub21 hadoop     174703 2025-06-07 12:21 /data/olist/olist_sellers_dataset.csv
-rw-r--r--   2 dgclub21 hadoop       2613 2025-06-07 12:21 /data/olist/product_category_name_translation.csv


In [3]:
hdfs_path = '/data/olist/'      # Base HDFS directory; each dataset file is read by appending its filename to this path.

In [4]:
customer_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv',header=True,inferSchema=True)
customer_df.show(5)

                                                                                

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows



In [64]:
location_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv',header=True,inferSchema=True)
item_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv',header=True,inferSchema=True)
payment_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv',header=True,inferSchema=True)
review_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv',header=True,inferSchema=True)
order_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv',header=True,inferSchema=True)
product_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv',header=True,inferSchema=True)
seller_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv',header=True,inferSchema=True)
translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv',header=True,inferSchema=True)

                                                                                

In [15]:
customer_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [16]:
location_df.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [15]:
payment_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



## Data Validation – Row Count Check

Checking for data loss (any records missed during the extraction or ingestion process).

In [22]:
print(f'Customer_df : {customer_df.count()} rows')
print(f'location_df : {location_df.count()} rows')
print(f'item_df     : {item_df.count()} rows')
print(f'payment_df  : {payment_df.count()} rows')
print(f'review_df   : {review_df.count()} rows')
print(f'order_df    : {order_df.count()} rows')
print(f'product_df  : {product_df.count()} rows')
print(f'seller_df   : {seller_df.count()} rows')
print(f'translation_df : {translation_df.count()} rows')

Customer_df : 99441 rows
location_df : 1000163 rows
item_df     : 112650 rows
payment_df  : 103886 rows
review_df   : 104162 rows
order_df    : 99441 rows
product_df  : 32951 rows
seller_df   : 3095 rows
translation_df : 71 rows


In [24]:
payment_df.distinct().count()

                                                                                

103886
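
The counts above come from Spark alone, so they show what was read but not what was expected. One way to cross-check them is to count the records in the local CSV files. The sketch below uses Python's `csv` module, which keeps a quoted field with embedded line breaks inside a single record; the function name `count_csv_records` and the path in the usage comment are assumptions. Spark's CSV reader splits such records by default (`multiLine=False`), so a mismatch on a table with free-text columns may point to a parsing difference rather than lost data.

```python
import csv


def count_csv_records(path, encoding='utf-8'):
    """Count data records in a CSV file, excluding the header row.

    Quoted fields that contain newlines stay inside one record, so the
    result can differ from a plain line count.
    """
    with open(path, newline='', encoding=encoding) as handle:
        reader = csv.reader(handle)
        next(reader, None)  # skip the header if the file is not empty
        return sum(1 for row in reader if row)  # ignore completely blank lines


# Hypothetical comparison against a Spark count:
# import os
# local_rows = count_csv_records(os.path.expanduser('~/olist/dataset/olist_orders_dataset.csv'))
# print(local_rows == order_df.count())
```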

## Null Validation

In [30]:
customer_df.columns

['customer_id',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state']

In [43]:
from pyspark.sql.functions import col

customer_df.select([col(i).isNull() for i in customer_df.columns]).show()

+---------------------+----------------------------+----------------------------------+-----------------------+------------------------+
|(customer_id IS NULL)|(customer_unique_id IS NULL)|(customer_zip_code_prefix IS NULL)|(customer_city IS NULL)|(customer_state IS NULL)|
+---------------------+----------------------------+----------------------------------+-----------------------+------------------------+
|                false|                       false|                             false|                  false|                   false|
|                false|                       false|                             false|                  false|                   false|
|                false|                       false|                             false|                  false|                   false|
|                false|                       false|                             false|                  false|                   false|
|                false|                  

In [14]:
from pyspark.sql.functions import col,when,count,lit

customer_df.select([count(when(col(c).isNull(),1)).alias(c) for c in customer_df.columns]).show()

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [38]:
from pyspark.sql.functions import col,when,count

order_df.select([count(when(col(c).isNull(),1)).alias(c) for c in order_df.columns]).show(vertical=True)

-RECORD 0-----------------------------
 order_id                      | 0    
 customer_id                   | 0    
 order_status                  | 0    
 order_purchase_timestamp      | 0    
 order_approved_at             | 160  
 order_delivered_carrier_date  | 1783 
 order_delivered_customer_date | 2965 
 order_estimated_delivery_date | 0    



## Duplicate Validation

In [46]:
customer_df.groupBy('customer_unique_id').count().filter('count>1').orderBy('count',ascending=False).show()



+--------------------+-----+
|  customer_unique_id|count|
+--------------------+-----+
|8d50f5eadf50201cc...|   17|
|3e43e6105506432c9...|    9|
|ca77025e7201e3b30...|    7|
|1b6c7548a2a1f9037...|    7|
|6469f99c1f9dfae77...|    7|
|f0e310a6839dce9de...|    6|
|12f5d6e1cbf93dafd...|    6|
|dc813062e0fc23409...|    6|
|47c1a3033b8b77b3a...|    6|
|de34b16117594161a...|    6|
|63cfc61cee11cbe30...|    6|
|56c8638e7c058b98a...|    5|
|394ac4de8f3acb142...|    5|
|5e8f38a9a1c023f3d...|    5|
|74cb1ad7e6d567432...|    5|
|b4e4f24de1e8725b7...|    5|
|35ecdf6858edc6427...|    5|
|fe81bb32c243a86b2...|    5|
|4e65032f1f574189f...|    5|
|9cc5a07f169a1606f...|    4|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [6]:
# Customers Distribution by State

from pyspark.sql.functions import desc

customer_df.groupBy('customer_state').count().orderBy(desc('count')).show()



+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



                                                                                

In [7]:
# Order : Order Status Distribution

order_df.groupBy('order_status').count().orderBy('count',ascending=False).show()



+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



                                                                                

In [9]:
# Payments : Finding the Most Used Payment Type

payment_df.groupBy('payment_type').count().orderBy('count',ascending=False).show()

+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



                                                                                

In [8]:
payment_df.show(3)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 3 rows



In [27]:
# Order Items : Finding Top Selling Products (by total sales value)

from pyspark.sql import functions as F   # alias avoids shadowing Python's built-in sum and round

top_products = item_df.groupBy('product_id').agg(F.round(F.sum('price')).alias('total_sales'))
top_products.orderBy('total_sales',ascending=False).show(5)



+--------------------+-----------+
|          product_id|total_sales|
+--------------------+-----------+
|bb50f2e236e5eea01...|    63885.0|
|6cdd53843498f9289...|    54730.0|
|d6160fb7873f18409...|    48899.0|
|d1c427060a0f73f6b...|    47215.0|
|99a4788cb24856965...|    43026.0|
+--------------------+-----------+
only showing top 5 rows



                                                                                

In [39]:
# Orders : Average Delivery Time Analysis

delivery_df = order_df.select('order_id','order_purchase_timestamp','order_delivered_customer_date')
delivery_df.show(5)

+--------------------+------------------------+-----------------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|
+--------------------+------------------------+-----------------------------+
|e481f51cbdc54678b...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|
|53cdb2fc8bc7dce0b...|     2018-07-24 20:41:37|          2018-08-07 15:27:45|
|47770eb9100c2d0c4...|     2018-08-08 08:38:49|          2018-08-17 18:06:29|
|949d5b44dbf5de918...|     2017-11-18 19:28:06|          2017-12-02 00:28:42|
|ad21c59c0840e6cb8...|     2018-02-13 21:18:39|          2018-02-16 18:17:02|
+--------------------+------------------------+-----------------------------+
only showing top 5 rows



In [41]:
from pyspark.sql.functions import datediff,to_date,col

delivery_time = delivery_df.withColumn('delivery_time',datediff(col('order_delivered_customer_date'),col('order_purchase_timestamp')))
delivery_time.orderBy('delivery_time',ascending=False).show()



+--------------------+------------------------+-----------------------------+-------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|delivery_time|
+--------------------+------------------------+-----------------------------+-------------+
|ca07593549f1816d2...|     2017-02-21 23:31:27|          2017-09-19 14:36:39|          210|
|1b3190b2dfa9d789e...|     2018-02-23 14:57:35|          2018-09-19 23:24:07|          208|
|440d0d17af552815d...|     2017-03-07 23:59:51|          2017-09-19 15:12:50|          196|
|2fb597c2f772eca01...|     2017-03-08 18:09:02|          2017-09-19 14:33:17|          195|
|285ab9426d6982034...|     2017-03-08 22:47:40|          2017-09-19 14:00:04|          195|
|0f4519c5f1c541dde...|     2017-03-09 13:26:57|          2017-09-19 14:38:21|          194|
|47b40429ed8cce3ae...|     2018-01-03 09:44:01|          2018-07-13 20:51:31|          191|
|2fe324febf907e3ea...|     2017-03-13 20:17:10|          2017-09-19 17:00:07|   
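
`datediff` compares the date parts of the two timestamps and ignores the time of day, so `delivery_time` is a count of calendar days rather than elapsed 24-hour periods. The pure-Python sketch below mirrors that behaviour for illustration; `calendar_day_diff` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime


def calendar_day_diff(end_ts, start_ts, fmt='%Y-%m-%d %H:%M:%S'):
    """Days between the date parts of two timestamp strings (time of day ignored)."""
    end = datetime.strptime(end_ts, fmt).date()
    start = datetime.strptime(start_ts, fmt).date()
    return (end - start).days


# First order shown in the delivery_df sample above
print(calendar_day_diff('2017-10-10 21:25:13', '2017-10-02 10:56:33'))  # 8

# Ten minutes apart, but on different calendar days
print(calendar_day_diff('2017-10-03 00:05:00', '2017-10-02 23:55:00'))  # 1
```

Applied to the slowest order in the output above (purchased 2017-02-21, delivered 2017-09-19), the same calculation gives 210, matching the `delivery_time` column.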

                                                                                

# Data Cleaning & Transformation

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('Data Cleaning & Transformation') \
    .getOrCreate()

spark

25/06/15 10:56:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
!hadoop fs -ls /data/olist/

Found 9 items
-rw-r--r--   2 dgclub21 hadoop    9033957 2025-06-07 12:21 /data/olist/olist_customers_dataset.csv
-rw-r--r--   2 dgclub21 hadoop   61273883 2025-06-07 12:21 /data/olist/olist_geolocation_dataset.csv
-rw-r--r--   2 dgclub21 hadoop   15438671 2025-06-07 12:21 /data/olist/olist_order_items_dataset.csv
-rw-r--r--   2 dgclub21 hadoop    5777138 2025-06-07 12:21 /data/olist/olist_order_payments_dataset.csv
-rw-r--r--   2 dgclub21 hadoop   14451670 2025-06-07 12:21 /data/olist/olist_order_reviews_dataset.csv
-rw-r--r--   2 dgclub21 hadoop   17654914 2025-06-07 12:21 /data/olist/olist_orders_dataset.csv
-rw-r--r--   2 dgclub21 hadoop    2379446 2025-06-07 12:21 /data/olist/olist_products_dataset.csv
-rw-r--r--   2 dgclub21 hadoop     174703 2025-06-07 12:21 /data/olist/olist_sellers_dataset.csv
-rw-r--r--   2 dgclub21 hadoop       2613 2025-06-07 12:21 /data/olist/product_category_name_translation.csv


In [1]:
hdfs_path = '/data/olist/'

In [2]:
customer_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv',header=True,inferSchema=True)
location_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv',header=True,inferSchema=True)
item_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv',header=True,inferSchema=True)
payment_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv',header=True,inferSchema=True)
review_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv',header=True,inferSchema=True)
order_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv',header=True,inferSchema=True)
product_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv',header=True,inferSchema=True)
seller_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv',header=True,inferSchema=True)
translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv',header=True,inferSchema=True)

                                                                                

## Finding Missing Values

In [15]:
from pyspark.sql.functions import col, count, lit, when   # explicit imports avoid shadowing Python built-ins such as sum and round

customer_df.select([count(when(col(c).isNull(),1)).alias(c) for c in customer_df.columns]).show(vertical=True)

-RECORD 0-----------------------
 customer_id              | 0   
 customer_unique_id       | 0   
 customer_zip_code_prefix | 0   
 customer_city            | 0   
 customer_state           | 0   



## Function to Find Nulls

In [26]:
def missing(df,table_name):
    print(f'Missing Fields From : {table_name}')
    df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show(vertical=True)

In [29]:
missing(seller_df,'Sellers Table')

Missing Fields From : Sellers Table
-RECORD 0---------------------
 seller_id              | 0   
 seller_zip_code_prefix | 0   
 seller_city            | 0   
 seller_state           | 0   



In [28]:
missing(payment_df,'Payments Table')

Missing Fields From : Payments Table



-RECORD 0-------------------
 order_id             | 0   
 payment_sequential   | 0   
 payment_type         | 0   
 payment_installments | 0   
 payment_value        | 0   



                                                                                

In [27]:
missing(order_df,'Orders Table')

Missing Fields From : Orders Table
-RECORD 0-----------------------------
 order_id                      | 0    
 customer_id                   | 0    
 order_status                  | 0    
 order_purchase_timestamp      | 0    
 order_approved_at             | 160  
 order_delivered_carrier_date  | 1783 
 order_delivered_customer_date | 2965 
 order_estimated_delivery_date | 0    



                                                                                

## Handling Missing Values

1. Drop missing values: for non-critical columns.

2. Fill missing values: for numerical columns whose values can be derived from other columns.

3. Impute missing values: for continuous data.

In [31]:
order_df.createOrReplaceTempView('orders')

In [36]:
spark.sql('select count(*) from orders where order_approved_at is null').show()

+--------+
|count(1)|
+--------+
|     160|
+--------+



## Dropping Rows with Null Values

In [40]:
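# Drop rows where any of the listed columns is null
cleaned_order = order_df.na.drop(subset = ['order_id','customer_id','order_approved_at'])
before_count = order_df.count()        # count once and reuse, since each count() triggers a Spark job
after_count = cleaned_order.count()
print('Count Before Clean : ', before_count)
print('Count After Clean : ', after_count)
print('Count Difference : ', before_count - after_count)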

Count Before Clean :  99441
Count After Clean :  99281
Count Difference :  160


## Filling Null Values

In [48]:
order_df.show(10)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [49]:
cleaned_order = order_df.fillna({'order_delivered_carrier_date':'2017-01-01' , 'order_delivered_customer_date':'2017-01-01'})
cleaned_order.show(10)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

## Impute Missing Values

In [50]:
payment_df.show(5)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 5 rows



In [15]:
# payment_df has no nulls, so for demonstration payment_value is set to null wherever it is 100 or less

payment_null = payment_df.withColumn('payment_value',when(col('payment_value')>100,col('payment_value')).otherwise(lit(None)))
payment_null.show(5)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         null|
|a9810da82917af2d9...|                 1| credit_card|                   1|         null|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|         null|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 5 rows



In [16]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['payment_value'],outputCols=['payment_values_imputed']).setStrategy('mean')

cleaned_payment = imputer.fit(payment_null).transform(payment_null)

                                                                                

In [17]:
cleaned_payment.show(5)

+--------------------+------------------+------------+--------------------+-------------+----------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_values_imputed|
+--------------------+------------------+------------+--------------------+-------------+----------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         null|    251.89388114183782|
|a9810da82917af2d9...|                 1| credit_card|                   1|         null|    251.89388114183782|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|         null|    251.89388114183782|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|                107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|                128.45|
+--------------------+------------------+------------+--------------------+-------------+-------

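In Apache Spark (PySpark), the `Imputer` fills in missing values (null/NaN) in numeric columns. It works similarly to scikit-learn's imputer and supports the `mean`, `median`, and `mode` strategies. The replacement value is computed from the non-missing values only, so in this demonstration the imputed mean (about 251.89) reflects only the payments above 100 that were left in place.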
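
To make the mean and median strategies concrete, here is a small pure-Python sketch of the same idea on a toy column. The `impute` function and the numbers are made up for illustration and are not the Spark implementation; Spark's `Imputer` computes the median with an approximate quantile, so results on large data may differ slightly from an exact median.

```python
from statistics import mean, median


def impute(values, strategy='mean'):
    """Replace None entries with the mean or median of the non-missing values."""
    observed = [v for v in values if v is not None]
    if not observed:
        raise ValueError('cannot impute a column with no observed values')
    if strategy == 'mean':
        fill = mean(observed)
    elif strategy == 'median':
        fill = median(observed)
    else:
        raise ValueError(f'unsupported strategy: {strategy}')
    return [fill if v is None else v for v in values]


# Toy column (made-up numbers, not taken from the Olist data)
toy = [None, 100.0, 200.0, 600.0, None]
print(impute(toy, 'mean'))    # [300.0, 100.0, 200.0, 600.0, 300.0]
print(impute(toy, 'median'))  # [200.0, 100.0, 200.0, 600.0, 200.0]
```

The gap between the two results shows why the median is often preferred for skewed amounts such as payments: a single large value pulls the mean up but leaves the median unchanged.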

In [21]:
imputer = Imputer(inputCols=['payment_value'],outputCols=['payment_values_imputed']).setStrategy('median')

cleaned_payment = imputer.fit(payment_null).transform(payment_null)
cleaned_payment.show()

+--------------------+------------------+------------+--------------------+-------------+----------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_values_imputed|
+--------------------+------------------+------------+--------------------+-------------+----------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         null|                172.01|
|a9810da82917af2d9...|                 1| credit_card|                   1|         null|                172.01|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|         null|                172.01|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|                107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|                128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|         null|       

In [7]:
print_schema(customer_df,'Customers')

Schema of : Customers
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
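
The `print_schema` helper called above is not defined in this part of the notebook. Judging from its output, a minimal version could look like the sketch below; this is an assumption about the helper, not the author's original definition.

```python
def print_schema(df, table_name):
    """Print a label followed by the DataFrame's schema tree."""
    print(f'Schema of : {table_name}')
    df.printSchema()  # PySpark DataFrame method that prints the schema to stdout
```

It follows the same pattern as the `missing` helper: a label line first, then the per-table output, which keeps results readable when several tables are inspected in a row.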



In [8]:
print_schema(payment_df,'Payments')

Schema of : Payments
root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [14]:
payment_df.show(5)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 5 rows



In [17]:
payment_df.groupBy('payment_type').count().orderBy('count',ascending=False).show()



+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



                                                                                

In [8]:
from pyspark.sql.functions import when,length,col

std_payment = payment_df.withColumn('payment_type',when(col('payment_type')=='credit_card','Credit Card') \
                                   .when(col('payment_type')=='boleto','Boleto') \
                                   .when(col('payment_type')=='debit_card','Debit Card') \
                                    .otherwise('Other'))

std_payment.select('payment_type').distinct().orderBy(length('payment_type')).show()

+------------+
|payment_type|
+------------+
|       Other|
|      Boleto|
|  Debit Card|
| Credit Card|
+------------+



                                                                                

In [35]:
std_payment = std_payment.withColumn('Amount', col('payment_value').cast('int'))
std_payment.show(5)

+--------------------+------------------+------------+--------------------+-------------+------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|Amount|
+--------------------+------------------+------------+--------------------+-------------+------+
|b81ef226f3fe1789b...|                 1| Credit Card|                   8|        99.33|    99|
|a9810da82917af2d9...|                 1| Credit Card|                   1|        24.39|    24|
|25e8ea4e93396b6fa...|                 1| Credit Card|                   1|        65.71|    65|
|ba78997921bbcdc13...|                 1| Credit Card|                   8|       107.78|   107|
|42fdf880ba16b47b5...|                 1| Credit Card|                   2|       128.45|   128|
+--------------------+------------------+------------+--------------------+-------------+------+
only showing top 5 rows
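
Note that `cast('int')` drops the fractional part instead of rounding, as the output shows (65.71 becomes 65). Python's `int()` behaves the same way for these values, so the difference from rounding can be illustrated with the sample rows above. This is a plain-Python sketch, not Spark code.

```python
values = [99.33, 24.39, 65.71, 107.78, 128.45]  # payment_value sample shown above

truncated = [int(v) for v in values]   # drops the fractional part
rounded = [round(v) for v in values]   # rounds to the nearest integer

print(truncated)  # [99, 24, 65, 107, 128]
print(rounded)    # [99, 24, 66, 108, 128]
```

If rounded amounts are wanted in the DataFrame, applying `pyspark.sql.functions.round` to `payment_value` before the cast is one option.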



In [9]:
print_schema(order_df,'Orders')

Schema of : Orders
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [10]:
order_df.show(5)

                                                                                

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [19]:
from pyspark.sql.functions import to_date,col

cleaned_order = order_df.withColumn('order_approved_at',to_date(col('order_approved_at')))
cleaned_order.show(5)

+--------------------+--------------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|       2017-10-02|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|       2018-07-26|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|   delivered

## Deduplication

In [36]:
# If the primary-key column itself contains duplicates, keep one row per key

customer_cleaned = customer_df.dropDuplicates(['customer_id'])

## Join

In [23]:
order_join = order_df.join(customer_df,'customer_id','left') \
                     .join(payment_df,'order_id','left')\
                     .join(item_df,'order_id','left')

In [24]:
order_join.columns

['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value',
 'order_item_id',
 'product_id',
 'seller_id',
 'shipping_limit_date',
 'price',
 'freight_value']

In [26]:
order_join.select('customer_id','price','payment_type').show(5)


+--------------------+-----+------------+
|         customer_id|price|payment_type|
+--------------------+-----+------------+
|9ef432eb625129730...|29.99|     voucher|
|9ef432eb625129730...|29.99|     voucher|
|9ef432eb625129730...|29.99| credit_card|
|b0830fb4747a6c6d2...|118.7|      boleto|
|41ce2a54c0b03bf34...|159.9| credit_card|
+--------------------+-----+------------+
only showing top 5 rows
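
The repeated 29.99 rows above come from the join itself: `payment_df` and `item_df` are both joined on `order_id`, so an order with three payment rows and one item appears three times, and any later `sum('price')` counts that item three times. The plain-Python sketch below models this behaviour and one possible remedy, collapsing payments to one row per order before joining. The amounts mirror the first order in the outputs of this notebook; the short ID `o1` and the helper `left_join` are placeholders introduced for illustration. In PySpark the same idea would be a `groupBy('order_id').agg(...)` on `payment_df` ahead of the join.

```python
from collections import defaultdict

# Illustrative rows: one order with one item, paid in three parts.
items = [{"order_id": "o1", "price": 29.99}]
payments = [
    {"order_id": "o1", "payment_value": 18.59},
    {"order_id": "o1", "payment_value": 2.00},
    {"order_id": "o1", "payment_value": 18.12},
]

def left_join(left, right, key):
    """Model of a left join: one output row per matching (left, right) pair."""
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    joined = []
    for l_row in left:
        matches = index.get(l_row[key]) or [{}]
        for r_row in matches:
            joined.append({**l_row, **r_row})
    return joined

# Naive join: the single item row is repeated once per payment row.
naive = left_join(items, payments, "order_id")
naive_total = round(sum(r["price"] for r in naive), 2)

# Pre-aggregate payments to one row per order, then join.
paid = defaultdict(float)
for p in payments:
    paid[p["order_id"]] += p["payment_value"]
payments_per_order = [
    {"order_id": oid, "payment_value": round(total, 2)} for oid, total in paid.items()
]
fixed = left_join(items, payments_per_order, "order_id")
fixed_total = round(sum(r["price"] for r in fixed), 2)

print(len(naive), naive_total)   # 3 rows, price counted three times
print(len(fixed), fixed_total)   # 1 row, price counted once
```

The same reasoning applies to any aggregate computed on a table built from several one-to-many joins on the same key.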



                                                                                

In [30]:
# Top 5 customers by total item price
from pyspark.sql.functions import sum, round

order_join.groupBy('customer_id').agg(round(sum('price')).alias('total_price')).orderBy('total_price',ascending=False).show(5)



+--------------------+-----------+
|         customer_id|total_price|
+--------------------+-----------+
|1617b1357756262bf...|    13440.0|
|9af2372a1e4934027...|    11384.0|
|de832e8dbb1f588a4...|    10856.0|
|63b964e79dee32a35...|     9888.0|
|6f241d5bbb142b6f7...|     9520.0|
+--------------------+-----------+
only showing top 5 rows



                                                                                

In [None]:
from pyspark.sql.functions import sum

# show() returns None, so keep the DataFrame and display it in a separate call
gp_df = order_join.groupBy('order_id').agg(sum('price'))
gp_df.show()


+--------------------+-----------------+
|            order_id|       sum(price)|
+--------------------+-----------------+
|f373335aac9a659de...|             35.9|
|118045506e1c1dda0...|            225.0|
|cc66dee6fbc18bb79...|            117.7|
|f44cb69655f8e4d13...|           311.94|
|edcc6b79e8394346b...|             99.9|
|9f98d6530155e3b38...|            299.9|
|5e57ff5e1c008db89...|           139.98|
|0957ed870116e596b...|           129.99|
|3fa59277573f0fe06...|             79.0|
|d5f812041d8fc446c...|             64.9|
|24012690fe6562f4a...|           129.99|
|85be7c94bcd3f908f...|            59.99|
|56ef80c564f6fd57c...|             34.9|
|7a70b827ebc6ab85b...|535.6999999999999|
|8e10a1d1a57b6a469...|           239.99|
|107478e48c13dc0b3...|             71.3|
|949280c70c6d62ec9...|             34.9|
|6a276c227b7bb9659...|            179.9|
|0fe9c7ad9288ff24b...|             34.9|
|03ebfa9712b7dbc70...|             46.9|
+--------------------+-----------------+
only showing top

                                                                                

## Calculating Top Sellers Based on Revenue Generated

In [10]:
rev_df = order_df.join(payment_df,'order_id','left')\
                .join(item_df,'order_id','left') \
                .join(seller_df,'seller_id','left')

In [13]:
rev_df.select(
    item_df['seller_id'], 
    item_df['order_id'],  
    payment_df['payment_value']
).show(3)

                                                                                

+--------------------+--------------------+-------------+
|           seller_id|            order_id|payment_value|
+--------------------+--------------------+-------------+
|3504c0cb71d7fa48d...|e481f51cbdc54678b...|        18.59|
|3504c0cb71d7fa48d...|e481f51cbdc54678b...|          2.0|
|3504c0cb71d7fa48d...|e481f51cbdc54678b...|        18.12|
+--------------------+--------------------+-------------+
only showing top 3 rows



25/06/12 16:07:28 ERROR TransportClient: Failed to send RPC RPC 8392487895230431987 to /10.128.0.7:52148: io.netty.channel.StacklessClosedChannelException
io.netty.channel.StacklessClosedChannelException: null
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source) ~[netty-transport-4.1.77.Final.jar:4.1.77.Final]
25/06/12 16:07:28 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 55 from block manager BlockManagerId(2, my-cluster-w-1.us-central1-a.c.keen-truth-461516-a0.internal, 45651, None)
java.io.IOException: Failed to send RPC RPC 8392487895230431987 to /10.128.0.7:52148: io.netty.channel.StacklessClosedChannelException
	at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:392) ~[spark-network-common_2.12-3.3.2.jar:3.3.2]
	at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369) ~[spark-network-common_2.12-3.3.2.jar:

In [14]:
rev_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- selle

In [22]:
from pyspark.sql.functions import *

rev_df.groupBy('seller_id') \
      .agg(round(sum('payment_value')).alias('total_revenue')) \
      .orderBy('total_revenue', ascending=False) \
      .show(5)



+--------------------+-------------+
|           seller_id|total_revenue|
+--------------------+-------------+
|7c67e1448b00f6e96...|     507167.0|
|1025f0e2d44d7041d...|     308222.0|
|4a3ca9315b744ce9f...|     301245.0|
|1f50f920176fa81da...|     290253.0|
|53243585a1d6dc264...|     284903.0|
+--------------------+-------------+
only showing top 5 rows



                                                                                

In [41]:
!hadoop fs -mkdir /data/olist_join

In [42]:
!hadoop fs -ls /data/

Found 2 items
drwxr-xr-x   - dgclub21 hadoop          0 2025-06-07 12:21 /data/olist
drwxr-xr-x   - root     hadoop          0 2025-06-12 16:28 /data/olist_join


In [43]:
rev_df.write.mode('overwrite').parquet('/data/olist_join/join_df.parquet')

                                                                                

In [45]:
!hadoop fs -ls /data/olist_join

Found 1 items
drwxr-xr-x   - root hadoop          0 2025-06-12 16:29 /data/olist_join/join_df.parquet


In [46]:
spark.stop()

# Data Integration

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName('Data_Integration') \
        .getOrCreate()

25/06/14 08:09:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Caching

In [65]:
order_df.cache()
customer_df.cache()
item_df.cache()

DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

## Joining all Tables

In [28]:
order_item_join = order_df.join(item_df,'order_id','inner')

In [29]:
order_item_product = order_item_join.join(product_df,'product_id','inner')

In [30]:
order_item_product_seller = order_item_product.join(seller_df,'seller_id','inner')

In [32]:
full_order = order_item_product_seller.join(customer_df,'customer_id','inner')

In [35]:
# Geolocation: location is not a critical attribute, and an inner join would drop every order whose zip code has no geolocation match, so a left join is used here

full_order = full_order.join(
    location_df,
    full_order.customer_zip_code_prefix == location_df.geolocation_zip_code_prefix,
    "left"
)
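
A left join keeps every order row, but it also emits one output row per matching geolocation row. If `location_df` holds several coordinate rows for the same zip-code prefix (worth checking with a `groupBy('geolocation_zip_code_prefix').count()`), each order line is repeated that many times and later sums and counts are inflated. Below is a minimal sketch, in plain Python with made-up coordinates and a hypothetical helper `one_row_per_zip`, of collapsing the lookup to one averaged row per prefix before joining.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical geolocation rows: prefix 1037 appears twice.
locations = [
    {"zip": 1037, "lat": -23.54, "lng": -46.64},
    {"zip": 1037, "lat": -23.56, "lng": -46.62},
    {"zip": 4195, "lat": -23.65, "lng": -46.59},
]

def one_row_per_zip(rows):
    """Average the coordinates of each prefix so the join key becomes unique."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["zip"]].append(row)
    return {
        zip_code: {
            "lat": round(mean(r["lat"] for r in group), 4),
            "lng": round(mean(r["lng"] for r in group), 4),
        }
        for zip_code, group in grouped.items()
    }

zip_lookup = one_row_per_zip(locations)
print(len(locations), "rows ->", len(zip_lookup), "unique prefixes")
print(zip_lookup[1037])
```

In PySpark the equivalent would be a `groupBy` on the prefix with `avg` on latitude and longitude, joined in place of the raw table.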

In [38]:
full_order = full_order.join(review_df,'order_id','left')

In [40]:
full_order = full_order.join(payment_df,'order_id','left')

In [41]:
full_order.columns

['order_id',
 'customer_id',
 'seller_id',
 'product_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'order_item_id',
 'shipping_limit_date',
 'price',
 'freight_value',
 'product_category_name',
 'product_name_lenght',
 'product_description_lenght',
 'product_photos_qty',
 'product_weight_g',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm',
 'seller_zip_code_prefix',
 'seller_city',
 'seller_state',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state',
 'geolocation_zip_code_prefix',
 'geolocation_lat',
 'geolocation_lng',
 'geolocation_city',
 'geolocation_state',
 'review_id',
 'review_score',
 'review_comment_title',
 'review_comment_message',
 'review_creation_date',
 'review_answer_timestamp',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value']

In [70]:
full_order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [42]:
full_order.cache()

25/06/14 09:30:33 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

# Optimized Join

A broadcast join is useful when one of the DataFrames is small enough to fit in each executor's memory. Spark sends a full copy of the smaller DataFrame to every worker node, so the larger DataFrame does not have to be shuffled across the cluster, which usually makes the join much faster.
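
To make the mechanism concrete, here is a plain-Python model of a broadcast hash join (a conceptual sketch, not Spark's implementation): the small table is turned into an in-memory lookup once, and each partition of the large table is joined locally without moving its rows. The seller and item rows, and the helpers `build_lookup` and `join_partition`, are made up for illustration. In PySpark the hint is `broadcast(small_df)` from `pyspark.sql.functions`, and Spark also broadcasts automatically when a table's estimated size is below `spark.sql.autoBroadcastJoinThreshold`.

```python
# Hypothetical small dimension table (sellers) and a large fact table
# (order items) that is already split into partitions across workers.
sellers = [
    {"seller_id": "s1", "seller_state": "SP"},
    {"seller_id": "s2", "seller_state": "RJ"},
]
item_partitions = [
    [{"order_id": "o1", "seller_id": "s1", "price": 10.0}],
    [{"order_id": "o2", "seller_id": "s2", "price": 20.0},
     {"order_id": "o3", "seller_id": "s9", "price": 5.0}],
]

def build_lookup(small_rows, key):
    """Built once, then shipped ("broadcast") to every worker.

    Assumes the key is unique in the small table.
    """
    return {row[key]: row for row in small_rows}

def join_partition(partition, lookup, key):
    """Inner join done locally on one worker; the large rows never move."""
    return [{**row, **lookup[row[key]]} for row in partition if row[key] in lookup]

lookup = build_lookup(sellers, "seller_id")
joined = [
    row
    for part in item_partitions
    for row in join_partition(part, lookup, "seller_id")
]

for row in joined:
    print(row["order_id"], row["seller_state"])
```

The trade-off is memory: every worker holds the whole lookup, so the hint only pays off for tables that are genuinely small.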

In [66]:
from pyspark.sql.functions import *

In [67]:
order_item_join = order_df.join(item_df,'order_id','inner')

In [68]:
order_item_product = order_item_join.join(product_df,'product_id','inner')

In [69]:
order_item_product_seller = order_item_product.join(broadcast(seller_df),'seller_id','inner')

In [70]:
full_order = order_item_product_seller.join(customer_df,'customer_id','inner')

In [71]:
# Geolocation: location is not a critical attribute, and an inner join would drop every order whose zip code has no geolocation match, so a left join is used here

full_order = full_order.join(
    broadcast(location_df),
    full_order.customer_zip_code_prefix == location_df.geolocation_zip_code_prefix,
    "left"
)

In [72]:
full_order = full_order.join(broadcast(review_df),'order_id','left')

In [73]:
full_order = full_order.join(payment_df,'order_id','left')

In [88]:
full_order = full_order.join(broadcast(tc.select('customer_id','customer_segment')),'customer_id','left')

In [98]:
full_order.cache()

DataFrame[customer_id: string, order_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_comment_message: string, review_creation_date: string, review_answer_timestamp: string, payment_sequential: int, payment_type

## Finding and Removing Duplicate Columns

In [83]:
from collections import Counter

# Get all column names
columns = full_order.columns

# Count occurrences
column_counts = Counter(columns)

# Print duplicates (loop names chosen so they do not shadow pyspark's col/count)
duplicates = [name for name, n in column_counts.items() if n > 1]
print("Duplicate columns:", duplicates)


Duplicate columns: ['customer_segment']


In [126]:
full_order = full_order.drop('geolocation_state','geolocation_zip_code_prefix','geolocation_city')

In [122]:
full_order.columns

['customer_id',
 'order_id',
 'seller_id',
 'product_id',
 'order_status',
 'order_purchase_timestamp',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'order_item_id',
 'shipping_limit_date',
 'price',
 'freight_value',
 'seller_zip_code_prefix',
 'seller_city',
 'seller_state',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state',
 'geolocation_zip_code_prefix',
 'geolocation_city',
 'geolocation_state',
 'review_score',
 'payment_type',
 'payment_value',
 'total_spent',
 'total_orders',
 'AOV',
 'day_type',
 'customer_segment']

In [129]:
from pyspark.sql.functions import col, when, count

full_order.select([
    count(when(col(c).isNull(), 1)).alias(c) for c in full_order.columns
]).show(vertical=True)



-RECORD 0----------------------------
 customer_id                   | 0   
 order_id                      | 0   
 seller_id                     | 0   
 product_id                    | 0   
 order_status                  | 0   
 order_purchase_timestamp      | 0   
 order_delivered_customer_date | 0   
 order_estimated_delivery_date | 0   
 order_item_id                 | 0   
 shipping_limit_date           | 0   
 price                         | 0   
 freight_value                 | 0   
 seller_zip_code_prefix        | 0   
 seller_city                   | 0   
 seller_state                  | 0   
 customer_unique_id            | 0   
 customer_zip_code_prefix      | 0   
 customer_city                 | 0   
 customer_state                | 0   
 review_score                  | 0   
 payment_type                  | 0   
 payment_value                 | 0   
 total_spent                   | 0   
 total_orders                  | 0   
 AOV                           | 0   
 day_type   

                                                                                

## Filling Nulls with Mean Values

In [115]:
mean_value = full_order.select(mean('payment_value')).collect()[0][0]
full_order = full_order.fillna({'payment_value':mean_value})

                                                                                

In [123]:
review_mean = full_order.select(mean('review_score')).collect()[0][0]
full_order = full_order.fillna({'review_score':review_mean})

                                                                                

In [128]:
from pyspark.sql.functions import coalesce, col

# Fall back to the estimated delivery date when the actual delivery date is missing
full_order = full_order.withColumn(
    'order_delivered_customer_date',
    coalesce(col('order_delivered_customer_date'), col('order_estimated_delivery_date'))
)

In [116]:
full_order = full_order.fillna({'payment_type':'Cash on Delivery'})

# Analysis & Aggregation

## Total Revenue per Seller

In [13]:
from pyspark.sql.functions import sum

full_order.groupBy('seller_id') \
    .agg(sum('price').alias('revenue')) \
    .orderBy('revenue', ascending=False) \
    .show(10)



+--------------------+--------------------+
|           seller_id|             revenue|
+--------------------+--------------------+
|4869f7a5dfa277a7d...|3.6138717319998816E7|
|53243585a1d6dc264...| 3.429159295000016E7|
|4a3ca9315b744ce9f...| 3.375957084003399E7|
|7c67e1448b00f6e96...|3.2282321790014144E7|
|fa1c13f2614d7b5c4...| 3.013938631000357E7|
|da8622b14eb17ae28...|  2.98576697300434E7|
|7e93a43ef30c4f03f...| 2.631570630000493E7|
|1025f0e2d44d7041d...|2.2937518520012498E7|
|46dc3b2cc0980fb8e...| 2.179177329001596E7|
|955fee9216a65b617...|2.0964410670014285E7|
+--------------------+--------------------+
only showing top 10 rows



                                                                                

In [71]:
seller_revenue = full_order.groupBy('seller_id') \
    .agg(round(sum('price')).alias('revenue')) \
    .orderBy('revenue', ascending=False)

In [72]:
seller_revenue.show(10)



+--------------------+-----------+
|           seller_id|    revenue|
+--------------------+-----------+
|4869f7a5dfa277a7d...|3.6138717E7|
|53243585a1d6dc264...|3.4291593E7|
|4a3ca9315b744ce9f...|3.3759571E7|
|7c67e1448b00f6e96...|3.2282322E7|
|fa1c13f2614d7b5c4...|3.0139386E7|
|da8622b14eb17ae28...| 2.985767E7|
|7e93a43ef30c4f03f...|2.6315706E7|
|1025f0e2d44d7041d...|2.2937519E7|
|46dc3b2cc0980fb8e...|2.1791773E7|
|955fee9216a65b617...|2.0964411E7|
+--------------------+-----------+
only showing top 10 rows



                                                                                

## Total Orders per Customer


In [55]:
toc = full_order.groupBy('customer_id') \
        .agg(count('order_id').alias('total_orders')) \
        .orderBy('total_orders',ascending=False)

toc.show(5)



+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|351e40989da90e704...|       11427|
|50920f8cd0681fd86...|       10752|
|9b43e2a62de9bab3a...|        8556|
|270c23a11d024a44c...|        8001|
|5c87184371002d49e...|        6876|
+--------------------+------------+
only showing top 5 rows



                                                                                

## Average Review Score Per Seller

In [56]:
arc = full_order.groupBy('seller_id') \
        .agg(avg('review_score').alias('avg_review_score')) \
        .orderBy('avg_review_score',ascending=False)

arc.show(5)



+--------------------+----------------+
|           seller_id|avg_review_score|
+--------------------+----------------+
|9c1c0c36cd23c2089...|             5.0|
|8c351ed7c326c6212...|             5.0|
|ec933281fb017b502...|             5.0|
|f5b84683a9bf9e1df...|             5.0|
|b2eecf5ea250510da...|             5.0|
+--------------------+----------------+
only showing top 5 rows



                                                                                

## Top 10 Sold Products

In [59]:
tsp = full_order.groupBy('product_id') \
                .agg(count('order_id').alias('total_sold')) \
                .orderBy('total_sold',ascending=False) \
                .limit(10)

tsp.show()



+--------------------+----------+
|          product_id|total_sold|
+--------------------+----------+
|aca2eb7d00ea1a7b8...|     86740|
|422879e10f4668299...|     81110|
|99a4788cb24856965...|     78775|
|389d119b48cf3043d...|     60248|
|d1c427060a0f73f6b...|     59274|
|368c6c730842d7801...|     58358|
|53759a2ecddad2bb8...|     52654|
|53b36df67ebb7c415...|     52105|
|154e7e31ebfa09220...|     42700|
|3dd2a17168ec895c7...|     40787|
+--------------------+----------+



                                                                                

## Top 10 Customers by Spending

In [47]:
tcs = full_order.groupBy('customer_id') \
                .agg(round(sum('price')).alias('total_payment')) \
                .orderBy('total_payment',ascending=False) \
                .limit(10)

tcs.show()



+--------------------+-------------+
|         customer_id|total_payment|
+--------------------+-------------+
|d3e82ccec3cb5f956...|    6662844.0|
|df55c14d1476a9a34...|    3565657.0|
|fe5113a38e3575c04...|    3293604.0|
|ec5b2ba62e5743423...|    2556120.0|
|63b964e79dee32a35...|    2501664.0|
|46bb3c0b1a65c8399...|    2336752.0|
|05455dfa7cd02f13d...|    2160194.0|
|3690e975641f01bd0...|    2124498.0|
|349509b216bd5ec11...|    1923627.0|
|695476b5848d64ba0...|    1820543.0|
+--------------------+-------------+



                                                                                

# Window Function & Ranking

In [78]:
# Ranking the highest-priced items per seller

from pyspark.sql.window import Window

window_spec = Window.partitionBy('seller_id').orderBy(col('price').desc())

top_seller_products = full_order.withColumn('Rank', rank().over(window_spec)).filter(col('Rank')<=5)
top_seller_products.select('seller_id','price','Rank').show()


+--------------------+-----+----+
|           seller_id|price|Rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 20 rows
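
Every row above has rank 1 because `rank()` gives tied values the same rank, and the joined table contains many rows with an identical price for this seller, so `Rank <= 5` does not limit the result to five rows. The sketch below reproduces the three common ranking rules in plain Python on made-up prices so the difference is visible; in PySpark they correspond to `rank()`, `dense_rank()` and `row_number()` over the same window. The function name `rank_variants` is hypothetical.

```python
def rank_variants(prices):
    """Return (price, rank, dense_rank, row_number) for prices sorted descending."""
    ordered = sorted(prices, reverse=True)
    result = []
    rank = dense = 0
    previous = None
    for position, price in enumerate(ordered, start=1):
        if price != previous:
            rank = position      # rank jumps to the row position after a tie
            dense += 1           # dense_rank advances by exactly one
            previous = price
        result.append((price, rank, dense, position))
    return result

# Hypothetical prices for one seller, with a three-way tie at the top.
rows = rank_variants([895.0, 895.0, 895.0, 120.0, 99.9])
for row in rows:
    print(row)
```

If the goal is at most five rows per seller, `row_number()` is the variant to filter on; if it is the five highest distinct prices, `dense_rank()` fits better, ideally after dropping duplicate rows.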



                                                                                

In [87]:
from pyspark.sql.window import Window

tsp = full_order.withColumn('rank', rank().over(Window.partitionBy('seller_id').orderBy(col('price').desc()))) \
                .filter(col('rank')<=5) 

tsp.select('seller_id','price','rank').show(5)


+--------------------+-----+----+
|           seller_id|price|rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 5 rows



                                                                                

# Advanced Aggregation & Enrichment

## Total Revenue & Average Order Value (AOV) per Customer

In [52]:
customer_spend = full_order.groupBy('customer_id')\
.agg(
    count('order_id').alias('total_orders'),
    round(sum('price')).alias('total_spend')
)\
.withColumn('AOV',round(col('total_spend')/col('total_orders')))\
.orderBy(desc('total_spend'))

customer_spend.show()



+--------------------+------------+-----------+------+
|         customer_id|total_orders|total_spend|   AOV|
+--------------------+------------+-----------+------+
|d3e82ccec3cb5f956...|        6876|  6662844.0| 969.0|
|df55c14d1476a9a34...|         743|  3565657.0|4799.0|
|fe5113a38e3575c04...|        2292|  3293604.0|1437.0|
|ec5b2ba62e5743423...|        1428|  2556120.0|1790.0|
|63b964e79dee32a35...|        6072|  2501664.0| 412.0|
|46bb3c0b1a65c8399...|         748|  2336752.0|3124.0|
|05455dfa7cd02f13d...|        2184|  2160194.0| 989.0|
|3690e975641f01bd0...|         802|  2124498.0|2649.0|
|349509b216bd5ec11...|         743|  1923627.0|2589.0|
|695476b5848d64ba0...|         687|  1820543.0|2650.0|
|73236a0796f53d60d...|         832|  1755520.0|2110.0|
|cc803a2c412833101...|         762|  1676400.0|2200.0|
|1ff773612ab8934db...|        5820|  1658642.0| 285.0|
|fced842c7dad61e8c...|         602|  1654898.0|2749.0|
|1ecb47d23dc8203cd...|        1164|  1629588.0|1400.0|
|de832e8db

                                                                                

## Seller Performance Metrics (Revenue, Average Review, Order Count)

In [27]:
seller_performance = full_order.groupBy('seller_id')\
.agg(
    count('order_id').alias('total_orders') ,
    round(sum('price')).alias('total_revenue') ,
    round(avg('review_score')).alias('review') ,
    round(stddev('price'),2).alias('price_variability')
)\
.orderBy('total_revenue','review',ascending=False)

seller_performance.show()



+--------------------+------------+-------------+------+-----------------+
|           seller_id|total_orders|total_revenue|review|price_variability|
+--------------------+------------+-------------+------+-----------------+
|4869f7a5dfa277a7d...|      184587|  3.6138717E7|   4.0|           111.65|
|53243585a1d6dc264...|       54514|  3.4291593E7|   4.0|           499.65|
|4a3ca9315b744ce9f...|      330661|  3.3759571E7|   4.0|            59.37|
|7c67e1448b00f6e96...|      233306|  3.2282322E7|   3.0|            50.39|
|fa1c13f2614d7b5c4...|       87686|  3.0139386E7|   4.0|            307.7|
|da8622b14eb17ae28...|      264433|   2.985767E7|   4.0|            72.92|
|7e93a43ef30c4f03f...|       50226|  2.6315706E7|   4.0|           377.24|
|1025f0e2d44d7041d...|      229587|  2.2937519E7|   4.0|             84.3|
|46dc3b2cc0980fb8e...|       90426|  2.1791773E7|   4.0|           187.49|
|955fee9216a65b617...|      232364|  2.0964411E7|   4.0|            84.94|
|7a67c85e85bb2ce85...|   

                                                                                

## Product Popularity Metrics

In [34]:
from pyspark.sql.functions import *

ppm = full_order.groupBy('product_id')\
.agg(
    count('order_id').alias('total_Sales') ,
    round(sum('price')).alias('total_revenue') ,
    round(avg('price')).alias('average_price') , 
    round(stddev('price')).alias('price_variable') ,
    collect_set('seller_id').alias('unique_sellers')
)\
.orderBy(desc('total_sales'))
    
ppm.show()



+--------------------+-----------+-------------+-------------+--------------+--------------------+
|          product_id|total_Sales|total_revenue|average_price|price_variable|      unique_sellers|
+--------------------+-----------+-------------+-------------+--------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740|    6164630.0|         71.0|           3.0|[955fee9216a65b61...|
|422879e10f4668299...|      81110|    4442792.0|         55.0|           4.0|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775|    6921763.0|         88.0|           4.0|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248|    3280533.0|         54.0|           4.0|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274|    8220103.0|        139.0|          17.0|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358|    3181699.0|         55.0|           5.0|[1f50f920176fa81d...|
|53759a2ecddad2bb8...|      52654|    2893017.0|         55.0|           5.0|[1f50f920176fa81d...|
|53b36df67

                                                                                

## Customer Retention Analysis (First & Last Order)

In [38]:
# min/max pick the earliest and latest purchase deterministically (first/last depend on row order);
# datediff(end, start) takes the later date first
customer_retention = full_order.groupBy('customer_id')\
.agg(
    min('order_purchase_timestamp').alias('first_order') ,
    max('order_purchase_timestamp').alias('last_order') ,
    count('order_id').alias('total_orders')
)\
.withColumn('date_diff',datediff('last_order','first_order')) \
.orderBy(desc('date_diff'))

In [None]:
customer_retention.show()



+--------------------+-------------------+-------------------+------------+---------+
|         customer_id|        first_order|         last_order|total_orders|date_diff|
+--------------------+-------------------+-------------------+------------+---------+
|74ad5dd7aac0a613b...|2017-03-28 14:32:09|2017-03-28 14:32:09|         101|        0|
|6b0741b9cab6fa1a2...|2018-05-09 23:13:39|2018-05-09 23:13:39|         144|        0|
|d713d3c4ca576a7fe...|2018-01-15 12:58:22|2018-01-15 12:58:22|         193|        0|
|8915549cc658e933b...|2018-02-16 08:56:37|2018-02-16 08:56:37|          12|        0|
|dc76dea012fc4ca16...|2018-06-11 10:11:47|2018-06-11 10:11:47|         169|        0|
|15d1752086d06a721...|2018-05-05 13:36:10|2018-05-05 13:36:10|          32|        0|
|6c4b7dd5c99515fe3...|2017-03-13 16:49:06|2017-03-13 16:49:06|         156|        0|
|6e303063ecd7e907c...|2018-04-19 13:51:50|2018-04-19 13:51:50|         553|        0|
|cd07153a3e03b42c9...|2018-03-05 14:47:54|2018-03-05 1
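
The `date_diff` column depends on the argument order of `datediff`, which takes the end date first and the start date second and compares calendar dates only. The sketch below reproduces that day-level subtraction in plain Python so it can be checked without a cluster. `spark_like_datediff` is a hypothetical helper name, and the two timestamps are made-up sample values, not rows from the dataset.

```python
from datetime import datetime


def spark_like_datediff(end: datetime, start: datetime) -> int:
    """Whole days from start to end, ignoring the time of day."""
    return (end.date() - start.date()).days


# Made-up sample timestamps (assumption, not taken from the Olist data).
first_order = datetime(2018, 1, 3, 9, 44, 1)
last_order = datetime(2018, 1, 10, 8, 0, 0)

print(spark_like_datediff(last_order, first_order))   # 7  (end first: positive span)
print(spark_like_datediff(first_order, last_order))   # -7 (arguments swapped)
```

Passing the later timestamp first keeps the span positive, which is what a descending sort on `date_diff` expects.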

                                                                                

## Delivery Time Difference

In [77]:
delivery_time = full_order.groupBy('customer_id')\
.agg(
    min('order_purchase_timestamp').alias('orderd_date') , 
    max('order_delivered_customer_date').alias('deliverd_date') ,
    count('order_id').alias('total_orders')
)\
.withColumn('date_diff',datediff('deliverd_date','orderd_date')) \
.orderBy(desc('date_diff'))

In [46]:
delivery_time.show()



+--------------------+-------------------+-------------------+------------+---------+
|         customer_id|        orderd_date|      deliverd_date|total_orders|date_diff|
+--------------------+-------------------+-------------------+------------+---------+
|75683a92331068e2d...|2017-02-21 23:31:27|2017-09-19 14:36:39|          68|      210|
|d306426abe5fca15e...|2018-02-23 14:57:35|2018-09-19 23:24:07|         159|      208|
|7815125148cfa1e8c...|2017-03-07 23:59:51|2017-09-19 15:12:50|          45|      196|
|217906bc11a32c1e4...|2017-03-08 18:09:02|2017-09-19 14:33:17|          59|      195|
|9cf2c3fa2632cee74...|2017-03-08 22:47:40|2017-09-19 14:00:04|         112|      195|
|1a8a4a30dc2969767...|2017-03-09 13:26:57|2017-09-19 14:38:21|           2|      194|
|cb2caaaead400c973...|2018-01-03 09:44:01|2018-07-13 20:51:31|         144|      191|
|65b14237885b3972e...|2017-03-13 20:17:10|2017-09-19 17:00:07|         155|      190|
|8199345f57c6d1cbe...|2017-03-15 11:24:27|2017-09-19 1

                                                                                

In [17]:
full_order.select('order_status').groupBy('order_status').count().show()



+------------+--------+
|order_status|   count|
+------------+--------+
|     shipped|  165508|
|    canceled|   81273|
|    invoiced|   64213|
|   delivered|17692295|
|  processing|   59073|
|    approved|     658|
| unavailable|    1241|
+------------+--------+



                                                                                

## SQL

In [25]:
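# createOrReplaceTempView lets this cell be re-run; createTempView raises if the view already exists
full_order.createOrReplaceTempView('dataset')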

In [26]:
spark.sql('select order_status from dataset').show(4)

+------------+
|order_status|
+------------+
|   delivered|
|   delivered|
|   delivered|
|   delivered|
+------------+
only showing top 4 rows



In [38]:
spark.sql('''select order_status,
                    count(order_status) as total_count
                    from dataset
                    group by order_status
                    ''').show()

                                                                                

+------------+-----------+
|order_status|total_count|
+------------+-----------+
|     shipped|     165508|
|    canceled|      81273|
|    invoiced|      64213|
|   delivered|   17692295|
|  processing|      59073|
|    approved|        658|
| unavailable|       1241|
+------------+-----------+



## Order Status Flags

In [39]:
full_order = full_order.withColumn('is_delivered',when(col('order_status')=='delivered',lit(1)).otherwise(lit(0)))

In [43]:
full_order.select('order_status','is_delivered').groupBy('is_delivered').agg(count('is_delivered')).show(5)



+------------+-------------------+
|is_delivered|count(is_delivered)|
+------------+-------------------+
|           1|           17692295|
|           0|             371966|
+------------+-------------------+



                                                                                

In [55]:
full_order = full_order.withColumn('order_revenue',col('price')+col('freight_value'))
full_order.select('price','freight_value','order_revenue').show()

+-----+-------------+-------------+
|price|freight_value|order_revenue|
+-----+-------------+-------------+
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
+-----+-------------+-------------+
only showing top 20 rows



## Customer Segmentation based on Spending

In [53]:
# top customers 

tc = full_order.groupBy('customer_id')\
.agg(
    round(sum('price')).alias('total_spent') ,
    count('order_id').alias('total_orders')
)\
.withColumn('AOV',round(col('total_spent')/col('total_orders')))

tc.show()



+--------------------+-----------+------------+-----+
|         customer_id|total_spent|total_orders|  AOV|
+--------------------+-----------+------------+-----+
|41ce2a54c0b03bf34...|     3998.0|          25|160.0|
|f54a9f0e6b351c431...|       60.0|           3| 20.0|
|2a1dfb647f32f4390...|    34710.0|          78|445.0|
|4f28355e5c17a4a42...|    32387.0|         233|139.0|
|4632eb5a8f175f6fe...|    21413.0|         268| 80.0|
|57ee2ef64f17a5f9a...|    14286.0|         333| 43.0|
|cc3590e4afbb4b3e0...|     3400.0|         272| 13.0|
|843ff05b30ce4f75b...|     5518.0|          65| 85.0|
|a4156bb8aff5d6722...|     2624.0|          15|175.0|
|084dab2db2bf5d426...|     4751.0|          72| 66.0|
|aa012a8928021b41e...|    10485.0|         150| 70.0|
|a2ae1a05b2058776a...|     5944.0|         119| 50.0|
|7089a41f6a02e9b57...|     1036.0|          28| 37.0|
|1099d033c74a027a7...|     8683.0|         174| 50.0|
|7a3bd3b37285f0ab2...|    50688.0|         512| 99.0|
|c499f24d5aca03c90...|     1
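
AOV here is the total spend divided by the number of rows counted per customer. Since the same `order_id` can appear on several rows of `full_order`, counting rows and counting distinct orders can give different averages. The plain-Python sketch below contrasts the two on made-up rows; the tuples and the helper name `aov_per_customer` are illustrative assumptions.

```python
from collections import defaultdict


def aov_per_customer(rows, distinct_orders=False):
    """rows: iterable of (customer_id, order_id, price) tuples."""
    spent = defaultdict(float)
    order_ids = defaultdict(list)
    for customer_id, order_id, price in rows:
        spent[customer_id] += price
        order_ids[customer_id].append(order_id)

    result = {}
    for customer_id, ids in order_ids.items():
        # Either count every row, or count each order only once.
        n = len(set(ids)) if distinct_orders else len(ids)
        result[customer_id] = round(spent[customer_id] / n, 2)
    return result


rows = [
    ("c1", "o1", 30.0),
    ("c1", "o1", 50.0),   # second row of the same order
    ("c1", "o2", 40.0),
]
print(aov_per_customer(rows))                        # {'c1': 40.0}
print(aov_per_customer(rows, distinct_orders=True))  # {'c1': 60.0}
```

In PySpark the second variant would correspond to `countDistinct('order_id')` instead of `count('order_id')`.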

                                                                                

In [54]:
from pyspark.sql.functions import col, when

tc = tc.withColumn(
    'customer_segment',
    when(col('AOV') >= 1200, 'High_Value')
    .when(col('AOV').between(500, 1200), 'Mid_Value')  # between() is inclusive, but AOV == 1200 is already caught by the branch above
    .otherwise('Low_Value')
)
tc.select(min('AOV'),max('AOV'),avg('AOV')).show()



+--------+--------+------------------+
|min(AOV)|max(AOV)|          avg(AOV)|
+--------+--------+------------------+
|     1.0|  6735.0|125.96647274643747|
+--------+--------+------------------+



                                                                                

In [79]:
tc.groupBy('customer_segment').count().show()




+----------------+-----+
|customer_segment|count|
+----------------+-----+
|      High_Value|  596|
|       Low_Value|95460|
|       Mid_Value| 2610|
+----------------+-----+
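
The segment boundaries are easiest to check at the edges. The sketch below mirrors the chained `when` logic in plain Python, where the first matching branch wins; `customer_segment` is a hypothetical function name and the sample AOV values are chosen only to probe the thresholds.

```python
def customer_segment(aov: float) -> str:
    """Mirror of the chained when/otherwise: the first matching branch wins."""
    if aov >= 1200:
        return "High_Value"
    if 500 <= aov <= 1200:   # inclusive on both ends, like Column.between
        return "Mid_Value"
    return "Low_Value"


for value in (499, 500, 1199, 1200, 6735):
    print(value, customer_segment(value))
```

An AOV of exactly 1200 lands in `High_Value` because that branch is evaluated before the inclusive `between` range.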



                                                                                

### Joining Specific Columns

In [56]:
full_order = full_order.join(tc.select('customer_id','customer_segment'),'customer_id','left')

In [57]:
full_order.select('customer_id','customer_segment').show()

                                                                                

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
|9ef432eb625129730...|       Low_Value|
+--------------------+----------------+
only showing top 20 rows



## Date-wise Order Distribution

In [61]:
from pyspark.sql.functions import *

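# Store the daily counts in a separate variable; reassigning full_order would replace
# the order-level DataFrame that the later cells still use.
orders_by_date = full_order.groupBy(to_date('order_purchase_timestamp').alias('order_date')).agg(count('order_id').alias('total_orders')).orderBy('order_date')
orders_by_date.show(3)

AttributeError: 'NoneType' object has no attribute 'groupBy'

The recorded run failed because `full_order` was `None` when this cell executed. That happens when a variable is assigned the return value of a method that returns `None`, such as `.show()`. Rebuild `full_order` before re-running the cell.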

## Hourly orders distribution

In [28]:
full_order.groupBy(hour('order_purchase_timestamp').alias('hour')).agg(count('order_id')).orderBy('hour').show()



+----+---------------+
|hour|count(order_id)|
+----+---------------+
|   0|         423640|
|   1|         209998|
|   2|          90355|
|   3|          47496|
|   4|          40668|
|   5|          28626|
|   6|          84016|
|   7|         213844|
|   8|         528500|
|   9|         873769|
|  10|        1134498|
|  11|        1202759|
|  12|        1102917|
|  13|        1152244|
|  14|        1218318|
|  15|        1159603|
|  16|        1261538|
|  17|        1152472|
|  18|        1051446|
|  19|        1109130|
+----+---------------+
only showing top 20 rows



                                                                                

## Weekdays Vs. Weekends

In [79]:
full_order = full_order.withColumn(
    'day_type',
    when(dayofweek('order_purchase_timestamp').isin(1, 7), lit('Weekend'))
    .otherwise(lit('Weekday'))
)

In [34]:
full_order.groupBy('day_type').agg(count('order_id')).show()



+--------+---------------+
|day_type|count(order_id)|
+--------+---------------+
| Weekday|       13965495|
| Weekend|        4098766|
+--------+---------------+
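
Spark's `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday), which is why the weekend check uses `isin(1, 7)`. The sketch below reproduces that numbering with Python's `datetime` so the mapping can be verified locally; `spark_dayofweek` and `day_type` are hypothetical helper names.

```python
from datetime import datetime


def spark_dayofweek(ts: datetime) -> int:
    """Spark numbering: 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so shift it by one day.
    return ts.isoweekday() % 7 + 1


def day_type(ts: datetime) -> str:
    return "Weekend" if spark_dayofweek(ts) in (1, 7) else "Weekday"


print(day_type(datetime(2017, 10, 2, 10, 56, 33)))  # Weekday (a Monday)
print(day_type(datetime(2017, 10, 1)))              # Weekend (a Sunday)
print(day_type(datetime(2017, 10, 7)))              # Weekend (a Saturday)
```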



                                                                                

In [80]:
full_order.select('order_purchase_timestamp','day_type','order_id').show()

                                                                                

+------------------------+--------+--------------------+
|order_purchase_timestamp|day_type|            order_id|
+------------------------+--------+--------------------+
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Weekday|e481f51cbdc54678b...|
|     2017-10-02 10:56:33| Week

                                                                                

## Location-wise Sales

In [None]:
full_order.groupBy('geolocation_state').agg(count('order_id')).show()



+-----------------+---------------+
|geolocation_state|count(order_id)|
+-----------------+---------------+
|               SC|         644944|
|               RO|          24526|
|               PI|          27693|
|               AM|           6488|
|               RR|           2411|
|               GO|         162415|
|             null|            317|
|               TO|          22360|
|               MT|         155225|
|               SP|        6742207|
|               ES|         367211|
|               PB|          33379|
|               RS|         971705|
|               MS|          73679|
|               AL|          37741|
|               MG|        3433229|
|               PA|          96276|
|               BA|         443980|
|               SE|          28145|
|               PE|         132001|
+-----------------+---------------+
only showing top 20 rows



                                                                                

## Spark Configuration & Join Optimization Strategies

In [37]:
from pyspark.sql import SparkSession

# Parentheses replace the backslash continuations: a comment placed after a
# trailing backslash is a syntax error in Python.
spark = (
    SparkSession.builder
    .appName('performance_optimization')
    .config('spark.executor.memory', '6g')
    .config('spark.executor.cores', '4')
    .config('spark.executor.instances', '2')
    .config('spark.driver.memory', '4g')
    .config('spark.driver.maxResultSize', '2g')
    .config('spark.sql.shuffle.partitions', '50')
    .config('spark.default.parallelism', '50')
    .config('spark.sql.adaptive.enabled', 'true')
    .config('spark.sql.adaptive.coalescePartitions.enabled', 'true')
    .config('spark.sql.autoBroadcastJoinThreshold', 20 * 1024 * 1024)  # broadcast tables up to 20 MB
    .config('spark.sql.files.maxPartitionBytes', '64MB')
    .config('spark.sql.files.openCostInBytes', '2MB')
    .config('spark.memory.fraction', '0.8')
    .config('spark.memory.storageFraction', '0.2')
    .getOrCreate()
)

spark

25/06/15 13:33:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
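
The `spark.sql.autoBroadcastJoinThreshold` setting is a byte count: a table whose estimated size is at or below it can be broadcast automatically. The sketch below only shows the arithmetic behind the `20 * 1024 * 1024` value and the comparison. Spark applies the check to its own size estimate of the table, not to a number supplied by the user, and `fits_broadcast_threshold` is a hypothetical helper with made-up sizes.

```python
MB = 1024 * 1024
AUTO_BROADCAST_THRESHOLD = 20 * MB  # same arithmetic as the config line above


def fits_broadcast_threshold(estimated_bytes: int,
                             threshold: int = AUTO_BROADCAST_THRESHOLD) -> bool:
    """True when a table of the given estimated size is at or below the threshold."""
    return estimated_bytes <= threshold


print(AUTO_BROADCAST_THRESHOLD)             # 20971520
print(fits_broadcast_threshold(5 * MB))     # True
print(fits_broadcast_threshold(200 * MB))   # False
```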


## Broadcast Join: Good for Small Datasets

In [None]:
# broadcast() ships customer_df to every executor, so the join avoids shuffling full_order
customer_broadcast = broadcast(customer_df)
optimized_broadcast = full_order.join(customer_broadcast, 'customer_id')

## Sort-Merge Join: Good for Large Datasets

In [None]:
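# Spark plans its own shuffle and sort for a sort-merge join, so the pre-sort below
# mainly illustrates the idea; hint('merge') requests the strategy explicitly.
sorted_customer = customer_df.sortWithinPartitions('customer_id')
sorted_orders = full_order.sortWithinPartitions('customer_id')

full_merge = sorted_orders.join(sorted_customer, 'customer_id')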

## Bucket Join

In [None]:
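# repartition() co-locates rows by key for this job only; persistent bucketing
# is done with DataFrameWriter.bucketBy() at write time.
bucket_cust = customer_df.repartition(10,'customer_id')
bucket_order = full_order.repartition(10,'customer_id')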

bucket_join = bucket_order.join(bucket_cust,'customer_id')

## Skew Join Handling 


In [None]:
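# Assumption: open-source Spark does not recognize the 'skew' hint and ignores it with a warning;
# with spark.sql.adaptive.enabled=true, adaptive query execution can split skewed partitions instead.
skew_join = full_order.join(customer_df.hint('skew'),'customer_id')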
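
One common way to handle a skewed join key by hand is salting: each row on the large side gets a random salt added to its key, and the small side is replicated once per salt value so that every salted key still finds its match. The sketch below shows the idea in plain Python on made-up rows. It is not the notebook's method, and names such as `salted_join` and `NUM_SALTS` are assumptions.

```python
import random
from collections import Counter, defaultdict

NUM_SALTS = 4  # assumed number of salt buckets


def salted_join(large_rows, small_rows, num_salts=NUM_SALTS, seed=0):
    """Inner-join (key, value) rows on key, spreading each key over num_salts buckets."""
    rng = random.Random(seed)

    # Small side: replicate every row once per salt value.
    lookup = defaultdict(list)
    for key, value in small_rows:
        for salt in range(num_salts):
            lookup[(key, salt)].append(value)

    joined, bucket_sizes = [], Counter()
    # Large side: attach one random salt per row, then match on (key, salt).
    for key, value in large_rows:
        salt = rng.randrange(num_salts)
        bucket_sizes[(key, salt)] += 1
        for small_value in lookup.get((key, salt), []):
            joined.append((key, value, small_value))
    return joined, bucket_sizes


# Made-up data: one "hot" key with 1000 rows and one rare key.
orders = [("hot_customer", i) for i in range(1000)] + [("rare_customer", 0)]
customers = [("hot_customer", "SP"), ("rare_customer", "RJ")]

joined, bucket_sizes = salted_join(orders, customers)
print(len(joined))                  # 1001, the same row count as an unsalted inner join
print(max(bucket_sizes.values()))   # largest bucket after the hot key is split up
```

The join result is unchanged, but the rows of the hot key are spread over several `(key, salt)` buckets, which is what lets separate tasks process them in parallel.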

# Saving Files

In [47]:
# Save as Parquet files on the Dataproc cluster (HDFS)

full_order.write.mode('overwrite').parquet('/data/olist/full')

25/06/15 15:15:14 WARN TaskSetManager: Lost task 1.0 in stage 60.0 (TID 72) (my-cluster-w-1.us-central1-a.c.keen-truth-461516-a0.internal executor 2): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:654)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:379)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)


In [43]:
!hadoop fs -ls /data/olist/full/

Found 3 items
-rw-r--r--   2 root hadoop          0 2025-06-15 13:43 /data/olist/full/_SUCCESS
-rw-r--r--   2 root hadoop  180868162 2025-06-15 13:43 /data/olist/full/part-00000-81316d82-c3c5-4db4-b96c-cb2b5e87ed2f-c000.snappy.parquet
-rw-r--r--   2 root hadoop  109090406 2025-06-15 13:42 /data/olist/full/part-00001-81316d82-c3c5-4db4-b96c-cb2b5e87ed2f-c000.snappy.parquet


# Data Serving

In [44]:
df = spark.read.parquet('/data/olist/full/')

In [45]:
df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

## Serving Data to a Google Cloud Storage Bucket

In [130]:
full_order.coalesce(1).write.mode('overwrite').parquet('gs://dataproc-staging-us-central1-912697423871-bztajijq/files/')

                                                                                

In [109]:
full_order.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation