In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder \
.appName('OlistEDA') \
.getOrCreate()

In [3]:
spark

In [4]:
!hdfs dfs -ls /user/winardi/data/olist/

Found 10 items
-rw-r--r--   1 User supergroup    9033957 2025-09-19 09:22 /user/winardi/data/olist/olist_customers_dataset.csv
-rw-r--r--   1 User supergroup   61273883 2025-09-19 09:22 /user/winardi/data/olist/olist_geolocation_dataset.csv
-rw-r--r--   1 User supergroup   15438671 2025-09-19 09:22 /user/winardi/data/olist/olist_order_items_dataset.csv
-rw-r--r--   1 User supergroup    5777138 2025-09-19 09:22 /user/winardi/data/olist/olist_order_payments_dataset.csv
-rw-r--r--   1 User supergroup   14451670 2025-09-19 09:22 /user/winardi/data/olist/olist_order_reviews_dataset.csv
-rw-r--r--   1 User supergroup   17654914 2025-09-19 09:22 /user/winardi/data/olist/olist_orders_dataset.csv
-rw-r--r--   1 User supergroup    2379446 2025-09-19 09:22 /user/winardi/data/olist/olist_products_dataset.csv
-rw-r--r--   1 User supergroup     174703 2025-09-19 09:22 /user/winardi/data/olist/olist_sellers_dataset.csv
drwxr-xr-x   - User supergroup          0 2025-09-30 13:27 /user/winardi/data/olis

In [5]:
hdfs_path ="hdfs://localhost:9000/user/winardi/data/olist/"

In [6]:
customers_df = spark.read.csv(hdfs_path + "olist_customers_dataset.csv", header=True, inferSchema=True)
orders_df = spark.read.csv(hdfs_path + "olist_orders_dataset.csv", header=True, inferSchema=True)
order_items_df = spark.read.csv(hdfs_path + "olist_order_items_dataset.csv", header=True, inferSchema=True)
payments_df = spark.read.csv(hdfs_path + "olist_order_payments_dataset.csv", header=True, inferSchema=True)
reviews_df = spark.read.csv(hdfs_path + "olist_order_reviews_dataset.csv", header=True, inferSchema=True, quote='"', escape='"', multiLine=True)
products_df = spark.read.csv(hdfs_path + "olist_products_dataset.csv", header=True, inferSchema=True)
sellers_df = spark.read.csv(hdfs_path + "olist_sellers_dataset.csv", header=True, inferSchema=True)
geolocation_df = spark.read.csv(hdfs_path + "olist_geolocation_dataset.csv", header=True, inferSchema=True)
category_translation_df = spark.read.csv(hdfs_path + "product_category_name_translation.csv", header=True, inferSchema=True)

# Data Ingestion and Exploration

## Cek Missing Values & Duplicate untuk Semua Tabel

In [7]:
from pyspark.sql.functions import col, count, when, lit

def null_count_report(df):
    reports = []
    for c in df.columns:
        reports.append(
            df.select(count(when(col(c).isNull(), 1)).alias("null_count"))
              .withColumn("column_name", lit(c))
        )
    result = reports[0]
    for r in reports[1:]:
        result = result.union(r)
    return result.select("column_name", "null_count")

#### Customer_df

In [8]:
customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [9]:
print(f'Customers : {customers_df.count()} rows')

Customers : 99441 rows


In [10]:
# check nulls value
null_count_report(customers_df).show(truncate=False)

+------------------------+----------+
|column_name             |null_count|
+------------------------+----------+
|customer_id             |0         |
|customer_unique_id      |0         |
|customer_zip_code_prefix|0         |
|customer_city           |0         |
|customer_state          |0         |
+------------------------+----------+



In [11]:
# Duplicate Values
customers_df.groupBy('customer_id').count().filter('count>1').show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



#### orders_df

In [12]:
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [13]:
print(f'Orders : {orders_df.count()} rows')

Orders : 99441 rows


In [14]:
null_count_report(orders_df).show(truncate=False)

+-----------------------------+----------+
|column_name                  |null_count|
+-----------------------------+----------+
|order_id                     |0         |
|customer_id                  |0         |
|order_status                 |0         |
|order_purchase_timestamp     |0         |
|order_approved_at            |160       |
|order_delivered_carrier_date |1783      |
|order_delivered_customer_date|2965      |
|order_estimated_delivery_date|0         |
+-----------------------------+----------+



In [15]:
# Cek order delivered tapi timestamp delivery kosong (anomali)
delivered_nulls = orders_df.filter(
    (col("order_status") == "delivered") &
    (col("order_delivered_customer_date").isNull())
)
print("Delivered tapi tanggal customer null:", delivered_nulls.count())

Delivered tapi tanggal customer null: 8


In [16]:
from pyspark.sql.functions import col

# Flag apakah null wajar atau tidak
orders_checked = orders_df.withColumn(
    "timestamp_null_status",
    # kalau approved_at null → wajar kalau carrier_date & customer_date juga null
    when(col("order_approved_at").isNull() & 
         (col("order_delivered_carrier_date").isNull()) &
         (col("order_delivered_customer_date").isNull()), "valid_null_purchase_only")
    # kalau carrier_date null → wajar kalau customer_date juga null
    .when(col("order_delivered_carrier_date").isNull() & 
          (col("order_delivered_customer_date").isNull()), "valid_null_until_approved")
    # kalau customer_date null tapi carrier_date ada → artinya sedang dalam proses pengiriman
    .when(col("order_delivered_customer_date").isNull() & 
          col("order_delivered_carrier_date").isNotNull(), "valid_null_until_carrier")
    # kalau delivered status tapi masih null → anomali
    .when((col("order_status") == "delivered") & col("order_delivered_customer_date").isNull(), "invalid_null_expected_delivery")
    # selain itu → complete
    .otherwise("complete")
)

orders_checked.groupBy("timestamp_null_status").count().show(truncate=False)


+-------------------------+-----+
|timestamp_null_status    |count|
+-------------------------+-----+
|valid_null_until_carrier |1183 |
|valid_null_until_approved|1636 |
|valid_null_purchase_only |146  |
|complete                 |96476|
+-------------------------+-----+



In [17]:
orders_with_invalid = orders_df.withColumn(
    "invalid_null_status",
    when(
        (col("order_approved_at").isNull() & 
         (col("order_delivered_carrier_date").isNotNull() | col("order_delivered_customer_date").isNotNull())),
        "invalid_null_in_approved"
    ).when(
        (col("order_delivered_carrier_date").isNull() & 
         col("order_delivered_customer_date").isNotNull()),
        "invalid_null_in_carrier"
    ).otherwise("valid")
)

orders_with_invalid.groupBy("invalid_null_status").count().show(truncate=False)

+------------------------+-----+
|invalid_null_status     |count|
+------------------------+-----+
|valid                   |99426|
|invalid_null_in_approved|14   |
|invalid_null_in_carrier |1    |
+------------------------+-----+



In [18]:
# Duplicate Values
orders_df.groupBy('order_id').count().filter('count>1').show()

+--------+-----+
|order_id|count|
+--------+-----+
+--------+-----+



#### order_items_df

