# Module 2 - Data Cleaning and Transformation

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName('OlistData') \
.getOrCreate()

25/09/16 15:10:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path = '/data/olist/'

In [4]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv',header=True,inferSchema=True)
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv',header=True,inferSchema=True)
order_item_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv',header=True,inferSchema=True)
payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv',header=True,inferSchema=True)
reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv',header=True,inferSchema=True)
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv',header=True,inferSchema=True)
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv',header=True,inferSchema=True)
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv',header=True,inferSchema=True)
category_translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv',header=True,inferSchema=True)

                                                                                

In [98]:
order_item_df.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

In [5]:
customers_df.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [6]:
# Identify Missing Values

In [7]:
from pyspark.sql.functions import *

In [8]:
def missing_values(df,df_name):
    print(f'Missing values in {df_name}:')
    df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show()


In [9]:
missing_values(customers_df,'customer')

Missing values in customer:
+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [10]:
missing_values(orders_df,'orders')

Missing values in orders:
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



In [11]:
missing_values(order_item_df,'order_item')

Missing values in order_item:
+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+



In [12]:
missing_values(payments_df,'payments')

Missing values in payments:
+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



# Handle Missing Values

1. Drop missing Values ( for non - critical columns )

2. Fill missing values ( for numerical columns )

3. Impute Missing Values ( for continous data )

In [13]:
orders_df_cleaned = orders_df.na.drop(subset=['order_id','customer_id','order_status'])

In [14]:
orders_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [17]:
orders_df_cleaned = orders_df.fillna({'order_delivered_customer_date':'9999-12-31'})

In [18]:
orders_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

# Impute missing values 

In [19]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['payment_value'],outputCols=['payment_value_imputed']).setStrategy('mean')


payments_df_cleaned = imputer.fit(payments_df).transform(payments_df)

In [20]:
payments_df_cleaned.show()

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|               

In [21]:
payments_df_with_null = payments_df.withColumn('payment_value',when(col('payment_value')!= 99.33,col('payment_value')).otherwise(lit(None)))

In [22]:
payments_df_with_null.show()

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         NULL|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|
|771ee386b001f0620...|                 1| credit_card|                   1|        81.16|
|3d7239c394a212faa...|                 1| credit_card|                   3|        51.84|
|1f78449c8

In [23]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['payment_value'],outputCols=['payment_value_imputed']).setStrategy('median')


payments_df_cleaned = imputer.fit(payments_df_with_null).transform(payments_df_with_null)

payments_df_cleaned.show()

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         NULL|                100.0|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|               

# Standardizing the format

In [24]:
def print_schema(df,df_name):
    print(f'schema of {df_name}:')
    df.printSchema()

In [25]:
print_schema(orders_df,'orders')

schema of orders:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [26]:
print_schema(customers_df,'customers')

schema of customers:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [27]:
print_schema(payments_df,'payments')

schema of payments:
root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [28]:
orders_df.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [29]:
orders_df_cleaned = orders_df_cleaned.withColumn('order_purchase_timestamp',to_date(col('order_purchase_timestamp')))

In [30]:
orders_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|              2017-10-02|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|              2018-07-24|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [31]:
payments_df_cleaned.show()

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         NULL|                100.0|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|               

In [32]:
payments_df_cleaned = payments_df_cleaned.withColumn('payment_type',when(col('payment_type')=='boleto','Bank Transfer')
                                                     .when(col('payment_type')=='credit_card','Credit Card')
                                                     .when(col('payment_type')=='debit_card','Debit Card')
                                                    .otherwise('other'))

In [33]:
payments_df_cleaned.show()

+--------------------+------------------+-------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential| payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+-------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1|  Credit Card|                   8|         NULL|                100.0|
|a9810da82917af2d9...|                 1|  Credit Card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1|  Credit Card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1|  Credit Card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1|  Credit Card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1|  Credit Card|                   2|        96.12|      

In [34]:
customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [35]:
customers_df.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [36]:
customers_df_cleaned = customers_df.withColumn('customer_zip_code_prefix',col('customer_zip_code_prefix').cast('string'))

In [37]:
customers_df_cleaned.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



# Remove Duplicate Records

