In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName('OlistData') \
.getOrCreate()

25/08/23 06:52:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path = '/user/olist/'

In [3]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv', header = True, inferSchema = True)
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv', header = True, inferSchema = True)
order_item_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv', header = True, inferSchema = True)
payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv', header = True, inferSchema = True)
reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv', header = True, inferSchema = True)
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv', header = True, inferSchema = True)
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv', header = True, inferSchema = True)
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv', header = True, inferSchema = True)
category_transformation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv', header = True, inferSchema = True)

                                                                                

In [4]:
from pyspark.sql.functions import *

In [5]:
# Identifying the missing values

def missing_values(df, df_name):
    print(f'Missing values in {df_name}:')
    df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show()

In [6]:
missing_values(customers_df,'customer')

Missing values in customer:


[Stage 18:>                                                         (0 + 2) / 2]

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



                                                                                

In [7]:
missing_values(orders_df,'orders')

Missing values in orders:


[Stage 21:>                                                         (0 + 2) / 2]

+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



                                                                                

# Handle Missing Values

1. Drop Missing Values (for non - critical columns)
2. Fill Missing Values (for numerical columns)
3. Impute Missing Values (for continous data)

In [8]:
orders_df_cleaned = orders_df.na.drop(subset=['order_id','customer_id','order_status'])

In [9]:
orders_df_cleaned.show(10)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [10]:
orders_df_cleaned = orders_df.fillna({'order_delivered_customer_date':'9999-12-31'})

In [11]:
orders_df.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

# Impute missing values

In [12]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['payment_value'],outputCols = ['payment_value_imputed']).setStrategy('mean')

payments_df_cleaned = imputer.fit(payments_df).transform(payments_df)

                                                                                

In [13]:
payments_df_cleaned.show(10)

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|               

In [14]:
payments_df_with_null = payments_df.withColumn('payment_value',when(col('payment_value')!=99.33,col('payment_value')).otherwise(lit(None)))

In [15]:
payments_df_with_null.show(5)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         NULL|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 5 rows



In [16]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = ['payment_value'],outputCols = ['payment_value_imputed']).setStrategy('median')

payments_df.cleaned = imputer.fit(payments_df_with_null).transform(payments_df_with_null)

payments_df_cleaned.show()

                                                                                

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|               

# Standardizing the format

In [17]:
def print_schema(df, df_name):
    print(f'Schema of {df_name}:')
    df.printSchema()

In [18]:
print_schema(orders_df,'orders')

Schema of orders:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [19]:
print_schema(customers_df,'customers')

Schema of customers:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [20]:
print_schema(payments_df,'payments')

Schema of payments:
root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [21]:
payments_df_cleaned = payments_df_cleaned.withColumn('payment_type',
                                                    when(col('payment_type')=='boleto','Bank Transfer')
                                                    .when(col('payment_type')=='credit_card','Credit Card'))

In [22]:
payments_df_cleaned.show()

+--------------------+------------------+-------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential| payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+-------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1|  Credit Card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1|  Credit Card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1|  Credit Card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1|  Credit Card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1|  Credit Card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1|  Credit Card|                   2|        96.12|      

In [23]:
orders_df_cleaned = orders_df_cleaned.withColumn('order_purchase_timestamp',to_date(col('order_purchase_timestamp')))

In [24]:
orders_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|              2017-10-02|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|              2018-07-24|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [25]:
customers_df_cleaned = customers_df.withColumn('customer_zip_code_prefix',col('customer_zip_code_prefix').cast('string'))

In [26]:
print_schema(customers_df_cleaned,'customer_df_schema')

Schema of customer_df_schema:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



# Remove Duplicate Records

In [27]:
customers_df_cleaned = customers_df_cleaned.dropDuplicates(['customer_id'])

In [28]:
orders_with_details = orders_df_cleaned.join(order_item_df,'order_id','left')\
.join(payments_df_cleaned,'order_id','left')\
.join(customers_df_cleaned,'customer_id','left')

orders_with_details.show(10)