In [19]:
order_items_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [20]:
print(f'Order Items : {order_items_df.count()} rows')

Order Items : 112650 rows


In [21]:
null_count_report(order_items_df).show(truncate=False)

+-------------------+----------+
|column_name        |null_count|
+-------------------+----------+
|order_id           |0         |
|order_item_id      |0         |
|product_id         |0         |
|seller_id          |0         |
|shipping_limit_date|0         |
|price              |0         |
|freight_value      |0         |
+-------------------+----------+



In [22]:
order_items_df.groupBy('order_id', 'order_item_id').count().filter('count>1').show()

+--------+-------------+-----+
|order_id|order_item_id|count|
+--------+-------------+-----+
+--------+-------------+-----+



#### payments_df

In [23]:
payments_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [24]:
print(f'Payments : {payments_df.count()} rows')

Payments : 103886 rows


In [25]:
null_count_report(payments_df).show(truncate=False)

+--------------------+----------+
|column_name         |null_count|
+--------------------+----------+
|order_id            |0         |
|payment_sequential  |0         |
|payment_type        |0         |
|payment_installments|0         |
|payment_value       |0         |
+--------------------+----------+



In [26]:
payments_df.groupBy('order_id','payment_sequential').count().filter('count>1').show(truncate=False)

+--------+------------------+-----+
|order_id|payment_sequential|count|
+--------+------------------+-----+
+--------+------------------+-----+



#### reviews_df

In [27]:
reviews_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



In [28]:
print(f'Reviews : {reviews_df.count()} rows')

Reviews : 99224 rows


In [29]:
null_count_report(reviews_df).show(truncate=False)

+-----------------------+----------+
|column_name            |null_count|
+-----------------------+----------+
|review_id              |0         |
|order_id               |0         |
|review_score           |0         |
|review_comment_title   |87656     |
|review_comment_message |58247     |
|review_creation_date   |0         |
|review_answer_timestamp|0         |
+-----------------------+----------+



In [30]:
filtered_reviews = reviews_df.groupBy('review_id','order_id').count().filter('count>1')
filtered_reviews.show()

+---------+--------+-----+
|review_id|order_id|count|
+---------+--------+-----+
+---------+--------+-----+



#### products_df

In [31]:
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [32]:
print(f'Products : {products_df.count()} rows')

Products : 32951 rows


In [33]:
null_count_report(products_df).show(truncate=False)

+--------------------------+----------+
|column_name               |null_count|
+--------------------------+----------+
|product_id                |0         |
|product_category_name     |610       |
|product_name_lenght       |610       |
|product_description_lenght|610       |
|product_photos_qty        |610       |
|product_weight_g          |2         |
|product_length_cm         |2         |
|product_height_cm         |2         |
|product_width_cm          |2         |
+--------------------------+----------+



In [34]:
products_df.groupBy('product_id').count().filter('count>1').show()

+----------+-----+
|product_id|count|
+----------+-----+
+----------+-----+



In [35]:
missing_products = products_df.filter(
    col("product_weight_g").isNull() | 
    col("product_length_cm").isNull() |
    col("product_height_cm").isNull() |
    col("product_width_cm").isNull()
).select("product_id")

related_orders = order_items_df.join(missing_products, on="product_id", how="inner")
print("Jumlah orders terkait produk bermasalah:", related_orders.count())

Jumlah orders terkait produk bermasalah: 18


#### sellers_df

In [36]:
sellers_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [37]:
print(f'Sellers : {sellers_df.count()} rows')

Sellers : 3095 rows


In [38]:
null_count_report(sellers_df).show(truncate=False)

+----------------------+----------+
|column_name           |null_count|
+----------------------+----------+
|seller_id             |0         |
|seller_zip_code_prefix|0         |
|seller_city           |0         |
|seller_state          |0         |
+----------------------+----------+



In [39]:
sellers_df.groupBy('seller_id').count().filter('count>1').show()

+---------+-----+
|seller_id|count|
+---------+-----+
+---------+-----+



#### geolocation_df

In [40]:
geolocation_df.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [41]:
print(f'Geolocation : {geolocation_df.count()} rows')

Geolocation : 1000163 rows


In [42]:
null_count_report(geolocation_df).show(truncate=False)

+---------------------------+----------+
|column_name                |null_count|
+---------------------------+----------+
|geolocation_zip_code_prefix|0         |
|geolocation_lat            |0         |
|geolocation_lng            |0         |
|geolocation_city           |0         |
|geolocation_state          |0         |
+---------------------------+----------+



In [43]:
# Cari kombinasi yang duplikat
duplicate_count = geolocation_df.groupBy(
    'geolocation_zip_code_prefix',
    'geolocation_lat',
    'geolocation_lng'
).count().filter(col('count') > 1)

# Tampilkan kombinasi duplikat
duplicate_count.show()

# Hitung total baris duplikat berlebih
dup_rows = duplicate_count.agg(
    sum(col('count') - 1).alias('duplicate_rows')
).collect()[0]['duplicate_rows']

print(f"Total baris duplikat: {dup_rows}")

+---------------------------+-------------------+-------------------+-----+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|count|
+---------------------------+-------------------+-------------------+-----+
|                       1230|-23.538859910352585| -46.65935566916726|    2|
|                       1242| -23.54603669248079|-46.659234221622704|    2|
|                       1230|-23.539663173466437| -46.65818240022502|    2|
|                       1229|-23.540367384124878| -46.65900446502971|    9|
|                       1227|-23.554376810721855| -46.66288171619834|    4|
|                       1221| -23.54404365889549| -46.64893993450283|    2|
|                       1232|-23.539924791563266|-46.661443218708506|    3|
|                       1321|-23.564019237578638| -46.64235930489347|    3|
|                       1308|-23.556416407149936|-46.652897587569946|    5|
|                       1303| -23.54893849570083| -46.65043121524605|    4|
|           

#### category_translation_df

In [44]:
category_translation_df.printSchema()

root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)



In [45]:
print(f'Category_Translation : {category_translation_df.count()} rows')

Category_Translation : 71 rows


In [46]:
null_count_report(category_translation_df).show(truncate=False)

+-----------------------------+----------+
|column_name                  |null_count|
+-----------------------------+----------+
|product_category_name        |0         |
|product_category_name_english|0         |
+-----------------------------+----------+



In [47]:
category_translation_df.groupBy('product_category_name').count().filter('count>1').show()

+---------------------+-----+
|product_category_name|count|
+---------------------+-----+
+---------------------+-----+



## Validasi Relasi Antar Tabel

In [48]:
# Orders must have a valid customer_id
invalid_orders = orders_df.join(customers_df, "customer_id", "left_anti")
print(f"Orders without customer: {invalid_orders.count()}")

Orders without customer: 0


In [49]:
# Payments harus punya order_id valid
invalid_payments = payments_df.join(orders_df, "order_id", "left_anti")
print(f"Payments without order: {invalid_payments.count()}")

Payments without order: 0


In [50]:
# Payments must have a valid order_id
invalid_reviews = reviews_df.join(orders_df, "order_id", "left_anti")
print(f"Reviews without order: {invalid_reviews.count()}")

Reviews without order: 0