In [52]:
# customers_df_cleaned = customers_df_cleaned.dropDuplicates(['customer_id'])
# orders_df_cleaned = orders_df.dropDuplicates(['order_id'])
# order_item_df_cleaned = order_item_df.dropDuplicates(['product_id'])
# payments_df_cleaned = payments_df.dropDuplicates(['order_id'])
# reviews_df_cleaned = reviews_df.dropDuplicates(['order_id'])
# products_df_cleaned = products_df.dropDuplicates(['product_id'])
# sellers_df_cleaned = sellers_df.dropDuplicates(['seller_id'])
# geolocation_df_cleaned = geolocation_df.dropDuplicates(['geolocation_zip_code_prefix'])
# category_translation_df_cleaned = category_translation_df.dropDuplicates(['product_category_name'])


In [53]:
def drop_dupes(df, col):
    return df.dropDuplicates([col])

datasets = {
    "customers": (customers_df, "customer_id"),
    "orders": (orders_df, "order_id"),
    "order_items": (order_item_df, "product_id"),
    "payments": (payments_df, "order_id"),
    "reviews": (reviews_df, "order_id"),
    "products": (products_df, "product_id"),
    "sellers": (sellers_df, "seller_id"),
    "geolocation": (geolocation_df, "geolocation_zip_code_prefix"),
    "category_translation": (category_translation_df, "product_category_name")
}

cleaned_dfs = {name: drop_dupes(df, col) for name, (df, col) in datasets.items()}


In [54]:
geolocation_df.show(5)

+---------------------------+-------------------+------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|   geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+------------------+----------------+-----------------+
|                       1037| -23.54562128115268|-46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535|-46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469|-46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681|-46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493|-46.64160722329613|       sao paulo|               SP|
+---------------------------+-------------------+------------------+----------------+-----------------+
only showing top 5 rows



# Data Transformation

In [55]:
order_with_details = orders_df_cleaned.join(order_item_df,'order_id','left')\
.join(payments_df_cleaned,'order_id','left')\
.join(customers_df_cleaned,'customer_id','left')

In [63]:
order_with_details.show(5)

                                                                                

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+------+-------------+------------------+------------+--------------------+-------------+--------------------+------------------------+-------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|payment_sequential|payment_type|payment_installments|payment_value|  customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+----------

In [58]:
order_with_total_value = order_with_details.groupBy('order_id')\
.agg(sum('payment_value').alias('total_order_value'))

In [59]:
order_with_total_value.show(5)

                                                                                

+--------------------+-----------------+
|            order_id|total_order_value|
+--------------------+-----------------+
|118045506e1c1dda0...|           1802.0|
|f44cb69655f8e4d13...|           146.32|
|edcc6b79e8394346b...|           162.63|
|9f98d6530155e3b38...|           316.76|
|949280c70c6d62ec9...|            49.42|
+--------------------+-----------------+
only showing top 5 rows



## Delivery Time Calculation 

In [64]:
delivery_time = order_with_details.select('order_id','order_purchase_timestamp','order_approved_at','order_delivered_carrier_date',
                                          'order_delivered_customer_date','order_estimated_delivery_date')

In [66]:
delivery_time.show(5)

+--------------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|118045506e1c1dda0...|     2018-03-08 19:06:05|2018-03-09 19:08:26|         2018-03-13 21:24:28|          2018-04-11 12:53:50|          2018-04-04 00:00:00|
|118045506e1c1dda0...|     2018-03-08 19:06:05|2018-03-09 19:08:26|         2018-03-13 21:24:28|          2018-04-11 12:53:50|          2018-04-04 00:00:00|
|118045506e1c1dda0...|     2018-03-08 19:06:05|2018-03-09 19:08:26|         2018-03-13 21:24:28|          2018-04-11 12:53:50|          2018-04-04 00:00:00|
|118045506e1c1dda0...|     2018-03-08 19:06:05|2018-03-09 

In [78]:
total_delivery_time = delivery_time.withColumn('total_delivery_time',datediff(col('order_delivered_customer_date'),col('order_purchase_timestamp')))


In [79]:
total_delivery_time.orderBy('total_delivery_time',ascending =False).show()

+--------------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|            order_id|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|total_delivery_time|
+--------------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|ca07593549f1816d2...|     2017-02-21 23:31:27|2017-02-23 02:35:15|         2017-03-08 13:47:46|          2017-09-19 14:36:39|          2017-03-22 00:00:00|                210|
|1b3190b2dfa9d789e...|     2018-02-23 14:57:35|2018-02-23 15:16:14|         2018-02-26 18:49:07|          2018-09-19 23:24:07|          2018-03-15 00:00:00|                208|
|440d0d17af552815d...|     2017-03-07 23:59:51|2017-03-09 01:11:33|         2017-03-15 13:00:08|          2017-09-1

# Advance Transformation 

