In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('olist').getOrCreate()

25/05/12 08:50:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
hdfs_path = '/data/olist/'

In [3]:
customers_df = spark.read.csv(hdfs_path + 'olist_customers_dataset.csv',header=True,inferSchema=True)
geolocation_df = spark.read.csv(hdfs_path + 'olist_geolocation_dataset.csv',header=True,inferSchema=True)
order_items_df = spark.read.csv(hdfs_path + 'olist_order_items_dataset.csv',header=True,inferSchema=True)
payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv',header=True,inferSchema=True)
reviews_df = spark.read.csv(hdfs_path + 'olist_order_reviews_dataset.csv',header=True,inferSchema=True)
orders_df = spark.read.csv(hdfs_path + 'olist_orders_dataset.csv',header=True,inferSchema=True)
products_df = spark.read.csv(hdfs_path + 'olist_products_dataset.csv',header=True,inferSchema=True)
sellers_df = spark.read.csv(hdfs_path + 'olist_sellers_dataset.csv',header=True,inferSchema=True)
category_translation_df = spark.read.csv(hdfs_path + 'product_category_name_translation.csv',header=True,inferSchema=True)

                                                                                

In [4]:
from pyspark.sql.functions import *

In [5]:
#Identify Missing Values
def missing_values(df,df_name):
    print(f'Missing values in {df_name}:')
    df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show()

In [6]:
missing_values(customers_df,'customer')

Missing values in customer:




+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



                                                                                

In [7]:
missing_values(orders_df,'order')

Missing values in order:




+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



                                                                                

In [8]:
missing_values(order_items_df,'order_item')

Missing values in order_item:
+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+



                                                                                

In [9]:
missing_values(payments_df,'payments')

Missing values in payments:
+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



#handle missing values
1. Drop missing values ( for non- critical columns)
2. Fill missing Values ( for numerical value ) 
3. Impute Missing Values ( mean, median , knn) (for continous data)

In [10]:
orders_df_cleaned = orders_df.na.drop(subset=['order_id','customer_id','order_status'])

In [18]:
orders_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [11]:
orders_df_cleaned = orders_df.fillna({'order_delivered_customer_date':'9999-12-31'})

In [20]:
orders_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

# Impute missing  values



In [12]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['payment_value'],outputCols=['payment_value_imputed']).setStrategy('mean')


payments_df_cleaned = imputer.fit(payments_df).transform(payments_df)

In [25]:
payments_df_cleaned.show()

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|               

# Standardizing the format

In [13]:
def print_schema(df,df_name):
    print(f'schema of {df_name}:')
    df.printSchema()

In [14]:
print_schema(customers_df,'customers')

schema of customers:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [15]:
orders_df_cleaned = orders_df_cleaned\
.withColumn('order_purchase_timestamp',to_date(col('order_purchase_timestamp')))

In [32]:
orders_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|              2017-10-02|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|              2018-07-24|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [16]:
payments_df_cleaned = payments_df_cleaned.withColumn(
    'payment_type',
    when(col('payment_type') == 'boleto', 'Offline Payment')
    .when(col('payment_type') == 'credit_card', 'Credit Card')
    .when(col('payment_type') == 'debit_card', 'Debit Card')
    .otherwise('Other')
)

In [40]:
payments_df_cleaned.show()

+--------------------+------------------+---------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|   payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+---------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1|    Credit Card|                   8|        99.33|                99.33|
|a9810da82917af2d9...|                 1|    Credit Card|                   1|        24.39|                24.39|
|25e8ea4e93396b6fa...|                 1|    Credit Card|                   1|        65.71|                65.71|
|ba78997921bbcdc13...|                 1|    Credit Card|                   8|       107.78|               107.78|
|42fdf880ba16b47b5...|                 1|    Credit Card|                   2|       128.45|               128.45|
|298fcdf1f73eb413e...|                 1|    Credit Card|                   2|  

In [17]:
customers_df_cleaned = customers_df\
.withColumn('customer_zip_code_prefix',col('customer_zip_code_prefix')\
            .cast('string'))

In [18]:
print_schema(customers_df_cleaned,'customers')

schema of customers:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



# Remove Duplicate Records

In [19]:
customers_df_cleaned = customers_df_cleaned.dropDuplicates(['customer_id'])

# Data TransFormation

In [21]:
order_with_details = orders_df_cleaned.join(order_items_df,'order_id','left')\
.join(payments_df_cleaned,'order_id','left')\
.join(customers_df_cleaned,'customer_id','left')

In [22]:
order_with_details.show()

                                                                                

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+------+-------------+------------------+---------------+--------------------+-------------+---------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|payment_sequential|   payment_type|payment_installments|payment_value|payment_value_imputed|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------+--------------------

In [23]:
order_with_total_value = order_with_details.groupBy('order_id')\
.agg(sum('payment_value').alias('total_order_value'))

In [24]:
order_with_total_value.show(5)



+--------------------+-----------------+
|            order_id|total_order_value|
+--------------------+-----------------+
|118045506e1c1dda0...|           1802.0|
|f44cb69655f8e4d13...|           164.32|
|edcc6b79e8394346b...|           162.63|
|9f98d6530155e3b38...|           316.76|
|949280c70c6d62ec9...|            49.42|
+--------------------+-----------------+
only showing top 5 rows



                                                                                