In [51]:
# Order_items must have valid order_id, product_id, and seller_id
invalid_items_orders = order_items_df.join(orders_df, "order_id", "left_anti")
print(f"Order items without order: {invalid_items_orders.count()}")

invalid_items_products = order_items_df.join(products_df, "product_id", "left_anti")
print(f"Order items without product: {invalid_items_products.count()}")

invalid_items_selers = order_items_df.join(sellers_df, "seller_id", "left_anti")
print(f"Order items without seller: {invalid_items_products.count()}")

Order items without order: 0
Order items without product: 0
Order items without seller: 0


## Exploration

In [52]:
# Delivery time
from pyspark.sql.functions import datediff

delivery_df = orders_df.select("order_id", "order_purchase_timestamp", "order_delivered_customer_date")
delivery_df = delivery_df.withColumn("delivery_days", datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp")))

delivery_df.select(
    avg("delivery_days").alias("avg_days"),
    min("delivery_days").alias("min_days"),
    max("delivery_days").alias("max_days")
).show()

# Order dengan delivery_time sangat tinggi
delivery_df.filter(col("delivery_days") > 100).show(5)

+------------------+--------+--------+
|          avg_days|min_days|max_days|
+------------------+--------+--------+
|12.497336125046644|       0|     210|
+------------------+--------+--------+

+--------------------+------------------------+-----------------------------+-------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|delivery_days|
+--------------------+------------------------+-----------------------------+-------------+
|a4efaffc506a395c9...|     2018-01-17 10:50:09|          2018-05-07 19:25:33|          110|
|8b7fd198ad184563c...|     2017-11-14 10:04:27|          2018-02-27 18:05:08|          105|
|4f39a94d6e474819d...|     2017-04-28 16:28:03|          2017-09-19 13:54:18|          144|
|b31c7dea63bb08f8c...|     2017-09-26 18:35:35|          2018-02-05 21:25:43|          132|
|3602a80b09d914236...|     2018-01-31 20:38:38|          2018-05-18 11:06:52|          107|
+--------------------+------------------------+---------------------

In [53]:
# Distribusi kategori produk
products_df.groupBy("product_category_name").count().orderBy("count", ascending=False).show(10)

+---------------------+-----+
|product_category_name|count|
+---------------------+-----+
|      cama_mesa_banho| 3029|
|        esporte_lazer| 2867|
|     moveis_decoracao| 2657|
|         beleza_saude| 2444|
| utilidades_domest...| 2335|
|           automotivo| 1900|
| informatica_acess...| 1639|
|           brinquedos| 1411|
|   relogios_presentes| 1329|
|            telefonia| 1134|
+---------------------+-----+
only showing top 10 rows



In [54]:
# Distribusi seller per state
sellers_df.groupBy("seller_state").count().orderBy("count", ascending=False).show(10)

+------------+-----+
|seller_state|count|
+------------+-----+
|          SP| 1849|
|          PR|  349|
|          MG|  244|
|          SC|  190|
|          RJ|  171|
|          RS|  129|
|          GO|   40|
|          DF|   30|
|          ES|   23|
|          BA|   19|
+------------+-----+
only showing top 10 rows



In [55]:
# Distribusi jumlah item per order
order_items_df.groupBy("order_id").count().orderBy("count", ascending=False).show(10)

+--------------------+-----+
|            order_id|count|
+--------------------+-----+
|8272b63d03f5f79c5...|   21|
|1b15974a0141d54e3...|   20|
|ab14fdcfbe524636d...|   20|
|428a2f660dc84138d...|   15|
|9ef13efd6949e4573...|   15|
|73c8ab38f07dc9438...|   14|
|9bdc4d4c71aa1de46...|   14|
|37ee401157a3a0b28...|   13|
|3a213fcdfe7d98be7...|   12|
|af822dacd6f5cff73...|   12|
+--------------------+-----+
only showing top 10 rows



In [56]:
# Customer Distribution by state
customers_df.groupBy('customer_state').count().orderBy('count',ascending=False).show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



In [57]:
# Order - Order status distribution
orders_df.groupBy('order_status').count().orderBy('count',ascending=False).show()

+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



In [58]:
#payment_type distribution
payments_df.groupBy('payment_type').count().orderBy('count',ascending=False).show()

+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



In [59]:
#Top selling Products
top_products = order_items_df.groupBy('product_id').agg(sum('price').alias('total_sales'))
top_products.orderBy('total_sales',ascending=False).show(20)

+--------------------+------------------+
|          product_id|       total_sales|
+--------------------+------------------+
|bb50f2e236e5eea01...|           63885.0|
|6cdd53843498f9289...| 54730.19999999998|
|d6160fb7873f18409...|          48899.34|
|d1c427060a0f73f6b...| 47214.50999999998|
|99a4788cb24856965...|  43025.5599999999|
|3dd2a17168ec895c7...| 41082.59999999995|
|25c38557cf793876c...| 38907.31999999999|
|5f504b3a1c75b73d6...|37733.899999999994|
|53b36df67ebb7c415...| 37683.41999999996|
|aca2eb7d00ea1a7b8...|37608.899999999914|
|e0d64dcfaa3b6db5c...| 31786.81999999999|
|d285360f29ac7fd97...|31623.809999999987|
|7a10781637204d8d1...| 30467.49999999999|
|f1c7f353075ce59d8...|          29997.36|
|f819f0c84a64f02d3...|29024.479999999996|
|588531f8ec37e7d5f...|28291.989999999998|
|422879e10f4668299...|26577.219999999943|
|16c4e87b98a9370a9...|           25034.0|
|5a848e4ab52fd5445...| 24229.02999999997|
|a62e25e09e05e6faf...|           24051.0|
+--------------------+------------

In [60]:
from pyspark.sql.functions import col, datediff

# Hitung time delta antar tahapan
orders_with_delta = orders_df \
    .withColumn("delta_purchase_to_approved",
                datediff(col("order_approved_at"), col("order_purchase_timestamp"))) \
    .withColumn("delta_approved_to_carrier",
                datediff(col("order_delivered_carrier_date"), col("order_approved_at"))) \
    .withColumn("delta_carrier_to_customer",
                datediff(col("order_delivered_customer_date"), col("order_delivered_carrier_date"))) \
    .withColumn("delta_purchase_to_customer",
                datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp"))) \
    .withColumn("delta_purchase_to_estimated",
                datediff(col("order_estimated_delivery_date"), col("order_purchase_timestamp")))


In [61]:
# Tampilkan contoh
orders_with_delta.select(
    "order_id",
    "delta_purchase_to_approved",
    "delta_approved_to_carrier",
    "delta_carrier_to_customer",
    "delta_purchase_to_customer",
    "delta_purchase_to_estimated"
).show(10)

+--------------------+--------------------------+-------------------------+-------------------------+--------------------------+---------------------------+
|            order_id|delta_purchase_to_approved|delta_approved_to_carrier|delta_carrier_to_customer|delta_purchase_to_customer|delta_purchase_to_estimated|
+--------------------+--------------------------+-------------------------+-------------------------+--------------------------+---------------------------+
|e481f51cbdc54678b...|                         0|                        2|                        6|                         8|                         16|
|53cdb2fc8bc7dce0b...|                         2|                        0|                       12|                        14|                         20|
|47770eb9100c2d0c4...|                         0|                        0|                        9|                         9|                         27|
|949d5b44dbf5de918...|                         0|         

In [62]:
from pyspark.sql.functions import when, count

# Buat daftar kolom delta
delta_cols = [
    "delta_purchase_to_approved",
    "delta_approved_to_carrier",
    "delta_carrier_to_customer",
    "delta_purchase_to_customer",
    "delta_purchase_to_estimated"
]

# Cek apakah ada yang NULL atau MINUS
for col_name in delta_cols:
    print(f"\n=== {col_name} ===")
    orders_with_delta.select(
        count(when(col(col_name).isNull(), col_name)).alias("null_count"),
        count(when(col(col_name) < 0, col_name)).alias("negative_count")
    ).show()



=== delta_purchase_to_approved ===
+----------+--------------+
|null_count|negative_count|
+----------+--------------+
|       160|             0|
+----------+--------------+


=== delta_approved_to_carrier ===
+----------+--------------+
|null_count|negative_count|
+----------+--------------+
|      1797|           680|
+----------+--------------+


=== delta_carrier_to_customer ===
+----------+--------------+
|null_count|negative_count|
+----------+--------------+
|      2966|            20|
+----------+--------------+


=== delta_purchase_to_customer ===
+----------+--------------+
|null_count|negative_count|
+----------+--------------+
|      2965|             0|
+----------+--------------+


=== delta_purchase_to_estimated ===
+----------+--------------+
|null_count|negative_count|
+----------+--------------+
|         0|             0|
+----------+--------------+



# Data Cleaning and Transformation

##### Handling Missing Value

In [63]:
# Replace Null with ''
cols_to_fill = ["review_comment_title", "review_comment_message"]
reviews_cleaned = reviews_df.fillna('', subset=cols_to_fill)

In [64]:
null_count_report(reviews_cleaned).show(truncate=False)

+-----------------------+----------+
|column_name            |null_count|
+-----------------------+----------+
|review_id              |0         |
|order_id               |0         |
|review_score           |0         |
|review_comment_title   |0         |
|review_comment_message |0         |
|review_creation_date   |0         |
|review_answer_timestamp|0         |
+-----------------------+----------+



In [65]:
# Calculate the average value per category
category_avg = products_df.groupBy("product_category_name").agg(
    avg("product_weight_g").alias("avg_weight"),
    avg("product_length_cm").alias("avg_length"),
    avg("product_height_cm").alias("avg_height"),
    avg("product_width_cm").alias("avg_width")
)

# Calculate the global average for fallback
global_avg = products_df.agg(
    avg("product_weight_g").alias("global_weight"),
    avg("product_length_cm").alias("global_length"),
    avg("product_height_cm").alias("global_height"),
    avg("product_width_cm").alias("global_width")
).collect()[0]

In [66]:
products_cleaned = (
    products_df.alias("p")
    .join(category_avg.alias("c"), on="product_category_name", how="left")
    .withColumn(
        "product_weight_g",
        when(col("p.product_weight_g").isNull(), col("c.avg_weight"))
        .otherwise(col("p.product_weight_g"))
    )
    .withColumn(
        "product_length_cm",
        when(col("p.product_length_cm").isNull(), col("c.avg_length"))
        .otherwise(col("p.product_length_cm"))
    )
    .withColumn(
        "product_height_cm",
        when(col("p.product_height_cm").isNull(), col("c.avg_height"))
        .otherwise(col("p.product_height_cm"))
    )
    .withColumn(
        "product_width_cm",
        when(col("p.product_width_cm").isNull(), col("c.avg_width"))
        .otherwise(col("p.product_width_cm"))
    )
)

In [67]:
products_cleaned = (
    products_cleaned
    .fillna({
        "product_weight_g": global_avg["global_weight"],
        "product_length_cm": global_avg["global_length"],
        "product_height_cm": global_avg["global_height"],
        "product_width_cm": global_avg["global_width"]
    })
)

In [68]:
# cek apakah masih ada null
products_cleaned.select(
    [col(c).isNull().cast("int").alias(c) for c in ["product_weight_g", "product_length_cm", "product_height_cm", "product_width_cm"]]
).agg(
    *[sum(col(c)).alias(c) for c in ["product_weight_g", "product_length_cm", "product_height_cm", "product_width_cm"]]
).show()

+----------------+-----------------+-----------------+----------------+
|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------------+-----------------+-----------------+----------------+
|               0|                0|                0|               0|
+----------------+-----------------+-----------------+----------------+



##### Remove Duplicate Records

In [69]:
# Drop duplicates based on a combination of 2 columns
geolocation_cleaned = geolocation_df.dropDuplicates(["geolocation_zip_code_prefix", "geolocation_lat","geolocation_lng"])

print(f"Number of rows before: {geolocation_df.count()}")
print(f"Number of rows after: {geolocation_cleaned.count()}")

Number of rows before: 1000163
Number of rows after: 720154


#### Standardizing the format

In [70]:
# Derive date columns from timestamp for daily aggregation
orders_df = (
    orders_df
    .withColumn("order_purchase_date", to_date(col("order_purchase_timestamp")))
    .withColumn("order_delivered_date", to_date(col("order_delivered_customer_date")))
    .withColumn("order_estimated_delivery_date", to_date(col("order_estimated_delivery_date")))
)

In [71]:
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)
 |-- order_purchase_date: date (nullable = true)
 |-- order_delivered_date: date (nullable = true)



