## Step 1: Setting up the Spark Environment

In a real-world company setup, we wouldn't run this directly in Google Colab. Instead, we would:

**1) Deploy a Spark cluster** (Dataproc, Amazon EMR, Azure HDInsight, or on-prem).

**2) Store the data in HDFS rather than on the local filesystem.** First download it from the data source (Kaggle):

!curl -L -o ~/Downloads/brazilian-ecommerce.zip \
  https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce

**3) Use PySpark to interact with the data.**
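Step 2 stops at the download: the archive still has to be unpacked and copied into HDFS, where the listing below finds it under `/data/olist/`. A minimal sketch of that step, assuming the zip sits at `~/Downloads/brazilian-ecommerce.zip` and the `hadoop` CLI is on the PATH. The helpers `extract_csvs` and `hdfs_put_command` are hypothetical names; the second one only builds the `hadoop fs -put` argument list so it can be inspected before being run.

```python
import zipfile
from pathlib import Path


def extract_csvs(zip_path, out_dir):
    """Extract every .csv member of the archive into out_dir and return the sorted local paths."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    extracted = []
    with zipfile.ZipFile(zip_path) as zf:
        for name in zf.namelist():
            if name.lower().endswith(".csv"):
                # ZipFile.extract returns the path of the file it wrote
                extracted.append(zf.extract(name, out))
    return sorted(str(p) for p in extracted)


def hdfs_put_command(local_files, hdfs_dir="/data/olist/"):
    """Build argv for `hadoop fs -put -f <files...> <hdfs_dir>`; -f overwrites files that already exist."""
    return ["hadoop", "fs", "-put", "-f", *local_files, hdfs_dir]


# Example usage (not executed here):
# files = extract_csvs(Path.home() / "Downloads" / "brazilian-ecommerce.zip", "/tmp/olist")
# import subprocess
# subprocess.run(["hadoop", "fs", "-mkdir", "-p", "/data/olist/"], check=True)
# subprocess.run(hdfs_put_command(files), check=True)
```

Keeping the command as a list (rather than a shell string) avoids quoting problems with file paths when it is passed to `subprocess.run`.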

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("OlistData") \
    .getOrCreate()

25/08/31 13:43:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [10]:
!hadoop fs -ls /data/olist/

Found 9 items
-rw-r--r--   2 donnirmal86 hadoop    9033957 2025-08-31 11:48 /data/olist/olist_customers_dataset.csv
-rw-r--r--   2 donnirmal86 hadoop   61273883 2025-08-31 11:50 /data/olist/olist_geolocation_dataset.csv
-rw-r--r--   2 donnirmal86 hadoop   15438671 2025-08-31 11:50 /data/olist/olist_order_items_dataset.csv
-rw-r--r--   2 donnirmal86 hadoop    5777138 2025-08-31 11:50 /data/olist/olist_order_payments_dataset.csv
-rw-r--r--   2 donnirmal86 hadoop   14451670 2025-08-31 11:50 /data/olist/olist_order_reviews_dataset.csv
-rw-r--r--   2 donnirmal86 hadoop   17654914 2025-08-31 11:50 /data/olist/olist_orders_dataset.csv
-rw-r--r--   2 donnirmal86 hadoop    2379446 2025-08-31 11:50 /data/olist/olist_products_dataset.csv
-rw-r--r--   2 donnirmal86 hadoop     174703 2025-08-31 11:50 /data/olist/olist_sellers_dataset.csv
-rw-r--r--   2 donnirmal86 hadoop       2613 2025-08-31 11:50 /data/olist/product_category_name_translation.csv


In [11]:
hdfs_path = '/data/olist/'

In [12]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv',header=True,inferSchema=True)

                                                                                

In [13]:
customers_df.show(5)


+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows



                                                                                

In [15]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv',header=True,inferSchema=True)
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv',header=True,inferSchema=True)
orders_item_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv',header=True,inferSchema=True)
payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv',header=True,inferSchema=True)
reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv',header=True,inferSchema=True)
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv',header=True,inferSchema=True)
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv',header=True,inferSchema=True)
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv',header=True,inferSchema=True)
category_translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv',header=True,inferSchema=True)
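The nine reads above differ only in the file name, so a dictionary-driven loop keeps them in one place and makes it harder to mistype a path. A sketch, assuming `reader` is `spark.read` (a PySpark `DataFrameReader`); `OLIST_FILES` and `load_tables` are hypothetical names, and the file names are the ones listed by `hadoop fs -ls` above.

```python
# Table name -> CSV file name, as listed under /data/olist/
OLIST_FILES = {
    "customers": "olist_customers_dataset.csv",
    "orders": "olist_orders_dataset.csv",
    "order_items": "olist_order_items_dataset.csv",
    "payments": "olist_order_payments_dataset.csv",
    "reviews": "olist_order_reviews_dataset.csv",
    "products": "olist_products_dataset.csv",
    "sellers": "olist_sellers_dataset.csv",
    "geolocation": "olist_geolocation_dataset.csv",
    "category_translation": "product_category_name_translation.csv",
}


def load_tables(reader, base_path, files=OLIST_FILES):
    """Read each CSV with a header row and inferred types; return {table_name: DataFrame}."""
    base = base_path.rstrip("/") + "/"
    return {
        name: reader.csv(base + filename, header=True, inferSchema=True)
        for name, filename in files.items()
    }


# Usage in the notebook:
# tables = load_tables(spark.read, hdfs_path)
# orders_df = tables["orders"]
```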

                                                                                

In [16]:
customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
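`inferSchema=True` typed `customer_zip_code_prefix` as an integer, and an integer cannot keep leading zeros: São Paulo city CEPs start with 0, so the `1151` shown earlier is most likely the prefix `01151`. Inference also costs an extra pass over each file. A sketch of passing an explicit schema instead, written as a DDL string, which `DataFrameReader.csv` accepts through its `schema` parameter; `CUSTOMERS_DDL` and `read_customers` are hypothetical names.

```python
# Column names and order follow customers_df.printSchema();
# the zip prefix is kept as a string so leading zeros survive.
CUSTOMERS_DDL = (
    "customer_id STRING, "
    "customer_unique_id STRING, "
    "customer_zip_code_prefix STRING, "
    "customer_city STRING, "
    "customer_state STRING"
)


def read_customers(reader, path):
    """Read the customers CSV with a fixed schema instead of inferring one."""
    return reader.csv(path, header=True, schema=CUSTOMERS_DDL)


# Usage in the notebook:
# customers_df = read_customers(spark.read, hdfs_path + 'olist_customers_dataset.csv')
```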



In [17]:
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [21]:
## Checking for dropped or missing rows: compare row counts across related tables

print(f'Customers :{customers_df.count()} rows')
print(f'Orders :{orders_df.count()} rows')
print(f'orders_item :{orders_item_df.count()} rows')

Customers :99441 rows
Orders :99441 rows
orders_item :112650 rows
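Equal counts for customers and orders (99,441 each) are consistent with one `customer_id` per order, and `orders_item` has more rows because an order can contain several items (`order_item_id`). Counts alone do not prove that every order links to a customer, though. An anti-join answers that: in PySpark, `orders_df.join(customers_df, on='customer_id', how='left_anti')` returns the orders with no matching customer. The same semantics on plain Python lists, with toy IDs and a hypothetical `orphan_keys` helper:

```python
def orphan_keys(child_keys, parent_keys):
    """Left anti-join semantics: one entry per child row whose key has no match in the parent."""
    parents = set(parent_keys)
    return [key for key in child_keys if key not in parents]
```

An empty result (in Spark, a `count()` of 0 on the anti-join) means no order points at a missing customer.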


In [23]:
customers_df.columns

['customer_id',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state']

In [25]:
## Checking for NULL or duplicate values

from pyspark.sql.functions import col

## For each row, flag whether each column is null (this prints booleans per row, not counts)

customers_df.select([col(c).isNull().alias(c) for c in customers_df.columns]).show()

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|           

In [27]:
## Check for nulls in critical fields. 
from pyspark.sql.functions import col,when,count
customers_df.select([count(when(col(c).isNull(),1)).alias(c) for c in customers_df.columns]).show()

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [28]:
## Check for Duplicate values. 

customers_df.groupBy('customer_id').count().filter('count>1').show()

                                                                                

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+
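No `customer_id` appears twice, which is expected: in the Olist dataset each order gets its own `customer_id`, while `customer_unique_id` identifies the person behind repeat purchases. Running the same check on that column, `customers_df.groupBy('customer_unique_id').count().filter('count > 1')`, is how repeat customers would show up. The groupBy → count → filter pattern, mirrored with `collections.Counter` on toy IDs (`repeated_keys` is a hypothetical helper):

```python
from collections import Counter


def repeated_keys(keys):
    """Mirror of groupBy(key).count().filter('count > 1'): {key: count} for keys seen more than once."""
    return {key: n for key, n in Counter(keys).items() if n > 1}
```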



In [31]:
## Customer Distribution by State 

customers_df.groupBy('customer_state').count().orderBy('count',ascending =False).show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows
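Raw counts are easier to compare as shares of the 99,441 customers. In PySpark this is one extra column; with `from pyspark.sql import functions as F`, for example `.withColumn('pct', F.round(F.col('count') / customers_df.count() * 100, 2))`. The arithmetic, checked in plain Python on the top three states from the output above:

```python
TOTAL_CUSTOMERS = 99441  # customers_df.count() from the row-count check
top_states = {"SP": 41746, "RJ": 12852, "MG": 11635}  # copied from the groupBy output above


def share_pct(count, total):
    """Percentage of the total, rounded to two decimals."""
    return round(count / total * 100, 2)


shares = {state: share_pct(n, TOTAL_CUSTOMERS) for state, n in top_states.items()}
print(shares)
```

SP alone holds about 42% of customers, and SP, RJ and MG together roughly two thirds.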



In [33]:
orders_df.columns

['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date']

In [36]:
## Order Status Distribution

orders_df.groupBy('order_status').count().orderBy('count',ascending=False).show()


+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



In [37]:
## Payments 

payments_df.columns

['order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value']

In [38]:
payments_df.groupBy('payment_type').count().orderBy('count',ascending=False).show()

+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



In [40]:
## Products per Category (number of listed products, not units sold)

products_df.columns

['product_id',
 'product_category_name',
 'product_name_lenght',
 'product_description_lenght',
 'product_photos_qty',
 'product_weight_g',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm']

In [44]:
products_df.groupBy('product_category_name').count().orderBy('count',ascending=False).show()

+---------------------+-----+
|product_category_name|count|
+---------------------+-----+
|      cama_mesa_banho| 3029|
|        esporte_lazer| 2867|
|     moveis_decoracao| 2657|
|         beleza_saude| 2444|
| utilidades_domest...| 2335|
|           automotivo| 1900|
| informatica_acess...| 1639|
|           brinquedos| 1411|
|   relogios_presentes| 1329|
|            telefonia| 1134|
|                bebes|  919|
|           perfumaria|  868|
| fashion_bolsas_e_...|  849|
|            papelaria|  849|
|           cool_stuff|  789|
|   ferramentas_jardim|  753|
|             pet_shop|  719|
|                 NULL|  610|
|          eletronicos|  517|
| construcao_ferram...|  400|
+---------------------+-----+
only showing top 20 rows



In [45]:
## Top-Selling Products by Revenue (sum of item prices)

orders_item_df.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [49]:
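from pyspark.sql import functions as F

## Revenue per product = sum of item prices (freight_value excluded).
## The F. prefix avoids shadowing Python's built-in sum().
top_products = orders_item_df.groupBy('product_id').agg(F.sum('price').alias('total_sales'))
top_products.orderBy('total_sales', ascending=False).show(20)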



+--------------------+------------------+
|          product_id|       total_sales|
+--------------------+------------------+
|bb50f2e236e5eea01...|           63885.0|
|6cdd53843498f9289...| 54730.20000000005|
|d6160fb7873f18409...|48899.340000000004|
|d1c427060a0f73f6b...| 47214.51000000006|
|99a4788cb24856965...|43025.560000000085|
|3dd2a17168ec895c7...| 41082.60000000005|
|25c38557cf793876c...| 38907.32000000001|
|5f504b3a1c75b73d6...|37733.899999999994|
|53b36df67ebb7c415...| 37683.42000000001|
|aca2eb7d00ea1a7b8...| 37608.90000000007|
|e0d64dcfaa3b6db5c...|          31786.82|
|d285360f29ac7fd97...|31623.809999999983|
|7a10781637204d8d1...|           30467.5|
|f1c7f353075ce59d8...|          29997.36|
|f819f0c84a64f02d3...|29024.479999999996|
|588531f8ec37e7d5f...|28291.989999999998|
|422879e10f4668299...|26577.219999999972|
|16c4e87b98a9370a9...|           25034.0|
|5a848e4ab52fd5445...|24229.029999999962|
|a62e25e09e05e6faf...|           24051.0|
+--------------------+------------------+
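Totals such as `54730.20000000005` come from summing `price` as a double: binary floating point cannot represent most decimal prices exactly, and the rounding error accumulates across rows. Two common fixes, with `from pyspark.sql import functions as F`: round for display (`F.round(F.sum('price'), 2)`), or cast `price` to a decimal type before summing (`F.col('price').cast('decimal(10,2)')`). Both effects in plain Python, using the five prices visible in `orders_item_df.show()` above. Note that `from pyspark.sql.functions import sum` shadows the built-in `sum` in a notebook, so the sketch calls `builtins.sum` explicitly.

```python
import builtins
from decimal import Decimal

print(0.1 + 0.2)  # 0.30000000000000004: the same kind of error seen in total_sales

prices = ["58.9", "239.9", "199.0", "12.99", "199.9"]  # from orders_item_df.show() above

# builtins.sum: a notebook-level `from pyspark.sql.functions import sum` would shadow the built-in
float_total = builtins.sum(float(p) for p in prices)  # double arithmetic, like the inferred schema
decimal_total = builtins.sum(Decimal(p) for p in prices)  # exact base-10 arithmetic, like DecimalType
rounded_total = round(float_total, 2)  # display-level fix, like F.round(..., 2)
```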

                                                                                

In [52]:
## Average Delivery Time Analysis. 

orders_df.columns


['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date']

In [59]:
orders_df.select('order_id','order_purchase_timestamp','order_delivered_customer_date').show()

+--------------------+------------------------+-----------------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|
+--------------------+------------------------+-----------------------------+
|e481f51cbdc54678b...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|
|53cdb2fc8bc7dce0b...|     2018-07-24 20:41:37|          2018-08-07 15:27:45|
|47770eb9100c2d0c4...|     2018-08-08 08:38:49|          2018-08-17 18:06:29|
|949d5b44dbf5de918...|     2017-11-18 19:28:06|          2017-12-02 00:28:42|
|ad21c59c0840e6cb8...|     2018-02-13 21:18:39|          2018-02-16 18:17:02|
|a4591c265e18cb1dc...|     2017-07-09 21:57:05|          2017-07-26 10:57:55|
|136cce7faa42fdb2c...|     2017-04-11 12:22:08|                         NULL|
|6514b8ad8028c9f2c...|     2017-05-16 13:10:30|          2017-05-26 12:55:51|
|76c6e866289321a7c...|     2017-01-23 18:29:09|          2017-02-02 14:08:10|
|e69bfb5eb88e0ed6a...|     2017-07-29 11:55:02|          2017-08

In [67]:
## .show() only prints rows and returns None, so assigning its result would leave delivery_df as None.
## Keep the DataFrame itself and call .show() separately when you want to look at it.
delivery_df = orders_df.select('order_id', 'order_purchase_timestamp', 'order_delivered_customer_date')

In [68]:
from pyspark.sql.functions import col, datediff

## Delivery time in whole days between purchase and delivery to the customer
## (the column is order_purchase_timestamp, not order_purchased_timestamp)
delivery_detail_df = delivery_df.withColumn(
    'delivery_time',
    datediff(col('order_delivered_customer_date'), col('order_purchase_timestamp'))
)
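`datediff` counts calendar days between the two dates and ignores the time of day. Orders that were never delivered (a NULL `order_delivered_customer_date`, like the seventh row in the output above) give a NULL `delivery_time`, which `avg` skips. For hour-level precision, one option is `(unix_timestamp(end) - unix_timestamp(start)) / 86400` from `pyspark.sql.functions`. Both measures, checked in plain Python against the first rows shown above; `calendar_days` and `fractional_days` are hypothetical helpers.

```python
from datetime import datetime
from statistics import mean

FMT = "%Y-%m-%d %H:%M:%S"

# (purchase, delivered) pairs copied from the orders_df.select(...).show() output above
rows = [
    ("2017-10-02 10:56:33", "2017-10-10 21:25:13"),
    ("2018-07-24 20:41:37", "2018-08-07 15:27:45"),
    ("2018-08-08 08:38:49", "2018-08-17 18:06:29"),
    ("2017-11-18 19:28:06", "2017-12-02 00:28:42"),
    ("2018-02-13 21:18:39", "2018-02-16 18:17:02"),
    ("2017-07-09 21:57:05", "2017-07-26 10:57:55"),
    ("2017-04-11 12:22:08", None),  # not delivered: NULL in Spark
]


def calendar_days(start, end):
    """Like datediff(end, start): difference of the date parts only; None if either side is missing."""
    if start is None or end is None:
        return None
    return (datetime.strptime(end, FMT).date() - datetime.strptime(start, FMT).date()).days


def fractional_days(start, end):
    """Elapsed days including hours, like (unix_timestamp(end) - unix_timestamp(start)) / 86400."""
    if start is None or end is None:
        return None
    delta = datetime.strptime(end, FMT) - datetime.strptime(start, FMT)
    return delta.total_seconds() / 86400


days = [calendar_days(s, e) for s, e in rows]
# Spark's avg() ignores NULLs; here they are filtered out explicitly
avg_days = mean(d for d in days if d is not None)
```

The first order shows the gap between the two measures: `datediff` reports 8 days, while the elapsed time is about 8.44 days.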

In [72]:
delivery_df = spark.read.csv("/data/olist/olist_orders_dataset.csv",
                             header=True, inferSchema=True)