In [50]:
order_with_details.printSchema()


root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- payment_value_imputed: double (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- custo

In [55]:
# Delivery time calculation
order_with_delivery_time = order_with_details \
    .filter(col('order_delivered_customer_date') != '9999-12-31 00:00:00') \
    .withColumn('delivery_time', datediff(
        to_date(col('order_delivered_customer_date')),
        to_date(col('order_purchase_timestamp'))
    )) \
    .select('order_id','delivery_time') \
    .orderBy('delivery_time', ascending=False)


In [56]:
order_with_delivery_time.show(5)

+--------------------+-------------+
|            order_id|delivery_time|
+--------------------+-------------+
|ca07593549f1816d2...|          210|
|1b3190b2dfa9d789e...|          208|
|440d0d17af552815d...|          196|
|2fb597c2f772eca01...|          195|
|285ab9426d6982034...|          195|
+--------------------+-------------+
only showing top 5 rows



#Advance Transformation

In [57]:
quantiles = order_items_df.approxQuantile('price',[0.01,0.99],0.0)
low_cutoff,high_cutoff = quantiles[0],quantiles[1]

In [58]:
order_items_df.select('price').summary().show()



+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            112650|
|   mean|120.65373901471354|
| stddev|183.63392805026012|
|    min|              0.85|
|    25%|              39.9|
|    50%|             74.99|
|    75%|             134.9|
|    max|            6735.0|
+-------+------------------+



                                                                                

In [59]:
low_cutoff,high_cutoff

(9.99, 890.0)

In [61]:
order_items_df_cleaned = order_items_df.filter((col('price')>=low_cutoff) & (col('price')<=high_cutoff))

In [63]:
products_df.select('product_weight_g').summary().show()

+-------+------------------+
|summary|  product_weight_g|
+-------+------------------+
|  count|             32949|
|   mean|2276.4724877841513|
| stddev| 4282.038730977024|
|    min|                 0|
|    25%|               300|
|    50%|               700|
|    75%|              1900|
|    max|             40425|
+-------+------------------+



In [65]:
products_df_cleaned = products_df.withColumn(
    'product_size_category',
    when(col('product_weight_g')<500,'Small')
    .when(col('product_weight_g').between(500,2000),'Medium')
    .otherwise('Large')
)

In [66]:
products_df_cleaned.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_size_category|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|                Small|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|        

# Total Revenue per Seller

In [68]:
print_schema(sellers_df,'sellers')

schema of sellers:
root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [71]:
seller_revenue_df = order_with_details.withColumn('item_revenue',col('price')+col('freight_value')).groupBy('seller_id')\
                                                  .agg(sum('item_revenue').alias('total_revenue'))

In [72]:
seller_revenue_with_info = seller_revenue_df.join(sellers_df,on='seller_id',how='left').orderBy(col('total_revenue').desc())

In [73]:
seller_revenue_with_info.show(5)



+--------------------+------------------+----------------------+----------------+------------+
|           seller_id|     total_revenue|seller_zip_code_prefix|     seller_city|seller_state|
+--------------------+------------------+----------------------+----------------+------------+
|53243585a1d6dc264...|258882.28000000003|                 42738|lauro de freitas|          BA|
|4869f7a5dfa277a7d...|258625.52000000005|                 14840|         guariba|          SP|
|7c67e1448b00f6e96...|252549.31999999957|                  8577| itaquaquecetuba|          SP|
|4a3ca9315b744ce9f...|250635.29000000036|                 14940|        ibitinga|          SP|
|fa1c13f2614d7b5c4...| 214454.8200000002|                 13170|          sumare|          SP|
+--------------------+------------------+----------------------+----------------+------------+
only showing top 5 rows



                                                                                

In [74]:
!hadoop fs -mkdir /data/olist_proc

In [75]:
order_with_details.write.mode('overwrite').parquet('/data/olist_proc/cleaned_data.parquet')

                                                                                

In [77]:
products_df_cleaned.write.mode('overwrite').parquet('/data/olist_proc/product_df_cleaned.parquet')

In [78]:
!hadoop fs -ls /data/olist_proc

Found 2 items
drwxr-xr-x   - root hadoop          0 2025-05-12 13:57 /data/olist_proc/cleaned_data.parquet
drwxr-xr-x   - root hadoop          0 2025-05-12 14:00 /data/olist_proc/product_df_cleaned.parquet


In [79]:
products_df_cleaned.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_size_category: string (nullable = false)



In [81]:
CREATE EXTERNAL TABLE cleaned_orders (
    product_id STRING,
    product_category_name INT ,
    product_name_lenght INT,
    product_description_lenght INT,
    product_photos_qty INT,
    product_weight_g INT,
    product_length_cm INT,
    product_height_cm INT,
    product_width_cm INT,
    product_size_category STRING
)
STORED AS PARQUET
LOCATION '/data/olist_proc/product_df_cleaned.parquet';

SyntaxError: invalid syntax (2302627601.py, line 1)

In [82]:
spark.stop()