In [72]:
payments_df.select("payment_type").distinct().show()

+------------+
|payment_type|
+------------+
|      boleto|
| not_defined|
| credit_card|
|     voucher|
|  debit_card|
+------------+



In [73]:
payments_cleaned = payments_df. \
    withColumn(
        'payment_type',
        when(col('payment_type')=='boleto','Bank Transfer')
        .when(col('payment_type')=='credit_card','Credit Card')
        .when(col('payment_type')=='debit_card','Debit Card')
        .otherwise('other')
    )

In [74]:
payments_cleaned.select("payment_type").distinct().show()

+-------------+
| payment_type|
+-------------+
|  Credit Card|
|Bank Transfer|
|        other|
|   Debit Card|
+-------------+



In [75]:
customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [76]:
#Change data type to string
customers_cleaned = customers_df.withColumn(
    "customer_zip_code_prefix", col("customer_zip_code_prefix").cast("string")
)

geolocation_cleaned = geolocation_df.withColumn(
    "geolocation_zip_code_prefix", col("geolocation_zip_code_prefix").cast("string")
)

seller_cleaned = sellers_df.withColumn(
    "seller_zip_code_prefix", col("seller_zip_code_prefix").cast("string")
)

#### Data Transformation

In [77]:
order_with_details = orders_df.join(order_items_df,'order_id','left')\
.join(payments_cleaned,'order_id','left')\
.join(customers_cleaned,'customer_id','left')

