In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Data Cleaning & Transformation').getOrCreate()

25/06/20 02:19:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
customer_df = spark.read.csv('/data/olist/olist_customers_dataset.csv', header=True, inferSchema=True)
geolocation_df = spark.read.csv('/data/olist/olist_geolocation_dataset.csv', header=True, inferSchema=True)
order_items_df = spark.read.csv('/data/olist/olist_order_items_dataset.csv', header=True, inferSchema=True)
order_payments_df = spark.read.csv('/data/olist/olist_order_payments_dataset.csv', header=True, inferSchema=True)
order_reviews_df = spark.read.csv('/data/olist/olist_order_reviews_dataset.csv', header=True, inferSchema=True)
orders_df = spark.read.csv('/data/olist/olist_orders_dataset.csv', header=True, inferSchema=True)
products_df = spark.read.csv('/data/olist/olist_products_dataset.csv', header=True, inferSchema=True)
sellers_df = spark.read.csv('/data/olist/olist_sellers_dataset.csv', header=True, inferSchema=True)

                                                                                

In [5]:
from pyspark.sql.functions import *
print(customer_df.columns)
print(col(c) for c in customer_df.columns)

['customer_id', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state']
<generator object <genexpr> at 0x7fd2c751f5a0>


In [16]:
def countNullColumn(df, dfName):
    print(f'missing values in {dfName} : ')
    df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]).show()

In [17]:
countNullColumn(customer_df, 'customer')

missing values in customer : 
+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [19]:
countNullColumn(orders_df, 'orders')

missing values in orders : 




+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



                                                                                

### Handle missing Values
1. Drop missing values (for non critical columns)
2. Fill missing values (for numerical columns)
3. Impute missing values (for continuous data)

In [20]:
orders_df.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [19]:
#Only drop a row if any of these specific columns have null values.
orders_df_cleaned = orders_df.na.drop(subset = ['order_id', 'customer_id', 'order_status'])
orders_df_cleaned.show(10)

[Stage 27:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

                                                                                

In [20]:
orders_df_cleaned = orders_df.fillna({'order_delivered_customer_date' : '9999-12-31'})

In [36]:
orders_df_cleaned.show(10)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

### Impute missing fields

In [23]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['payment_value'], outputCols=['payment_value_imputed']).setStrategy('mean')
payments_df_cleaned = imputer.fit(order_payments_df).transform(order_payments_df)

                                                                                

In [43]:
payments_df_cleaned.show()

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|               

In [44]:
def printingSchema(df, dfName):
    print(f'schema of {dfName} : ')
    df.printSchema()

In [45]:
printingSchema(customer_df, 'customers')

schema of customers : 
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [46]:
printingSchema(orders_df, 'orders')

schema of orders : 
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [48]:
orders_df.show(3)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [21]:
orders_df_cleaned = orders_df.withColumn('order_purchase_timestamp', to_date(col('order_purchase_timestamp')))
orders_df_cleaned.show(3)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|              2017-10-02|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|              2018-07-24|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [50]:
order_payments_df.show(2)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 2 rows



In [24]:
payments_df_cleaned = order_payments_df.withColumn('payment_type', when(col('payment_type') == 'credit_card','Credit Card')
                                                                   .when(col('payment_type') == 'boleto', 'Bank Transfer')
                                                                   .when(col('payment_type') == 'debit_card', 'Debit Card')
                                                                   .otherwise('other'))

In [25]:
payments_df_cleaned.show()

+--------------------+------------------+-------------+--------------------+-------------+
|            order_id|payment_sequential| payment_type|payment_installments|payment_value|
+--------------------+------------------+-------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1|  Credit Card|                   8|        99.33|
|a9810da82917af2d9...|                 1|  Credit Card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1|  Credit Card|                   1|        65.71|
|ba78997921bbcdc13...|                 1|  Credit Card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1|  Credit Card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1|  Credit Card|                   2|        96.12|
|771ee386b001f0620...|                 1|  Credit Card|                   1|        81.16|
|3d7239c394a212faa...|                 1|  Credit Card|                   3|        51.84|

In [55]:
payments_df_cleaned.groupBy('payment_type').count().show()

+-------------+-----+
| payment_type|count|
+-------------+-----+
|  Credit Card|76795|
|Bank Transfer|19784|
|   Debit Card| 1529|
|        other| 5778|
+-------------+-----+



In [14]:
#zip code (int -> string)
customer_df.printSchema()
customer_df_cleaned = customer_df.withColumn('customer_zip_code_prefix', col('customer_zip_code_prefix').cast('string'))
customer_df_cleaned.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [9]:
#remove duplicate based on customer_id

