Setting Up Spark 

1. Deploy a Spark cluster (Dataproc, EMR, etc.)
2. Store the data in HDFS rather than on the local filesystem
    Download the data
    Source: Kaggle
    Link: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce/data?select=olist_customers_dataset.csv
        *issue: unable to download the file over SSH
        *solution: downloaded the file through a local terminal, unzipped it, then uploaded the files through the cluster web interface
    Create a directory on the Hadoop cluster - /data
    Distribute the data into /data
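
The directory creation and upload steps above can also be scripted from a node that has the `hdfs` CLI on its PATH. A minimal sketch, assuming the unzipped CSVs sit in a local folder; the `olist_csv` folder name and both helper names are hypothetical, introduced here for illustration:

```python
import subprocess
from pathlib import Path


def hdfs_upload_commands(local_dir, hdfs_dir="/data"):
    """Build the `hdfs dfs` commands that create hdfs_dir and copy every CSV into it."""
    commands = [["hdfs", "dfs", "-mkdir", "-p", hdfs_dir]]
    for csv_file in sorted(Path(local_dir).glob("*.csv")):
        # -f overwrites a file of the same name if it already exists in HDFS
        commands.append(["hdfs", "dfs", "-put", "-f", str(csv_file), hdfs_dir])
    return commands


def run_all(commands):
    """Run each command in order, raising on the first failure."""
    for cmd in commands:
        subprocess.run(cmd, check=True)
```

Calling `run_all(hdfs_upload_commands("olist_csv"))` would then create `/data` and upload the files, after which `hdfs dfs -ls -h /data` should list them.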

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('OlistData').getOrCreate()

25/12/23 07:04:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
path = '/data/'

In [4]:
spark

In [5]:
!hdfs dfs -ls -h /data

Found 9 items
-rw-r--r--   2 iantrisdc hadoop      8.6 M 2025-12-23 06:13 /data/olist_customers_dataset.csv
-rw-r--r--   2 iantrisdc hadoop     58.4 M 2025-12-23 06:13 /data/olist_geolocation_dataset.csv
-rw-r--r--   2 iantrisdc hadoop     14.7 M 2025-12-23 06:13 /data/olist_order_items_dataset.csv
-rw-r--r--   2 iantrisdc hadoop      5.5 M 2025-12-23 06:13 /data/olist_order_payments_dataset.csv
-rw-r--r--   2 iantrisdc hadoop     13.8 M 2025-12-23 06:13 /data/olist_order_reviews_dataset.csv
-rw-r--r--   2 iantrisdc hadoop     16.8 M 2025-12-23 06:13 /data/olist_orders_dataset.csv
-rw-r--r--   2 iantrisdc hadoop      2.3 M 2025-12-23 06:13 /data/olist_products_dataset.csv
-rw-r--r--   2 iantrisdc hadoop    170.6 K 2025-12-23 06:13 /data/olist_sellers_dataset.csv
-rw-r--r--   2 iantrisdc hadoop      2.6 K 2025-12-23 06:13 /data/product_category_name_translation.csv


In [6]:
customer = spark.read.csv(path+'olist_customers_dataset.csv',header=True,inferSchema = True)

                                                                                

In [8]:
customer.show(10)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

**Reading Datasets into Spark**

In [11]:
geolocation = spark.read.csv(path+'olist_geolocation_dataset.csv',header=True,inferSchema = True)
order_items = spark.read.csv(path+'olist_order_items_dataset.csv',header=True,inferSchema = True)
payments = spark.read.csv(path+'olist_order_payments_dataset.csv',header=True,inferSchema = True)
reviews = spark.read.csv(path+'olist_order_reviews_dataset.csv',header=True,inferSchema = True)
orders = spark.read.csv(path+'olist_orders_dataset.csv',header=True,inferSchema = True)
products = spark.read.csv(path+'olist_products_dataset.csv',header=True,inferSchema = True)
seller = spark.read.csv(path+'olist_sellers_dataset.csv',header=True,inferSchema = True)
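
The seven reads above differ only in the file name, so they could be collapsed into one loop. A sketch, assuming the `spark` session and the `/data/` path from the earlier cells; the `TABLE_FILES` mapping and the `load_tables` helper are names introduced here for illustration:

```python
# Table name -> CSV file name, taken from the HDFS listing of /data
TABLE_FILES = {
    "geolocation": "olist_geolocation_dataset.csv",
    "order_items": "olist_order_items_dataset.csv",
    "payments": "olist_order_payments_dataset.csv",
    "reviews": "olist_order_reviews_dataset.csv",
    "orders": "olist_orders_dataset.csv",
    "products": "olist_products_dataset.csv",
    "seller": "olist_sellers_dataset.csv",
}


def load_tables(spark, base_path, table_files=TABLE_FILES):
    """Read each CSV under base_path into a DataFrame, keyed by table name."""
    return {
        name: spark.read.csv(base_path + file_name, header=True, inferSchema=True)
        for name, file_name in table_files.items()
    }
```

With this in place, `tables = load_tables(spark, path)` gives `tables["seller"]`, `tables["orders"]`, and so on, and adding a new file means adding one dictionary entry.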

                                                                                

In [12]:
seller.show(5)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



In [13]:
customer.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



Data Leak Check

The row count of the customer DataFrame matches the unique-value count that Kaggle reports for the dataset.

In [25]:
customer.count()

99441
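
Comparing the row count with Kaggle's unique-value figure amounts to checking that the key column has no repeats. A sketch of the same check done entirely inside Spark, using a hypothetical `key_is_unique` helper that is not part of the original notebook:

```python
def key_is_unique(df, key):
    """Return (is_unique, total_rows, distinct_keys) for a candidate key column."""
    total_rows = df.count()
    distinct_keys = df.select(key).distinct().count()
    return total_rows == distinct_keys, total_rows, distinct_keys
```

For example, `key_is_unique(customer, 'customer_id')` returns a tuple whose first element is `True` only when every `customer_id` appears exactly once.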

In [27]:
seller.count()

3095

In [14]:
df_list = (geolocation,order_items,payments,reviews,orders,products,seller)

**Null Values Check**

In [28]:
from pyspark.sql.functions import *  # note: shadows Python built-ins such as sum, min, max, and round

In [43]:
customer.select([col(c).isNull().alias(c) for c in customer.columns]).show()


+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|           

In [44]:

customer.select(
    [
        count(when(col(c).isNull(), 1)).alias(c)
        for c in customer.columns
    ]
).show() # Found no Null Values

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [45]:
geolocation.select(
    [
        count(when(col(c).isNull(), 1)).alias(c)
        for c in geolocation.columns
    ]
).show() # Found no Null Values



+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                          0|              0|              0|               0|                0|
+---------------------------+---------------+---------------+----------------+-----------------+



                                                                                

**Duplicate Values Check**

In [55]:
customer.groupBy('customer_id').count().filter('count > 1').show()  # No duplicates found

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



**Customer Distribution by State**

In [61]:

customer.groupBy('customer_state') \
    .count() \
    .orderBy(col('count').desc()) \
    .show()


+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



                                                                                

**Order Status Distribution**

In [66]:
orders.groupBy('order_status').count().orderBy('count',ascending = False).show()



+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



                                                                                

**Payment Distribution**

In [70]:
payments.show(2)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 2 rows



In [86]:
payments.groupBy('payment_type').count().orderBy('count',ascending = False).show()

+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



**Top Selling Products**

In [107]:

order_items.groupBy('product_id') \
    .agg(
        sum('price').alias('total_revenue'),  # Spark's sum, via the wildcard import
        count('*').alias('order_count')       # counts order-item rows, not distinct orders
    ) \
    .orderBy('total_revenue', ascending=False) \
    .show()



+--------------------+------------------+-----------+
|          product_id|     total_revenue|order_count|
+--------------------+------------------+-----------+
|bb50f2e236e5eea01...|           63885.0|        195|
|6cdd53843498f9289...| 54730.20000000005|        156|
|d6160fb7873f18409...|48899.340000000004|         35|
|d1c427060a0f73f6b...| 47214.51000000006|        343|
|99a4788cb24856965...|43025.560000000085|        488|
|3dd2a17168ec895c7...| 41082.60000000005|        274|
|25c38557cf793876c...| 38907.32000000001|         38|
|5f504b3a1c75b73d6...|37733.899999999994|         63|
|53b36df67ebb7c415...| 37683.42000000001|        323|
|aca2eb7d00ea1a7b8...| 37608.90000000007|        527|
|e0d64dcfaa3b6db5c...|          31786.82|        194|
|d285360f29ac7fd97...|31623.809999999983|        123|
|7a10781637204d8d1...|           30467.5|        143|
|f1c7f353075ce59d8...|          29997.36|        154|
|f819f0c84a64f02d3...|29024.479999999996|         45|
|588531f8ec37e7d5f...|28291.

                                                                                

**Order Delivery Time Analysis**

In [119]:
order_date = orders.select("order_id", "order_purchase_timestamp","order_delivered_customer_date")

In [134]:
order_date = order_date.withColumn(
    'delivery_time',
    datediff(
        to_timestamp(col('order_delivered_customer_date')),
        to_timestamp(col('order_purchase_timestamp'))
    )
)

# Sort by delivery_time (whole days, as returned by datediff), longest first
order_date.orderBy('delivery_time', ascending=False).show()

+--------------------+------------------------+-----------------------------+-------------+----------------------+---------------------+-------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|delivery_time|delivery_time_in_hours|delivery_time_in_days|delivery_time(days)|
+--------------------+------------------------+-----------------------------+-------------+----------------------+---------------------+-------------------+
|ca07593549f1816d2...|     2017-02-21 23:31:27|          2017-09-19 14:36:39|          210|                   210|                  210|                210|
|1b3190b2dfa9d789e...|     2018-02-23 14:57:35|          2018-09-19 23:24:07|          208|                   208|                  208|                208|
|440d0d17af552815d...|     2017-03-07 23:59:51|          2017-09-19 15:12:50|          196|                   196|                  196|                196|
|2fb597c2f772eca01...|     2017-03-08 18:09:02|          2
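
`datediff` compares calendar dates only, so it returns whole days and ignores the time of day. If finer resolution is wanted, one option is to subtract epoch seconds instead. A sketch, assuming the two timestamp columns parse with Spark's default timestamp format; `with_delivery_hours`, `elapsed_hours`, and the `delivery_hours` column name are introduced here for illustration:

```python
from datetime import datetime


def with_delivery_hours(df):
    """Add a delivery_hours column: elapsed hours from purchase to delivery."""
    from pyspark.sql import functions as F  # imported lazily; requires PySpark

    delivered = F.unix_timestamp(F.col("order_delivered_customer_date"))
    purchased = F.unix_timestamp(F.col("order_purchase_timestamp"))
    # Rows with a null delivery date produce a null delivery_hours value
    return df.withColumn("delivery_hours", (delivered - purchased) / 3600)


def elapsed_hours(purchased, delivered, fmt="%Y-%m-%d %H:%M:%S"):
    """Plain-Python version of the same arithmetic, for spot-checking single rows."""
    delta = datetime.strptime(delivered, fmt) - datetime.strptime(purchased, fmt)
    return delta.total_seconds() / 3600
```

For the first row of the output above, `elapsed_hours('2017-02-21 23:31:27', '2017-09-19 14:36:39')` comes to roughly 5031 hours, a little under 210 full days, which is the figure `datediff` reports from the dates alone.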

**Data Cleaning and Transformation**

In [150]:
def null_values(df, df_name):
    """Print the number of null values in each column of df."""
    print(f'Looking at {df_name}')
    df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]).show()

In [151]:
null_values(customer,'customer')

Looking at customer
+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [152]:
null_values(geolocation,'geolocation')

Looking at geolocation




+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                          0|              0|              0|               0|                0|
+---------------------------+---------------+---------------+----------------+-----------------+



                                                                                

In [153]:
null_values(order_items,'order_items')

Looking at order_items




+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+



                                                                                

In [154]:
null_values(payments,'payments')

Looking at payments
+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



In [155]:
null_values(reviews,'reviews')

Looking at reviews




+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|review_id|order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
|        1|    2236|        2380|               92157|                 63079|                8764|                   8785|
+---------+--------+------------+--------------------+----------------------+--------------------+-----------------------+
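
The nulls in `review_id`, `order_id`, and `review_score` look suspicious for identifier and score columns. One possible cause, not confirmed here, is that free-text review comments contain line breaks, which the default CSV reader treats as row boundaries. A sketch of a re-read that lets quoted fields span several lines, assuming the file quotes fields with double quotes and escapes an embedded quote by doubling it; `read_reviews` is a hypothetical helper:

```python
def read_reviews(spark, file_path):
    """Read the reviews CSV, allowing quoted fields to contain line breaks."""
    return spark.read.csv(
        file_path,
        header=True,
        inferSchema=True,
        multiLine=True,  # a quoted field may continue across newlines
        quote='"',
        escape='"',      # treat "" inside a quoted field as a literal quote
    )
```

Re-running `null_values(read_reviews(spark, path + 'olist_order_reviews_dataset.csv'), 'reviews')` and comparing the counts with the table above would show whether the parsing assumption holds.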



                                                                                

In [156]:
null_values(orders,'orders')

Looking at orders




+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



                                                                                

In [157]:
null_values(products,'products')

Looking at products
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|         0|                  610|                610|                       610|               610|               2|                2|                2|               2|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



In [158]:
null_values(seller,'seller')

Looking at seller
+---------+----------------------+-----------+------------+
|seller_id|seller_zip_code_prefix|seller_city|seller_state|
+---------+----------------------+-----------+------------+
|        0|                     0|          0|           0|
+---------+----------------------+-----------+------------+



**Handle Missing Values**

1. Drop missing values (for non-critical columns)
2. Fill missing values (for numerical columns)
3. Impute missing values (for continuous data)
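
The three strategies above map onto DataFrame operations roughly as follows. This is a sketch only: the helper names, the `_imputed` column suffix, and the choice of median imputation are assumptions made for illustration, not decisions taken from this notebook.

```python
def drop_missing(df, required_cols):
    """Strategy 1: drop rows that are null in any of the given columns."""
    return df.na.drop(subset=required_cols)


def fill_missing(df, defaults):
    """Strategy 2: replace nulls with a fixed value per column, e.g. {'col': 0}."""
    return df.na.fill(defaults)


def impute_median(df, numeric_cols):
    """Strategy 3: write median-imputed copies of numeric columns to <col>_imputed."""
    from pyspark.ml.feature import Imputer  # imported lazily; requires PySpark

    imputer = Imputer(
        strategy="median",
        inputCols=numeric_cols,
        outputCols=[c + "_imputed" for c in numeric_cols],
    )
    return imputer.fit(df).transform(df)
```

Which strategy fits depends on the column: dropping discards whole rows, a fixed fill can distort distributions, and imputation keeps the row at the cost of inventing a plausible value.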

In [159]:
orders_cleaned = orders.na.drop(subset=['order_id','customer_id','order_status'])

2965