In [78]:
order_with_details.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)
 |-- order_purchase_date: date (nullable = true)
 |-- order_delivered_date: date (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- customer_

In [79]:
order_with_total_value = order_with_details.groupBy('order_id')\
.agg(sum('payment_value').alias('total_order_value'))

In [80]:
order_with_total_value.show()

+--------------------+------------------+
|            order_id| total_order_value|
+--------------------+------------------+
|f373335aac9a659de...|             59.18|
|118045506e1c1dda0...|            1802.0|
|cc66dee6fbc18bb79...|             136.4|
|f44cb69655f8e4d13...|            164.32|
|edcc6b79e8394346b...|            162.63|
|9f98d6530155e3b38...|            316.76|
|5e57ff5e1c008db89...|            348.44|
|0957ed870116e596b...|            157.99|
|3fa59277573f0fe06...|             91.05|
|d5f812041d8fc446c...|             72.29|
|24012690fe6562f4a...|            144.29|
|85be7c94bcd3f908f...|             72.75|
|56ef80c564f6fd57c...|             49.42|
|7a70b827ebc6ab85b...|1950.3899999999999|
|8e10a1d1a57b6a469...|            254.22|
|107478e48c13dc0b3...|             84.24|
|949280c70c6d62ec9...|             49.42|
|6a276c227b7bb9659...|            197.41|
|0fe9c7ad9288ff24b...|             44.24|
|03ebfa9712b7dbc70...|             55.78|
+--------------------+------------

In [81]:
order_with_total_value = order_with_total_value.withColumn(
    "total_order_value", round(col("total_order_value"), 2)
)

In [82]:
delivery_time_df = order_with_details \
    .filter(col("order_delivered_customer_date").isNotNull()) \
    .select(
        col("order_id"),
        col("customer_id"),
        col("order_purchase_timestamp"),
        col("order_delivered_customer_date"),
        col("order_estimated_delivery_date"),
        datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp")).alias("actual_delivery_days"),
        datediff(col("order_delivered_customer_date"), col("order_estimated_delivery_date")).alias("delivery_delay_days")
    )

In [83]:
delivery_time_df.show(5)

+--------------------+--------------------+------------------------+-----------------------------+-----------------------------+--------------------+-------------------+
|            order_id|         customer_id|order_purchase_timestamp|order_delivered_customer_date|order_estimated_delivery_date|actual_delivery_days|delivery_delay_days|
+--------------------+--------------------+------------------------+-----------------------------+-----------------------------+--------------------+-------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|                   2017-10-18|                   8|                 -8|
|e481f51cbdc54678b...|9ef432eb625129730...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|                   2017-10-18|                   8|                 -8|
|e481f51cbdc54678b...|9ef432eb625129730...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|                   2017-10-18|                   8| 

In [84]:
order_items_df.select('price').summary().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            112650|
|   mean|120.65373901464986|
| stddev|183.63392805025975|
|    min|              0.85|
|    25%|              39.9|
|    50%|             74.99|
|    75%|             134.9|
|    max|            6735.0|
+-------+------------------+



In [85]:
products_cleaned = products_cleaned.withColumn(
'product_size_category',
when(col('product_weight_g') <500,'Small')
.when(col('product_weight_g').between(500,2000),'Medium')
.otherwise('Large')
)

In [86]:
products_cleaned.show()

+---------------------+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------------+------------------+------------------+------------------+---------------------+
|product_category_name|          product_id|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|        avg_weight|        avg_length|        avg_height|         avg_width|product_size_category|
+---------------------+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+------------------+------------------+------------------+------------------+---------------------+
|           perfumaria|1e9e8ef04dbcff454...|                 40|                       287|                 1|           225.0|             16.0|             10.0|

In [87]:
# !hdfs dfs -mkdir /user/winardi/data/olist_proc/

In [88]:
!hdfs dfs -rm -r /user/winardi/data/olist_proc/order_with_details.parquet
!hdfs dfs -rm -r /user/winardi/data/olist_proc/product_cleaned.parquet
!hdfs dfs -rm -r /user/winardi/data/olist_proc/geolocation_cleaned.parquet
!hdfs dfs -rm -r /user/winardi/data/olist_proc/reviews_cleaned.parquet

Deleted /user/winardi/data/olist_proc/order_with_details.parquet
Deleted /user/winardi/data/olist_proc/product_cleaned.parquet
Deleted /user/winardi/data/olist_proc/geolocation_cleaned.parquet
Deleted /user/winardi/data/olist_proc/reviews_cleaned.parquet


In [89]:
!hdfs dfs -rm -r /user/winardi/data/olist_proc/customers_cleaned.parquet
!hdfs dfs -rm -r /user/winardi/data/olist_proc/orders_cleaned.parquet
!hdfs dfs -rm -r /user/winardi/data/olist_proc/order_items_cleaned.parquet
!hdfs dfs -rm -r /user/winardi/data/olist_proc/sellers_cleaned.parquet
!hdfs dfs -rm -r /user/winardi/data/olist_proc/category_translation_cleaned.parquet
!hdfs dfs -rm -r /user/winardi/data/olist_proc/payments_cleaned.parquet

Deleted /user/winardi/data/olist_proc/customers_cleaned.parquet
Deleted /user/winardi/data/olist_proc/orders_cleaned.parquet
Deleted /user/winardi/data/olist_proc/order_items_cleaned.parquet
Deleted /user/winardi/data/olist_proc/sellers_cleaned.parquet
Deleted /user/winardi/data/olist_proc/category_translation_cleaned.parquet
Deleted /user/winardi/data/olist_proc/payments_cleaned.parquet


In [90]:
customers_df.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/customers_cleaned.parquet')
orders_df.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/orders_cleaned.parquet')
order_items_df.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/order_items_cleaned.parquet')
seller_cleaned.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/sellers_cleaned.parquet')
category_translation_df.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/category_translation_cleaned.parquet')

In [91]:
order_with_details.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/order_with_details.parquet')

In [92]:
products_cleaned.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/product_cleaned.parquet')

In [93]:
geolocation_cleaned.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/geolocation_cleaned.parquet')

In [94]:
reviews_cleaned.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/reviews_cleaned.parquet')

In [95]:
payments_cleaned.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist_proc/payments_cleaned.parquet')

# Data Integration and Aggregation

In [96]:
cleaned_path = 'hdfs://localhost:9000/user/winardi/data/olist_proc/'

In [97]:
order_with_detail_df = spark.read.parquet(cleaned_path + 'order_with_details.parquet')
customers_cleaned = spark.read.parquet(cleaned_path + 'customers_cleaned.parquet')
orders_cleaned = spark.read.parquet(cleaned_path + 'orders_cleaned.parquet')
order_items_cleaned = spark.read.parquet(cleaned_path + 'order_items_cleaned.parquet')
sellers_cleaned = spark.read.parquet(cleaned_path + 'sellers_cleaned.parquet')
category_translation_cleaned = spark.read.parquet(cleaned_path + 'category_translation_cleaned.parquet')
payments_cleaned = spark.read.parquet(cleaned_path + 'payments_cleaned.parquet')
products_cleaned = spark.read.parquet(cleaned_path + 'product_cleaned.parquet')
reviews_cleaned = spark.read.parquet(cleaned_path + 'reviews_cleaned.parquet')
geolocation_cleaned = spark.read.parquet(cleaned_path + 'geolocation_cleaned.parquet')

In [98]:
orders_items_products_df = order_with_detail_df.join(products_cleaned,'product_id','inner')

In [99]:
orders_items_products_sellers_df = orders_items_products_df.join(sellers_cleaned,'seller_id','inner')

In [100]:
full_orders_df = orders_items_products_sellers_df.join(reviews_cleaned,'order_id','left')

In [101]:
# GEolocation Data
full_orders_df = full_orders_df.join(geolocation_cleaned,full_orders_df.customer_zip_code_prefix == geolocation_cleaned.geolocation_zip_code_prefix, 'left')

In [102]:
## product_category_name
full_orders_df = full_orders_df.join(category_translation_cleaned, 'product_category_name', 'left')
full_orders_df = full_orders_df.drop('product_category_name')
full_orders_df = full_orders_df.withColumnRenamed('product_category_name_english', 'product_category_name')

In [103]:
full_orders_df.cache()

DataFrame[order_id: string, seller_id: string, product_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: date, order_purchase_date: date, order_delivered_date: date, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, payment_sequential: int, payment_type: string, payment_installments: int, payment_value: double, customer_unique_id: string, customer_zip_code_prefix: string, customer_city: string, customer_state: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: double, product_length_cm: double, product_height_cm: double, product_width_cm: double, avg_weight: double, avg_length: double, avg_height: double, avg_width: double, product_size_category: string, seller_zip_code_prefix: string, seller_city: string,

In [104]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)
 |-- order_purchase_date: date (nullable = true)
 |-- order_delivered_date: date (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- customer_

In [105]:
# Total Revenues Per Seller
seller_revenue_df = full_orders_df.groupBy('seller_id').agg(sum('price').alias('amount'))

In [106]:
seller_revenue_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- amount: double (nullable = true)



In [107]:
seller_revenue_df.show(5)

+--------------------+------------------+
|           seller_id|            amount|
+--------------------+------------------+
|8e6cc767478edae94...| 1145757.400000001|
|ec8879960bd2221d5...| 732805.4000000019|
|7aa4334be125fcdd2...|2509294.4899999965|
|71039d19d4303bf90...| 579490.4999999986|
|ef0ace09169ac0905...| 2261367.700000003|
+--------------------+------------------+
only showing top 5 rows



In [108]:
seller_revenue_df = seller_revenue_df.withColumn(
    "amount", round(col("amount"), 2)
)

In [109]:
seller_revenue_df.show(5)

+--------------------+----------+
|           seller_id|    amount|
+--------------------+----------+
|8e6cc767478edae94...| 1145757.4|
|ec8879960bd2221d5...|  732805.4|
|7aa4334be125fcdd2...|2509294.49|
|71039d19d4303bf90...|  579490.5|
|ef0ace09169ac0905...| 2261367.7|
+--------------------+----------+
only showing top 5 rows



In [110]:
# Total Orders Per Custoemr
customer_order_count = full_orders_df.groupBy('customer_id')\
.agg(count('order_id').alias('total_orders'))\
.orderBy(desc('total_orders'))

In [111]:
customer_order_count.show(10)

+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|351e40989da90e704...|       11427|
|50920f8cd0681fd86...|       10752|
|9b43e2a62de9bab3a...|        8556|
|270c23a11d024a44c...|        8001|
|5c87184371002d49e...|        6876|
|d3e82ccec3cb5f956...|        6876|
|d5f2b3f597c7ccafb...|        6706|
|c2f18647725395af4...|        6612|
|24e7dc2ff8c071263...|        6597|
|7bb57d182bdc11653...|        6258|
+--------------------+------------+
only showing top 10 rows



In [112]:
# Average Review Score Per Seller
agg_seller_review_score = full_orders_df.groupBy('seller_id')\
.agg(avg('review_score').alias('seller_review_score'))\
.orderBy(desc('seller_review_score'))

In [113]:
agg_seller_review_score.show(10)

+--------------------+-------------------+
|           seller_id|seller_review_score|
+--------------------+-------------------+
|5721089ba9214e800...|                5.0|
|4049512728d969bec...|                5.0|
|41e0fa5761c886a63...|                5.0|
|5d505d2298ad549e4...|                5.0|
|a20d8058c866dbaec...|                5.0|
|7dc9a3355bae96dd8...|                5.0|
|0b46f784306be7200...|                5.0|
|f8f35af4634605e66...|                5.0|
|cd233f8bfa30ebfd6...|                5.0|
|83d458c0d6d4f9429...|                5.0|
+--------------------+-------------------+
only showing top 10 rows



In [114]:
# Most Sold Products ( Top 10 )
top_products_df = full_orders_df.groupBy("product_id")\
    .agg(count("order_id").alias("total_sold"))\
    .orderBy(desc("total_sold"))\
    .limit(10)   

In [115]:
top_products_df.show()

+--------------------+----------+
|          product_id|total_sold|
+--------------------+----------+
|aca2eb7d00ea1a7b8...|     86740|
|422879e10f4668299...|     81110|
|99a4788cb24856965...|     78775|
|389d119b48cf3043d...|     60248|
|d1c427060a0f73f6b...|     59274|
|368c6c730842d7801...|     58358|
|53759a2ecddad2bb8...|     52654|
|53b36df67ebb7c415...|     52105|
|154e7e31ebfa09220...|     42700|
|3dd2a17168ec895c7...|     40787|
+--------------------+----------+



In [116]:
# Top Customers By Spending
top_customers_spending = full_orders_df.groupBy('customer_id')\
.agg(sum('price').alias('customers_spending'))\
.orderBy(desc('customers_spending'))

In [117]:
top_customers_spending.show(10)

+--------------------+------------------+
|         customer_id|customers_spending|
+--------------------+------------------+
|d3e82ccec3cb5f956...|         6662844.0|
|df55c14d1476a9a34...|         3565657.0|
|fe5113a38e3575c04...|         3293604.0|
|ec5b2ba62e5743423...|         2556120.0|
|63b964e79dee32a35...|         2501664.0|
|46bb3c0b1a65c8399...|         2336752.0|
|05455dfa7cd02f13d...| 2160194.400000087|
|3690e975641f01bd0...|         2124498.0|
|349509b216bd5ec11...|         1923627.0|
|695476b5848d64ba0...|1820543.1299999943|
+--------------------+------------------+
only showing top 10 rows



In [118]:
top_customers_spending = top_customers_spending.withColumn(
    "customers_spending", round(col("customers_spending"), 2)
)

#####  Window Function and Ranking

In [119]:
window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))