In [80]:
order_item_df.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [81]:
quantiles = order_item_df.approxQuantile('price',[0.01,0.99],0.0)
low_cutoff,high_cutoff = quantiles[0],quantiles[1]

In [82]:
# order_item_df.select('price').summary().show()

In [83]:
low_cutoff,high_cutoff

(9.99, 890.0)

In [84]:
order_item_df_cleaned = order_item_df.filter((col('price') >=low_cutoff) & (col('price') <=high_cutoff))

In [85]:
payments_df_cleaned.select('payment_installments').summary().show()

[Stage 156:>                                                        (0 + 2) / 2]

+-------+--------------------+
|summary|payment_installments|
+-------+--------------------+
|  count|               99440|
|   mean|  2.9156174577634753|
| stddev|   2.709860813743852|
|    min|                   0|
|    25%|                   1|
|    50%|                   2|
|    75%|                   4|
|    max|                  24|
+-------+--------------------+



                                                                                

In [86]:
products_df.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [87]:
products_df_cleaned = products_df.withColumn(
    'product_size_category',
    when(col('product_weight_g') <500,'Small')
    .when(col('product_weight_g').between(500,2000),'Medium')
    .otherwise('Large')
)

In [88]:
products_df_cleaned.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_size_category|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|                Small|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|        

# Total Revenue per Seller

In [114]:
total_revenue_sellers.show(5)

+--------------------+-----------------+
|           seller_id|       sum(price)|
+--------------------+-----------------+
|8e6cc767478edae94...|6830.580000000001|
|4d600e08ecbe08258...|          4465.34|
|9b1050e85becf3ae9...|            85.14|
|cb5ff1b9715e99589...|             85.0|
|038b75b729c8a9a04...|            467.0|
+--------------------+-----------------+
only showing top 5 rows



In [115]:
 total_revenue_sellers = order_item_df.groupby('seller_id').agg(sum('price').alias('price'))
total_revenue_sellers.orderBy('price',ascending=False).show()

+--------------------+------------------+
|           seller_id|             price|
+--------------------+------------------+
|4869f7a5dfa277a7d...|229472.62999999913|
|53243585a1d6dc264...| 222776.0499999998|
|4a3ca9315b744ce9f...| 200472.9199999981|
|fa1c13f2614d7b5c4...|194042.02999999968|
|7c67e1448b00f6e96...|187923.89000000118|
|7e93a43ef30c4f03f...| 176431.8700000001|
|da8622b14eb17ae28...| 160236.5699999994|
|7a67c85e85bb2ce85...|141745.53000000078|
|1025f0e2d44d7041d...|138968.55000000005|
|955fee9216a65b617...|135171.70000000007|
|46dc3b2cc0980fb8e...|128111.19000000009|
|6560211a19b47992c...|123304.83000000002|
|620c87c171fb2a6dd...|114774.50000000048|
|7d13fca1522535862...|113628.97000000006|
|5dceca129747e92ff...|112155.53000000014|
|1f50f920176fa81da...|106939.21000000078|
|cc419e0650a3c5ba7...|104288.41999999946|
|a1043bafd471dff53...|101901.15999999992|
|3d871de0142ce09b7...| 94914.20000000022|
|edb1ef5e36e0c8cd8...| 79284.55000000013|
+--------------------+------------

In [117]:
!hadoop fs -mkdir /data/olist_proc

mkdir: `/data/olist_proc': File exists


In [118]:
order_with_details.write.mode('overwrite').parquet('/data/olist_proc/cleaned_data.parquet')

                                                                                

In [119]:
products_df_cleaned.write.mode('overwrite').parquet('/data/olist_proc/product_df_cleaned.parquet')

In [120]:
!hadoop fs -ls /data/olist_proc

Found 2 items
drwxr-xr-x   - root hadoop          0 2025-09-16 18:21 /data/olist_proc/cleaned_data.parquet
drwxr-xr-x   - root hadoop          0 2025-09-16 18:21 /data/olist_proc/product_df_cleaned.parquet


In [93]:
order_with_details.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |

In [94]:
products_df_cleaned.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_size_category: string (nullable = false)



In [None]:
CREATE EXTERNAL TABLE cleaned_payments(
    product_id STRING,
    product_category_name STRING,
    product_name_lenght STRING,
    product_description_lenght INT,
    product_photos_qty INT,
    product_weight_g INT,
    product_length_cm INT,
    product_height_cm INT,
    product_width_cm INT,
    product_size_category STRING
)
STORED AS PARQUET
LOCATION '/data/olist_proc/product_df_cleaned.parquet';