[Stage 44:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+------------------+-------------+--------------------+-------------+---------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|payment_sequential| payment_type|payment_installments|payment_value|payment_value_imputed|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-

                                                                                

In [29]:
order_with_total_value = orders_with_details.groupBy('order_id')\
.agg(sum('payment_value').alias('total_order_value'))

order_with_total_value.show()



+--------------------+-----------------+
|            order_id|total_order_value|
+--------------------+-----------------+
|1c4a92d82c1b0dec1...|747.1500000000001|
|28eaf054725f4dd3c...|             45.0|
|78cd965d0bc0388d3...|           189.37|
|126f2d9c30f82426d...|           155.01|
|d6d7c431275f0029d...|           161.71|
|502d7a6ef832644f5...|           161.84|
|a268b656281801419...|           297.64|
|61fc33807aeed929a...|            396.4|
|fed4ca34aeb50d89a...|           204.08|
|8d682b67c7d1f4f55...|           119.45|
|43ed2e201f59c4d2f...|           185.94|
|1826d2a2eb6ba6e3e...|           232.83|
|d17a342bb9f94d40c...|           149.52|
|8dd9758206f8d9c23...|           100.34|
|f9b9c3c69ef08b6ed...|            58.64|
|a97a2be3f668f7c6d...|           201.43|
|fdf9e48108cf744c3...|           819.35|
|f19d7e5d1c51bae3d...|            27.15|
|8c8d824536e2e08f1...|           852.49|
|09e90e3936db197d4...|            86.99|
+--------------------+-----------------+
only showing top

                                                                                

In [30]:
# Delivery Time Calculation from prev notebook



# Advance Transformation

In [31]:
quantiles = order_item_df.approxQuantile('price',[0.01,0.99],0.0)
low_cutoff, high_cutoff = quantiles[0],quantiles[1]

                                                                                

In [32]:
low_cutoff, high_cutoff

(9.99, 890.0)

In [33]:
order_items_df_cleaned = order_item_df.filter((col('price') >= low_cutoff) & (col('price') <= high_cutoff))

In [34]:
payments_df_cleaned.select('payment_installments').summary().show()



+-------+--------------------+
|summary|payment_installments|
+-------+--------------------+
|  count|              103886|
|   mean|   2.853348863176944|
| stddev|   2.687050673856492|
|    min|                   0|
|    25%|                   1|
|    50%|                   1|
|    75%|                   4|
|    max|                  24|
+-------+--------------------+



                                                                                

In [35]:
products_df_cleaned = products_df.withColumn(
'product_size_category',
when(col('product_weight_g') <500,'Small')
.when(col('product_weight_g').between(500,2000),'Medium')
.otherwise('Large'))

In [36]:
products_df_cleaned.show(10)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_size_category|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|                Small|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|        

In [37]:
# Total Revenue per seller
!hadoop fs -ls /user/olist/

Found 9 items
-rw-r--r--   2 neerajde28 hadoop    9033957 2025-08-12 14:30 /user/olist/olist_customers_dataset.csv
-rw-r--r--   2 neerajde28 hadoop   61273883 2025-08-12 14:30 /user/olist/olist_geolocation_dataset.csv
-rw-r--r--   2 neerajde28 hadoop   15438671 2025-08-12 14:30 /user/olist/olist_order_items_dataset.csv
-rw-r--r--   2 neerajde28 hadoop    5777138 2025-08-12 14:30 /user/olist/olist_order_payments_dataset.csv
-rw-r--r--   2 neerajde28 hadoop   14451670 2025-08-12 14:30 /user/olist/olist_order_reviews_dataset.csv
-rw-r--r--   2 neerajde28 hadoop   17654914 2025-08-12 14:30 /user/olist/olist_orders_dataset.csv
-rw-r--r--   2 neerajde28 hadoop    2379446 2025-08-12 14:30 /user/olist/olist_products_dataset.csv
-rw-r--r--   2 neerajde28 hadoop     174703 2025-08-12 14:30 /user/olist/olist_sellers_dataset.csv
-rw-r--r--   2 neerajde28 hadoop       2613 2025-08-12 14:30 /user/olist/product_category_name_translation.csv


In [38]:
!hadoop fs -mkdir /user/olist_proc

mkdir: `/user/olist_proc': File exists


In [39]:
orders_with_details.write.mode('overwrite').parquet('/user/olist_proc/cleaned_data.parquet')

                                                                                

In [40]:
orders_with_details.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- payment_value_imputed: double (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- custo

In [None]:
CREATE EXTERNAL TABLE cleaned_payments (
product_id STRING,
product_category_name STRING,
product_name_length STRING,
product_category_name INT,
product_category_name INT,
product_category_name INT,
product_category_name INT,
product_category_name INT,
product_category_name STRING
)

STORED AS PARQUET
LOCATION '/user/olist_proc/product_df_cleaned.parquet';

In [43]:
products_df_cleaned.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_size_category: string (nullable = false)



In [44]:
spark.stop()