In [120]:
# Rank Top Selling Products Per seller
top_seller_products_df = full_orders_df.withColumn('rank',rank().over(window_spec)).filter(col('rank')<=5)
top_seller_products_df.select('seller_id','price','rank').show(truncate=False)

+--------------------------------+-----+----+
|seller_id                       |price|rank|
+--------------------------------+-----+----+
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|895.0|1   |
|0015a82c2db000af6aaaf3ae2ecb0532|

In [121]:
# Dense Rank for Sellers Based on Revenue
window_spec = Window.orderBy(desc('total_revenue'))

# Calculate total revenue per seller and apply dense rank
seller_revenue_rank_df = full_orders_df.groupBy('seller_id') \
    .agg(sum('price').alias('total_revenue')) \
    .withColumn('dense_rank', dense_rank().over(window_spec)) \
    .orderBy('dense_rank')

# Show results
seller_revenue_rank_df.show(truncate=False)

+--------------------------------+--------------------+----------+
|seller_id                       |total_revenue       |dense_rank|
+--------------------------------+--------------------+----------+
|4869f7a5dfa277a7dca6462dcf3b52b2|3.613871732000011E7 |1         |
|53243585a1d6dc2643021fd1853d8905|3.4291592950000085E7|2         |
|4a3ca9315b744ce9f8e9374361493884|3.3759570840000235E7|3         |
|7c67e1448b00f6e969d365cea6b010ab|3.22823217899995E7  |4         |
|fa1c13f2614d7b5c4749cbc52fecda94|3.013938631000002E7 |5         |
|da8622b14eb17ae2831f4ac5b9dab84a|2.985766972999996E7 |6         |
|7e93a43ef30c4f03f38b393420bc753a|2.6315706299999934E7|7         |
|1025f0e2d44d7041d6cf58b6550e0bfa|2.2937518519999977E7|8         |
|46dc3b2cc0980fb8ec44634e21d2718e|2.179177328999985E7 |9         |
|955fee9216a65b617aa5c0531780ce60|2.096441067000001E7 |10        |
|7a67c85e85bb2ce8582c35f2203ad736|2.0312794889999878E7|11        |
|620c87c171fb2a6dd6e8bb4dec959fc6|2.011983960000009E7 |12     