In [17]:
customer_df_cleaned.count()

                                                                                

99441

In [16]:
customer_df_cleaned = customer_df_cleaned.dropDuplicates(['customer_id'])

In [26]:
#combine multiple ccolumns 
orders_with_details = orders_df_cleaned.join(order_items_df, 'order_id', 'left')\
                                       .join(payments_df_cleaned, 'order_id', 'left' )\
                                       .join(customer_df_cleaned, 'customer_id', 'left')
orders_with_details.show()

[Stage 40:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+------+-------------+------------------+-------------+--------------------+-------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|payment_sequential| payment_type|payment_installments|payment_value|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-------------------+-----------------------

                                                                                

In [30]:
orders_with_details.groupBy('order_id').agg(sum('payment_value').alias('total_order_value')).orderBy(asc('total_order_value')).show()



+--------------------+-----------------+
|            order_id|total_order_value|
+--------------------+-----------------+
|bfbd0f9bdef843021...|             NULL|
|c8c528189310eaa44...|              0.0|
|00b1cb0320190ca0d...|              0.0|
|4637ca194b6387e2d...|              0.0|
|f1d5c2e6867fa93ce...|             9.59|
|e8bbc1d69fee39eee...|            10.07|
|37193e64eb9a46b7f...|            10.89|
|47d11383b93b217d9...|            11.56|
|38bcb524e1c38c2c1...|            11.62|
|27eebc49f55d8e9b8...|            11.63|
|8bf12a5b441bd86a1...|            11.63|
|c79bdf061e2228860...|            12.22|
|44a2fb6a4520b17de...|            12.28|
|97369eeb115806c27...|            12.39|
|767dd7bdeb5f5d8f1...|            12.89|
|7c2ee08449e6b8b55...|            12.89|
|e444d35248c6b1c8b...|            13.17|
|6bbd90ca863235a91...|            13.29|
|6f239f76247919dca...|            13.36|
|0661cdf16c5593646...|            13.38|
+--------------------+-----------------+
only showing top

                                                                                

## Advance Transformation

In [32]:
quantiles = order_items_df.approxQuantile('price', [0.01, 0.99], 0.0)
low_cutoff, high_cutoff = quantiles[0], quantiles[1]

                                                                                

In [33]:
low_cutoff, high_cutoff

(9.99, 890.0)

In [34]:
order_items_df.select('price').summary().show()



+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            112650|
|   mean|120.65373901471354|
| stddev|183.63392805026012|
|    min|              0.85|
|    25%|              39.9|
|    50%|             74.99|
|    75%|             134.9|
|    max|            6735.0|
+-------+------------------+



                                                                                

In [36]:
!hdfs dfs -ls /data/olist

Found 9 items
-rw-r--r--   2 harishankargiri16 hadoop    9033957 2025-06-18 03:47 /data/olist/olist_customers_dataset.csv
-rw-r--r--   2 harishankargiri16 hadoop   61273883 2025-06-18 03:47 /data/olist/olist_geolocation_dataset.csv
-rw-r--r--   2 harishankargiri16 hadoop   15438671 2025-06-18 03:47 /data/olist/olist_order_items_dataset.csv
-rw-r--r--   2 harishankargiri16 hadoop    5777138 2025-06-18 03:47 /data/olist/olist_order_payments_dataset.csv
-rw-r--r--   2 harishankargiri16 hadoop   14451670 2025-06-18 03:47 /data/olist/olist_order_reviews_dataset.csv
-rw-r--r--   2 harishankargiri16 hadoop   17654914 2025-06-18 03:47 /data/olist/olist_orders_dataset.csv
-rw-r--r--   2 harishankargiri16 hadoop    2379446 2025-06-18 03:47 /data/olist/olist_products_dataset.csv
-rw-r--r--   2 harishankargiri16 hadoop     174703 2025-06-18 03:47 /data/olist/olist_sellers_dataset.csv
-rw-r--r--   2 harishankargiri16 hadoop       2613 2025-06-18 03:47 /data/olist/product_category_name_translation.c

In [38]:
!hdfs dfs -mkdir /data/olist_processed_data

In [39]:
orders_with_details.write.mode('overwrite').parquet('/data/olist_processed_data/cleaned_data.parquet')

                                                                                

In [40]:
!hdfs dfs -ls /data/olist_processed_data

Found 1 items
drwxr-xr-x   - root hadoop          0 2025-06-21 14:49 /data/olist_processed_data/cleaned_data.parquet


In [41]:
spark.stop()