In [122]:
# Total Revenue & Average Order Value (AOV) per Customer
customer_spending_df = full_orders_df.groupBy('customer_id')\
.agg(count('order_id').alias('total_orders'),sum('price').alias('total_spent'),round(avg('price'),2).alias('AOV'))\
.orderBy(desc('total_spent'))
customer_spending_df.show()

+--------------------+------------+------------------+-------+
|         customer_id|total_orders|       total_spent|    AOV|
+--------------------+------------+------------------+-------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|
|695476b5848d64ba0...|         687|1820543.1299999943|2649.99|
|73236a0796f53d60d...|         832|         1755520.0| 2110.0|
|cc803a2c412833101...|         762|         1676400.0| 2200.0|
|1ff773612ab8934db...|        5820|1658641.7999999512| 

In [123]:
# Seller Performance Metrics ( Revenue, Average Review, Order Count)
seller_performance_df = full_orders_df.groupBy('seller_id') \
.agg(count('order_id').alias('total_orders'),
sum('price').alias('total_revenue'),
round(avg('review_score'),2).alias('avg_review_score'),
round(stddev('price'),2).alias('price_variability')
)\
.orderBy(desc('total_revenue'))

seller_performance_df.show()

+--------------------+------------+--------------------+----------------+-----------------+
|           seller_id|total_orders|       total_revenue|avg_review_score|price_variability|
+--------------------+------------+--------------------+----------------+-----------------+
|4869f7a5dfa277a7d...|      184587| 3.613871732000011E7|            4.09|           111.65|
|53243585a1d6dc264...|       54514|3.4291592950000085E7|            4.12|           499.65|
|4a3ca9315b744ce9f...|      330661|3.3759570840000235E7|            3.77|            59.37|
|7c67e1448b00f6e96...|      233306|  3.22823217899995E7|            3.42|            50.39|
|fa1c13f2614d7b5c4...|       87686| 3.013938631000002E7|            4.38|            307.7|
|da8622b14eb17ae28...|      264433| 2.985766972999996E7|            3.98|            72.92|
|7e93a43ef30c4f03f...|       50226|2.6315706299999934E7|            4.15|           377.24|
|1025f0e2d44d7041d...|      229587|2.2937518519999977E7|            3.89|       

In [124]:
# Product Popularity Metrics
product_metrics_df = full_orders_df.groupBy('product_id')\
.agg(
count('order_id').alias('total_sales'),
sum('price').alias('total_revenue'),
round(avg('price'),2).alias('avg_price'),
round(stddev('price'),2).alias('price_volatility'),\
collect_set('seller_id').alias('unique_sellers')
)\
.orderBy(desc('total_sales'))

product_metrics_df.show()

+--------------------+-----------+------------------+---------+----------------+--------------------+
|          product_id|total_sales|     total_revenue|avg_price|price_volatility|      unique_sellers|
+--------------------+-----------+------------------+---------+----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740| 6164630.300000014|    71.07|            3.17|[955fee9216a65b61...|
|422879e10f4668299...|      81110| 4442791.510000015|    54.77|            4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775| 6921762.710000007|    87.87|            4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248| 3280533.129999999|    54.45|            4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274| 8220103.329999986|   138.68|           16.58|[a1043bafd471dff5...|
|368c6c730842d7801...|      58358|3181698.8999999957|    54.52|            4.59|[1f50f920176fa81d...|
|53759a2ecddad2bb8...|      52654| 2893017.500000002|    54.94|            4.52|[1

In [125]:
window_spec = Window.orderBy('purchase_year', 'purchase_month')

monthly_trend_growth_df = full_orders_df \
    .groupBy(
        year('order_purchase_timestamp').alias('purchase_year'),
        month('order_purchase_timestamp').alias('purchase_month')
    ) \
    .agg(
        count('order_id').alias('total_orders'),
        round(sum('price'), 2).alias('total_revenue'),
        round(avg('price'), 2).alias('avg_order_value')
    ) \
    .orderBy('purchase_year', 'purchase_month') \
    .withColumn('prev_month_revenue', lag('total_revenue').over(window_spec)) \
    .withColumn('revenue_growth', 
                when(isnull(col('total_revenue') - col('prev_month_revenue')), 0)
                .otherwise(round(((col('total_revenue') - col('prev_month_revenue')) / col('prev_month_revenue')) * 100, 2))) \
    .withColumn('prev_month_orders', lag('total_orders').over(window_spec)) \
    .withColumn('order_growth', 
                when(isnull(col('total_orders') - col('prev_month_orders')), 0)
                .otherwise(round(((col('total_orders') - col('prev_month_orders')) / col('prev_month_orders')) * 100, 2))) \
    .select('purchase_year', 'purchase_month', 'total_orders', 'total_revenue', 
            'avg_order_value', 'revenue_growth', 'order_growth')

print("=== Monthly Trend with Growth Rates ===")
monthly_trend_growth_df.show(truncate=False)

=== Monthly Trend with Growth Rates ===
+-------------+--------------+------------+--------------+---------------+--------------+------------+
|purchase_year|purchase_month|total_orders|total_revenue |avg_order_value|revenue_growth|order_growth|
+-------------+--------------+------------+--------------+---------------+--------------+------------+
|2016         |9             |1268        |57431.0       |45.29          |0.0           |0.0         |
|2016         |10            |62812       |8301215.43    |132.16         |14354.24      |4853.63     |
|2016         |12            |304         |3313.6        |10.9           |-99.96        |-99.52      |
|2017         |1             |154814      |1.749501938E7 |113.01         |527876.2      |50825.66    |
|2017         |2             |314901      |3.771518492E7 |119.77         |115.58        |103.41      |
|2017         |3             |493268      |6.410433591E7 |129.96         |69.97         |56.64       |
|2017         |4             |428

In [126]:
# Customer Retention Analysis ( First & Last Order )
customer_retention_df = full_orders_df.groupBy('customer_id')\
.agg(
first('order_purchase_timestamp').alias('first_order_date'),
last('order_purchase_timestamp').alias('last_order_date'),
count('order_id').alias('total_orders'),
round(avg('price'),2).alias('aov')
)\
.orderBy(desc('total_orders'))

In [127]:
customer_retention_df.show()

+--------------------+-------------------+-------------------+------------+------+
|         customer_id|   first_order_date|    last_order_date|total_orders|   aov|
+--------------------+-------------------+-------------------+------------+------+
|351e40989da90e704...|2017-07-13 10:42:37|2017-07-13 10:42:37|       11427| 85.99|
|50920f8cd0681fd86...|2018-01-27 11:28:32|2018-01-27 11:28:32|       10752| 43.82|
|9b43e2a62de9bab3a...|2017-05-25 22:27:50|2017-05-25 22:27:50|        8556|  26.4|
|270c23a11d024a44c...|2017-08-08 20:26:31|2017-08-08 20:26:31|        8001| 36.59|
|5c87184371002d49e...|2018-01-05 19:15:37|2018-01-05 19:15:37|        6876| 12.49|
|d3e82ccec3cb5f956...|2017-03-18 14:28:34|2017-03-18 14:28:34|        6876| 969.0|
|d5f2b3f597c7ccafb...|2017-12-13 14:21:15|2017-12-13 14:21:15|        6706|  59.0|
|c2f18647725395af4...|2018-03-06 19:21:47|2018-03-06 19:21:47|        6612|  34.9|
|24e7dc2ff8c071263...|2017-11-24 16:16:45|2017-11-24 16:16:45|        6597|  59.2|
|7bb

In [128]:
# HW - Correct the last_order_date
customer_segmentation_df = full_orders_df.groupBy('customer_id') \
    .agg(max('order_purchase_timestamp').alias('last_order_date')) \
    .withColumn('days_since_last_order', 
                datediff(current_date(), col('last_order_date'))) \
    .withColumn('customer_segment',
                when(col('days_since_last_order') <= 30, 'Active')
                .when(col('days_since_last_order') <= 90, 'At Risk')
                .when(col('days_since_last_order') <= 180, 'Churning')
                .otherwise('Lost')) \
    .orderBy(col('last_order_date').desc())

print("=== Customer Segmentation by Last Order Date ===")
customer_segmentation_df.show(truncate=False)

=== Customer Segmentation by Last Order Date ===
+--------------------------------+-------------------+---------------------+----------------+
|customer_id                     |last_order_date    |days_since_last_order|customer_segment|
+--------------------------------+-------------------+---------------------+----------------+
|4b7decb9b58e2569548b8b4c8e20e8d7|2018-09-03 09:06:57|2585                 |Lost            |
|898b7fee99c4e42170ab69ba59be0a8b|2018-08-29 15:00:37|2590                 |Lost            |
|496630b6740bcca28fce9ba50d8a26ef|2018-08-29 14:52:00|2590                 |Lost            |
|6e353700bc7bcdf6ebc15d6de16d7002|2018-08-29 14:18:28|2590                 |Lost            |
|e60df9449653a95af4549bbfcb18a6eb|2018-08-29 14:18:23|2590                 |Lost            |
|e450a297a7bc6839ceb0cf1a2377fa02|2018-08-29 12:25:59|2590                 |Lost            |
|56b1ac2855cc6d7950b4ffa6a9b41b0d|2018-08-29 11:06:11|2590                 |Lost            |
|10a79ef278

In [129]:
# Customer Segmentation based on spending
customer_spending_df = customer_spending_df.withColumn(
'customer_segment',
when(col('AOV') >=1200,"High-Value")
.when( (col('AOV')<1200) & (col('AOV') >=700),'Medium_Value')
.otherwise('Low-Value'))

customer_spending_df.show()

+--------------------+------------+------------------+-------+----------------+
|         customer_id|total_orders|       total_spent|    AOV|customer_segment|
+--------------------+------------+------------------+-------+----------------+
|d3e82ccec3cb5f956...|        6876|         6662844.0|  969.0|    Medium_Value|
|df55c14d1476a9a34...|         743|         3565657.0| 4799.0|      High-Value|
|fe5113a38e3575c04...|        2292|         3293604.0| 1437.0|      High-Value|
|ec5b2ba62e5743423...|        1428|         2556120.0| 1790.0|      High-Value|
|63b964e79dee32a35...|        6072|         2501664.0|  412.0|       Low-Value|
|46bb3c0b1a65c8399...|         748|         2336752.0| 3124.0|      High-Value|
|05455dfa7cd02f13d...|        2184| 2160194.400000087|  989.1|    Medium_Value|
|3690e975641f01bd0...|         802|         2124498.0| 2649.0|      High-Value|
|349509b216bd5ec11...|         743|         1923627.0| 2589.0|      High-Value|
|695476b5848d64ba0...|         687|18205

In [130]:
full_orders_df = full_orders_df.join(customer_spending_df.select('customer_id','customer_segment'),'customer_id',how='left')

In [131]:
full_orders_df.select('customer_id','customer_segment').show()

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
|8cc82cbc81afbff38...|       Low-Value|
+--------------------+----------------+
only showing top 20 rows



In [132]:
full_orders_df.select('order_purchase_timestamp').show()

+------------------------+
|order_purchase_timestamp|
+------------------------+
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
|     2018-03-13 15:43:40|
+------------------------+
only showing top 20 rows



In [133]:
#Hourly Order Distribution
full_orders_df = full_orders_df.withColumn('hour_of_day',expr('hour(order_purchase_timestamp)'))

In [134]:
full_orders_df.select('order_purchase_timestamp','hour_of_day').show()

+------------------------+-----------+
|order_purchase_timestamp|hour_of_day|
+------------------------+-----------+
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
|     2018-03-13 15:43:40|         15|
+------------------------+-----------+
only showing top 20 rows



In [135]:
# Weekday vs Weekend Order
full_orders_df = full_orders_df.withColumn('order_day_type',
    when(dayofweek('order_purchase_timestamp').isin(1, 7), lit('Weekend'))
    .otherwise(lit('Weekday'))
)

full_orders_df.select('order_purchase_timestamp','order_day_type').show()

+------------------------+--------------+
|order_purchase_timestamp|order_day_type|
+------------------------+--------------+
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
|     2018-03-13 15:43:40|       Weekday|
+------------------------+--------

In [136]:
!hdfs dfs -rm -r /user/winardi/data/olist/processed/

Deleted /user/winardi/data/olist/processed


In [137]:
!hdfs dfs -ls -h /user/winardi/data/olist/processed/

ls: `/user/winardi/data/olist/processed/': No such file or directory


In [138]:
!hdfs dfs -mkdir /user/winardi/data/olist/processed/

In [139]:
 full_orders_df.write.mode('overwrite').parquet('hdfs://localhost:9000/user/winardi/data/olist/processed/')