## Data Cleaning & Transformation

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [2]:
# Build (or reuse) the Spark session used by every cell below.
# Executors: 2 instances, 2 cores and 2g of memory each.
# The broadcast-join threshold is set to 20 * 1024 * 1024 bytes.
spark = (
    SparkSession.builder
    .appName("brazilian-ecommerce")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    .config("spark.executor.instances", "2")
    .config("spark.sql.autoBroadcastJoinThreshold", 20 * 1024 * 1024)
    .getOrCreate()
)

25/06/14 08:53:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
path_head = "gs://bucket-name/ecommerce_real/"


def read_olist_table(table_name):
    """Load one raw Olist table stored as parquet under `path_head`.

    Args:
        table_name: Folder name of the table below `path_head`.

    Returns:
        A Spark DataFrame with the schema stored in the parquet files.
    """
    # `header` is a CSV-only option; the parquet reader ignores it, so it is
    # not passed here (parquet files carry their own schema).
    return spark.read.parquet(f"{path_head}{table_name}")


df_customers = read_olist_table("olist_customers")
df_geolocation = read_olist_table("olist_geolocation")
df_order_items = read_olist_table("olist_order_items")
df_order_payments = read_olist_table("olist_order_payments")
df_order_reviews = read_olist_table("olist_order_reviews")
df_orders = read_olist_table("olist_orders")
df_products = read_olist_table("olist_products")
df_sellers = read_olist_table("olist_sellers")

                                                                                

In [4]:
# The category translation table is a CSV file whose first row holds the column names.
df_cat_trans = (
    spark.read
    .option("header", True)
    .csv(f"{path_head}product_category_name_translation.csv")
)

                                                                                

In [5]:
def missing_values(df, name):
    """Print a one-row table holding the number of NULLs in each column of `df`.

    Args:
        df: Spark DataFrame to inspect.
        name: Label shown in the printed heading.
    """
    print(f"Missing values in {name}:")
    null_counters = []
    for column_name in df.columns:
        # when() without otherwise() yields NULL for non-matching rows,
        # and count() skips NULLs, so only the NULL cells are counted.
        null_marker = when(col(column_name).isNull(), 1)
        null_counters.append(count(null_marker).alias(column_name))
    df.select(null_counters).show()

### Customers

In [None]:
# Title-case the customer city names (the raw values are lower-case)
customer_city_title = initcap(col("customer_city"))
df_customers = df_customers.withColumn("customer_city", customer_city_title)
df_customers.show(5)

In [None]:
# Keep the zip-code prefix as a string.
# NOTE(review): if the stored column is numeric, leading zeros are already gone
# before this cast and would need re-padding — confirm against the parquet schema.
df_customers = df_customers.withColumn(
    "customer_zip_code_prefix",
    df_customers["customer_zip_code_prefix"].cast("string"),
)

In [None]:
# Confirm the column types after the zip-code cast
df_customers.printSchema()

In [None]:
# Persist the cleaned customers table as parquet, replacing any previous run
customers_target = f"{path_head}cleaned/customers/"
df_customers.write.parquet(customers_target, mode="overwrite")

### Sellers

In [None]:
# Title-case the seller city names (the raw values are lower-case)
seller_city_title = initcap(col("seller_city"))
df_sellers = df_sellers.withColumn("seller_city", seller_city_title)
df_sellers.show(5)

In [None]:
# Keep the zip-code prefix as a string, then confirm the resulting schema.
# NOTE(review): if the stored column is numeric, leading zeros are already gone
# before this cast — confirm against the parquet schema.
df_sellers = df_sellers.withColumn(
    "seller_zip_code_prefix",
    df_sellers["seller_zip_code_prefix"].cast("string"),
)
df_sellers.printSchema()

In [None]:
# Persist the cleaned sellers table as parquet, replacing any previous run
sellers_target = f"{path_head}cleaned/sellers/"
df_sellers.write.parquet(sellers_target, mode="overwrite")

### Products

In [None]:
# Correct the misspelled "lenght" column names
misspelled_to_fixed = {
    "product_name_lenght": "product_name_length",
    "product_description_lenght": "product_description_length",
}
for old_name, new_name in misspelled_to_fixed.items():
    df_products = df_products.withColumnRenamed(old_name, new_name)

In [None]:
# Collect every product row that has a NULL in at least one descriptive column
product_required_cols = [
    "product_category_name",
    "product_name_length",
    "product_description_length",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
]

any_required_null = col(product_required_cols[0]).isNull()
for required_col in product_required_cols[1:]:
    any_required_null = any_required_null | col(required_col).isNull()

anomali_products_df = df_products.filter(any_required_null)

# Keep the rejected rows in a separate parquet folder for later inspection
anomali_products_df.write.mode("overwrite").parquet(f"{path_head}anomalies/anomaly_products/")

In [None]:
# Drop every product row that has a NULL in any of the descriptive columns
columns_that_must_be_set = [
    "product_category_name",
    "product_name_length",
    "product_description_length",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
]
df_products = df_products.dropna(subset=columns_that_must_be_set)

# Re-count the NULLs per column after the drop
remaining_nulls = [
    count(when(col(name).isNull(), 1)).alias(name)
    for name in df_products.columns
]
df_products.select(remaining_nulls).show()

The folder `.../anomalies/anomaly_products/` contains product rows that may be corrupted.

It holds 611 rows in total: 609 rows where [product_category_name, product_name_length, product_description_length] are NULL, 1 row where [product_weight_g, product_length_cm, product_height_cm, product_width_cm] are NULL, and 1 row where every column except the id is NULL.

In [None]:
# Attach the English category name; the left join keeps products whose
# category has no entry in the translation table.
df_products = df_products.join(
    df_cat_trans,
    on="product_category_name",
    how="left",
)

In [None]:
# Two categories have no entry in df_cat_trans, so they are translated by hand:
#   'pc_gamer' -> 'computers' (a 'computers' category already exists in the file)
#   'portateis_cozinha_e_preparadores_de_alimentos' -> 'portable_kitchen_food_preparators'
# Every other row keeps the translation obtained from the join.
original_category = col('product_category_name')

english_category = (
    when(original_category == 'pc_gamer', 'computers')
    .when(
        original_category == 'portateis_cozinha_e_preparadores_de_alimentos',
        'portable_kitchen_food_preparators',
    )
    .otherwise(col('product_category_name_english'))
)

df_products = df_products.withColumn('product_category_name_english', english_category)

In [None]:
# Put the columns in a fixed order, with the English category next to the original one
product_column_order = [
    'product_id',
    'product_category_name',
    'product_category_name_english',
    'product_name_length',
    'product_description_length',
    'product_photos_qty',
    'product_weight_g',
    'product_length_cm',
    'product_height_cm',
    'product_width_cm',
]
df_products = df_products.select(*product_column_order)

In [None]:
# Derived measures: weight in kilograms and box volume in cubic centimetres
weight_in_kg = col("product_weight_g") / 1000
volume_in_cm3 = col("product_length_cm")*col("product_width_cm")*col("product_height_cm")

columns_with_derived = [
    'product_id', 'product_category_name',
    'product_category_name_english', 'product_name_length',
    'product_description_length', 'product_photos_qty',
    'product_weight_g', 'product_weight_kg', 'product_length_cm',
    'product_height_cm', 'product_width_cm', 'product_volume_cm3',
]

df_products = (
    df_products
    .withColumn("product_weight_kg", weight_in_kg)
    .withColumn("product_volume_cm3", volume_in_cm3)
    .select(*columns_with_derived)
)

In [None]:
# Bucket products by weight: below 1 kg -> Small, 1 to 5 kg (inclusive) -> Medium, else Large
kg = col('product_weight_kg')
size_bucket = (
    when(kg < 1, 'Small')
    .when((kg >= 1) & (kg <= 5), 'Medium')
    .otherwise('Large')
)
df_products = df_products.withColumn('product_size_category', size_bucket)

In [None]:
# Uniqueness check: product_id is unique when the distinct count equals the row count
df_products.agg(countDistinct("product_id"), count("*")).show()

In [None]:
# Persist the cleaned products table as parquet, replacing any previous run
products_target = f"{path_head}cleaned/products/"
df_products.write.parquet(products_target, mode="overwrite")

### Geolocation

In [None]:
# Title-case the geolocation city names (the raw values are lower-case)
geolocation_city_title = initcap(col("geolocation_city"))
df_geolocation = df_geolocation.withColumn("geolocation_city", geolocation_city_title)
df_geolocation.show(5)

In [None]:
# Average the lat/long per zip-code prefix so that each prefix appears only once.
# NOTE(review): first() takes whichever city/state value Spark sees first in the
# group, so the chosen value is arbitrary when a prefix has several spellings.
per_prefix_aggregates = [
    avg("geolocation_lat").alias("geolocation_lat"),
    avg("geolocation_lng").alias("geolocation_lng"),
    first("geolocation_city").alias("geolocation_city"),
    first("geolocation_state").alias("geolocation_state"),
]
df_geolocation = (
    df_geolocation
    .groupBy("geolocation_zip_code_prefix")
    .agg(*per_prefix_aggregates)
)

df_geolocation.count()

In [None]:
# Check for duplicates: number of zip-code prefixes that still occur more than once
rows_per_prefix = df_geolocation.groupBy("geolocation_zip_code_prefix").count()
rows_per_prefix.filter("count > 1").count()

In [None]:
# Persist the cleaned geolocation table as parquet, replacing any previous run
geolocation_target = f"{path_head}cleaned/geolocation/"
df_geolocation.write.parquet(geolocation_target, mode="overwrite")

### Orders

In [None]:
# Keep the order columns in a fixed, explicit order
order_columns = [
    'order_id',
    'customer_id',
    'order_status',
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
]
df_orders = df_orders.select(*order_columns)

In [None]:
# Every timestamp column uses the format yyyy-MM-dd HH:mm:ss; convert them
# from string to timestamp in a single projection that keeps the column order.
timestamp_cols = [
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date"
]

parsed_columns = [
    to_timestamp(col(name), "yyyy-MM-dd HH:mm:ss").alias(name)
    if name in timestamp_cols
    else col(name)
    for name in df_orders.columns
]
df_orders = df_orders.select(*parsed_columns)

# The time part of order_estimated_delivery_date is always 00:00:00,
# so a plain date type describes it better.
estimated_delivery_as_date = to_date(col("order_estimated_delivery_date"), "yyyy-MM-dd")
df_orders = df_orders.withColumn("order_estimated_delivery_date", estimated_delivery_as_date)

df_orders.printSchema()

Anomaly in orders

1. order_status = "canceled" but a delivery date exists (order_delivered_customer_date is not NULL)
2. order_status = "delivered" but order_approved_at = NULL
3. order_status = "delivered" but order_delivered_carrier_date = NULL
4. order_status = "delivered" but order_delivered_customer_date = NULL
5. The number of order_id values differs between orders (99441) and order_items (98666) -> some orders have no items, which is logically invalid.


In [None]:
# Flag orders that have no row at all in order_items.
orderid_orderitems = df_order_items.select("order_id").distinct()

orders_side = df_orders.alias("x")
items_side = orderid_orderitems.alias("y")

# After the left join, a NULL on the items side means the order has no item.
# The flag is True for those orders and NULL for all others.
df_orders_flagged = (
    orders_side
    .join(items_side, col("x.order_id") == col("y.order_id"), how="left")
    .withColumn("flag_missing_items", when(col("y.order_id").isNull(), lit(True)))
    .drop(col("y.order_id"))
)

In [None]:
# Status/date consistency flags. Each flag is True when the anomaly is present, NULL otherwise.
delivered_to_customer_but_canceled = (
    (col("order_delivered_customer_date").isNotNull()) & (col("order_status") == "canceled")
)
delivered_without_approval = (
    (col("order_approved_at").isNull()) & (col("order_status") == "delivered")
)
delivered_without_carrier_date = (
    (col("order_delivered_carrier_date").isNull()) & (col("order_status") == "delivered")
)
delivered_without_customer_date = (
    (col("order_delivered_customer_date").isNull()) & (col("order_status") == "delivered")
)

df_orders_flagged = (
    df_orders_flagged
    .withColumn("flag_delivered_customer_but_canceled", when(delivered_to_customer_but_canceled, lit(True)))
    .withColumn("flag_missing_approvaldate_on_delivered", when(delivered_without_approval, lit(True)))
    .withColumn("flag_missing_carrierdate_on_delivered", when(delivered_without_carrier_date, lit(True)))
    .withColumn("flag_missing_delivcustdate_on_delivered", when(delivered_without_customer_date, lit(True)))
)

In [None]:
# orders_valid     -> rows that carry no anomaly flag
# orders_anomalies -> rows that carry at least one anomaly flag
order_flag_columns = [
    "flag_delivered_customer_but_canceled",
    "flag_missing_approvaldate_on_delivered",
    "flag_missing_carrierdate_on_delivered",
    "flag_missing_delivcustdate_on_delivered",
    "flag_missing_items",
]

# Every flag is either True or NULL, so "no anomaly" means all flag columns are NULL.
has_no_flag = col(order_flag_columns[0]).isNull()
for flag_name in order_flag_columns[1:]:
    has_no_flag = has_no_flag & col(flag_name).isNull()

df_orders_valid = df_orders_flagged.filter(has_no_flag)

# isNull() never evaluates to NULL, so the negated predicate is the exact
# complement of the valid set. Unlike subtract() (EXCEPT DISTINCT) this needs
# no extra shuffle and does not de-duplicate rows.
df_orders_anomalies = df_orders_flagged.filter(~has_no_flag)

In [None]:
# df_orders_anomalies.select(
#     sum(col("flag_delivered_customer_but_canceled").cast("int")).alias("anomaly_1"),
#     sum(col("flag_missing_approvaldate_on_delivered").cast("int")).alias("anomaly_2"),
#     sum(col("flag_missing_carrierdate_on_delivered").cast("int")).alias("anomaly_3"),
#     sum(col("flag_missing_delivcustdate_on_delivered").cast("int")).alias("anomaly_4"),
#     sum(col("flag_missing_items").cast("int")).alias("anomaly_5")
# ).show()

In [None]:
# Drop the flag columns from the valid set; only the original order columns remain
valid_order_columns = [
    "order_id",
    "customer_id",
    "order_status",
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
]
df_orders_valid = df_orders_valid.select(*valid_order_columns)

In [None]:
# NULL count per column before the sentinel fill
missing_values(df_orders_valid, "Orders Valid")

In [None]:
# Show the column types of the valid orders
df_orders_valid.printSchema()

In [None]:
# Replace missing carrier/customer delivery dates with a far-future sentinel value
delivery_date_sentinels = {
    'order_delivered_carrier_date': '9999-12-31 23:59:59',
    'order_delivered_customer_date': '9999-12-31 23:59:59',
}
df_orders_valid = df_orders_valid.fillna(delivery_date_sentinels)

In [None]:
# NULL count per column after the sentinel fill
missing_values(df_orders_valid, "Orders Valid")

In [None]:
# delivery_time = whole days between purchase and delivery to the customer.
# Rows whose delivery date is the 9999-12-31 sentinel get a very large value here.
days_until_delivered = datediff(
    col('order_delivered_customer_date'),
    col('order_purchase_timestamp'),
)
df_orders_valid = (
    df_orders_valid
    .withColumn('delivery_time', days_until_delivered)
    .orderBy(desc('delivery_time'))
)

In [None]:
# 1st and 99th percentile of delivery_time, computed only on rows with a real
# delivery date (sentinel rows excluded). A relative error of 0.0 asks for exact quantiles.
orders_with_real_delivery = df_orders_valid.filter("order_delivered_customer_date != '9999-12-31 23:59:59'")
quantiles = orders_with_real_delivery.approxQuantile('delivery_time', [0.01, 0.99], 0.0)
low_cutoff = quantiles[0]
high_cutoff = quantiles[1]

In [None]:
# Display the two delivery_time cut-offs
low_cutoff, high_cutoff

In [None]:
# Min / max / mean delivery_time over rows with a real delivery date.
# min, max and mean are the Spark column functions pulled in by the star import, not Python builtins.
df_orders_valid.filter("order_delivered_customer_date != '9999-12-31 23:59:59'").agg(min("delivery_time"), max("delivery_time"), mean("delivery_time")).show()

In [None]:
# Remove delivery-time outliers (outside the 1st-99th percentile range).
#
# Orders with no customer delivery date carry the 9999-12-31 sentinel, so their
# delivery_time is enormous and always above high_cutoff. A plain range filter
# would therefore drop every order that has not been delivered, although the
# cut-offs were computed on delivered orders only. Those orders are kept here
# and their meaningless delivery_time is set to NULL.
is_undelivered = col('order_delivered_customer_date') == lit('9999-12-31 23:59:59').cast('timestamp')
is_within_cutoffs = (col('delivery_time') >= low_cutoff) & (col('delivery_time') <= high_cutoff)

df_orders_valid = (
    df_orders_valid
    .filter(is_undelivered | is_within_cutoffs)
    .withColumn('delivery_time', when(~is_undelivered, col('delivery_time')))
)

In [None]:
# Uniqueness check: order_id is unique when the distinct count equals the row count
df_orders_valid.agg(countDistinct("order_id"), count("*")).show()

In [None]:
# Persist the three order outputs as parquet, replacing any previous run:
#   cleaned   -> valid orders only
#   anomalies -> orders with at least one flag
#   flagged   -> every order together with its flag columns
order_outputs = [
    (df_orders_valid, "cleaned/orders/"),
    (df_orders_anomalies, "anomalies/anomaly_orders/"),
    (df_orders_flagged, "flagged/anomaly_orders_full/"),
]
for orders_frame, relative_path in order_outputs:
    orders_frame.write.mode("overwrite").parquet(f"{path_head}{relative_path}")

### Order Items

In [6]:
# Parse shipping_limit_date (format yyyy-MM-dd HH:mm:ss) into a timestamp
shipping_limit_ts = to_timestamp(col("shipping_limit_date"), "yyyy-MM-dd HH:mm:ss")
df_order_items = df_order_items.withColumn("shipping_limit_date", shipping_limit_ts)

In [7]:
# NULL count per column in order_items
missing_values(df_order_items, "Order Items")

Missing values in Order Items:




+--------+-------------+----------+---------+-------------------+-----+-------------+----------------------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|shipping_limit_date_is_valid|
+--------+-------------+----------+---------+-------------------+-----+-------------+----------------------------+
|       0|            0|         0|        0|                  0|    0|            0|                           0|
+--------+-------------+----------+---------+-------------------+-----+-------------+----------------------------+



                                                                                

In [8]:
# Preview the first rows of order_items
df_order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+----------------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|shipping_limit_date_is_valid|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+----------------------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|                        true|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|                        true|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|                        true|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        

                                                                                

In [9]:
# Spot-check one order before aggregating (compare with the aggregated view of the same order)
df_order_items.filter("order_id = '9bad44921c98e1f220a667504bcffb9e'").select("order_id", "product_id", "price").show(truncate=False)

                                                                                

+--------------------------------+--------------------------------+-----+
|order_id                        |product_id                      |price|
+--------------------------------+--------------------------------+-----+
|9bad44921c98e1f220a667504bcffb9e|3dacb3ae011b40803a508b23392e15a0|399.0|
+--------------------------------+--------------------------------+-----+



In [11]:
# Collapse order_items to one row per (order_id, product_id):
#   qty               = number of item rows for that product in the order
#   total_price       = sum of price, rounded to 2 decimals
#   total_freight     = sum of freight_value, rounded to 2 decimals
#   total_order_value = total_price + total_freight, rounded to 2 decimals
# round and sum are the Spark column functions from the star import.
# NOTE(review): first() keeps an arbitrary seller_id / shipping_limit_date per
# group — fine only if they are constant within (order_id, product_id); confirm.
item_source_columns = [
    "order_id", "order_item_id", "product_id", "seller_id",
    "shipping_limit_date", "price", "freight_value",
]
per_product_aggregates = [
    first("seller_id").alias("seller_id"),
    first("shipping_limit_date").alias("shipping_limit_date"),
    round(sum("price"), 2).alias("total_price"),
    round(sum("freight_value"), 2).alias("total_freight"),
    count("order_item_id").alias("qty"),
]
total_order_value = round(col("total_price") + col("total_freight"), 2)

df_order_items_final = (
    df_order_items
    .select(*item_source_columns)
    .groupBy("order_id", "product_id")
    .agg(*per_product_aggregates)
    .withColumn("total_order_value", total_order_value)
)

In [14]:
# Spot-check the same order after aggregation
df_order_items_final.filter("order_id = '9bad44921c98e1f220a667504bcffb9e'").select("order_id", "product_id", "qty", "total_price").show(truncate=False)



+--------------------------------+--------------------------------+---+-----------+
|order_id                        |product_id                      |qty|total_price|
+--------------------------------+--------------------------------+---+-----------+
|9bad44921c98e1f220a667504bcffb9e|3dacb3ae011b40803a508b23392e15a0|1  |399.0      |
+--------------------------------+--------------------------------+---+-----------+



                                                                                

In [34]:
# Number of distinct orders versus number of aggregated (order, product) rows
df_order_items_final.agg(countDistinct("order_id"), count("*")).show()



+------------------------+--------+
|count(DISTINCT order_id)|count(1)|
+------------------------+--------+
|                   98666|  102425|
+------------------------+--------+



                                                                                

In [35]:
# Number of distinct products versus number of aggregated (order, product) rows
df_order_items_final.agg(countDistinct("product_id"), count("*")).show()

[Stage 86:>                                                         (0 + 2) / 2]

+--------------------------+--------+
|count(DISTINCT product_id)|count(1)|
+--------------------------+--------+
|                     32951|  102425|
+--------------------------+--------+



                                                                                

In [36]:
# Persist the aggregated order items as parquet, replacing any previous run
order_items_target = f"{path_head}cleaned/order_items/"
df_order_items_final.write.parquet(order_items_target, mode="overwrite")

                                                                                

### Order Payments

In [37]:
# NULL count per column in order_payments
missing_values(df_order_payments, 'Payments')

Missing values in Payments:




+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



                                                                                

In [38]:
# Preview the per-order payment summary on a single order.
# An orderBy() before groupBy() does not fix the element order that
# collect_list() produces, so the payments are collected as structs whose first
# field is payment_sequential; sort_array() then orders them by that field
# before the individual fields are extracted again.
df_order_payments.filter("order_id = '1d9a9731b9c10fc9cba74e6f74782e8b'").groupBy("order_id").agg(
    round(sum("payment_value"), 2).alias("total_payment"),
    sort_array(
        collect_list(struct("payment_sequential", "payment_type", "payment_installments"))
    ).alias("payments_in_sequence")
).select(
    "order_id",
    "total_payment",
    col("payments_in_sequence.payment_type").alias("payment_type"),
    col("payments_in_sequence.payment_installments").alias("payment_installments")
).show()

[Stage 108:>                                                        (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+
|            order_id|total_payment|        payment_type|payment_installments|
+--------------------+-------------+--------------------+--------------------+
|1d9a9731b9c10fc9c...|        63.58|[credit_card, vou...|[1, 1, 1, 1, 1, 1...|
+--------------------+-------------+--------------------+--------------------+



                                                                                

In [None]:
# One row per order: total paid, plus the payment types and instalment counts
# listed in payment_sequential order.
# An orderBy() before groupBy() does not fix the element order that
# collect_list() produces, so the payments are collected as structs whose first
# field is payment_sequential; sort_array() then orders them by that field
# before the individual fields are extracted again.
df_payments_summary = (
    df_order_payments
    .groupBy("order_id")
    .agg(
        round(sum("payment_value"), 2).alias("total_payment"),
        sort_array(
            collect_list(struct("payment_sequential", "payment_type", "payment_installments"))
        ).alias("payments_in_sequence"),
    )
    .select(
        "order_id",
        "total_payment",
        # Selecting a field of an array of structs yields an array of that field.
        col("payments_in_sequence.payment_type").alias("payment_type"),
        col("payments_in_sequence.payment_installments").alias("payment_installments"),
    )
)

In [None]:
# Preview the payment summary without truncating the array columns
df_payments_summary.show(5, truncate=False)

In [None]:
# Uniqueness check: order_id is unique when the distinct count equals the row count
df_payments_summary.agg(countDistinct("order_id"), count("*")).show()

In [None]:
# Persist the payment summary as parquet, replacing any previous run
order_payments_target = f"{path_head}cleaned/order_payments/"
df_payments_summary.write.parquet(order_payments_target, mode="overwrite")

### Order Reviews

In [None]:
# Parse review_answer_timestamp (format yyyy-MM-dd HH:mm:ss) into a timestamp
review_answer_ts = to_timestamp(col("review_answer_timestamp"), "yyyy-MM-dd HH:mm:ss")
df_order_reviews = df_order_reviews.withColumn("review_answer_timestamp", review_answer_ts)

In [None]:
# Parse review_creation_date as a timestamp first, then keep only its date part
review_creation_ts = to_timestamp(col("review_creation_date"), "yyyy-MM-dd HH:mm:ss")
df_order_reviews = df_order_reviews.withColumn("review_creation_date", to_date(review_creation_ts))

In [None]:
# Confirm the column types after the date/timestamp conversions
df_order_reviews.printSchema()

Anomaly in order_reviews

1. review_id, order_id or review_score is NULL
2. review_score = 0 (does not make sense)
3. order_id has an invalid format (not 32 hexadecimal characters)
4. duplicates > 2 are treated as item 3
   duplicates == 2 -> probably an updated review

In [None]:
# Review anomaly flags. Each flag is True when the anomaly is present, NULL otherwise.
key_field_is_missing = (col("review_id").isNull()) | col("order_id").isNull() | col("review_score").isNull()
score_is_zero = (col("review_score") == 0)
order_id_is_malformed = (~col("order_id").rlike("^[a-f0-9]{32}$"))

df_order_reviews_flagged = (
    df_order_reviews
    .withColumn("flag_missing_reviewid_orderid_score", when(key_field_is_missing, lit(True)))
    .withColumn("flag_review_score_0", when(score_is_zero, lit(True)))
    .withColumn("flag_orderid_not_valid", when(order_id_is_malformed, lit(True)))
)

In [None]:
# Count the reviews per order_id and flag every row of an order that has more than one
window_orderid = Window.partitionBy("order_id")
reviews_per_order = count("order_id").over(window_orderid)

df_order_reviews_flagged = (
    df_order_reviews_flagged
    .withColumn("order_id_count", reviews_per_order)
    .withColumn("flag_duplicate", when((col("order_id_count") > 1), lit(True)))
)

In [None]:
# Remove rows with a NULL in any of the key columns.
# The three conditions are combined with AND: a row is kept only when
# review_id, order_id and review_score are all present. (With OR, a row with
# one or two NULL key columns would be kept.)
df_order_reviews_clean = df_order_reviews_flagged.filter(
    col("review_id").isNotNull() &
    col("order_id").isNotNull() &
    col("review_score").isNotNull()
)

In [None]:
# Remove rows whose order_id has an invalid format (not 32 hexadecimal characters)
order_id_is_well_formed = col("order_id").rlike("^[a-f0-9]{32}$")
df_order_reviews_clean = df_order_reviews_clean.where(order_id_is_well_formed)

In [None]:
# Remove rows whose review_score is 0 (does not make sense); only scores above 0 are kept
score_is_positive = col("review_score") > 0
df_order_reviews_clean = df_order_reviews_clean.where(score_is_positive)

In [None]:
# Drop duplicate reviews per order, keeping the most recent one.
# review_creation_date is a date, so several reviews of one order can share it;
# review_answer_timestamp is added as a tie-breaker so that row_number() does
# not pick an arbitrary row among same-day reviews.
window = Window.partitionBy("order_id").orderBy(
    col("review_creation_date").desc(),
    col("review_answer_timestamp").desc(),
)

df_order_reviews_clean = df_order_reviews_clean.withColumn("rn", row_number().over(window)) \
                                   .filter(col("rn") == 1) \
                                   .drop("rn")

In [None]:
# Remove the helper/flag columns from the cleaned reviews.
# NOTE(review): the two *_is_valid columns are not created anywhere in the visible
# cells; presumably they come from an earlier run or the source data — confirm.
review_helper_columns = [
    "flag_missing_reviewid_orderid_score",
    "flag_review_score_0",
    "flag_orderid_not_valid",
    "order_id_count",
    "flag_duplicate",
    "review_creation_date_is_valid",
    "review_answer_timestamp_is_valid",
]
df_order_reviews_clean = df_order_reviews_clean.drop(*review_helper_columns)

In [None]:
# List any order_id that still has more than one review after de-duplication
df_order_reviews_clean.groupBy("order_id").count().filter('count > 1').show()

In [None]:
# NULL count per column before filling the defaults
missing_values(df_order_reviews_clean, "Reviews Clean")

In [None]:
# Fill the remaining NULLs: '-' for missing comment text, far-future sentinels for missing dates
review_null_defaults = dict([
    ('review_comment_title', '-'),
    ('review_comment_message', '-'),
    ('review_creation_date', '9999-12-31'),
    ('review_answer_timestamp', '9999-12-31 23:59:59'),
])
df_order_reviews_clean = df_order_reviews_clean.fillna(review_null_defaults)

In [None]:
# NULL count per column after filling the defaults
missing_values(df_order_reviews_clean, "Reviews Clean")

In [None]:
# Uniqueness check: order_id is unique when the distinct count equals the row count
df_order_reviews_clean.agg(countDistinct("order_id"), count("*")).show()

In [None]:
# Reviews that carry at least one anomaly flag
review_flag_columns = [
    "flag_missing_reviewid_orderid_score",
    "flag_review_score_0",
    "flag_orderid_not_valid",
    "flag_duplicate",
]

has_any_review_flag = (col(review_flag_columns[0]) == True)
for review_flag in review_flag_columns[1:]:
    has_any_review_flag = has_any_review_flag | (col(review_flag) == True)

df_order_reviews_anomalies = df_order_reviews_flagged.filter(has_any_review_flag)

# Count the rows per anomaly type; anomaly_N follows the order of the flag list above.
# A True flag casts to 1 and a NULL flag is skipped by sum().
anomaly_totals = [
    sum(col(review_flag).cast("int")).alias(f"anomaly_{position}")
    for position, review_flag in enumerate(review_flag_columns, start=1)
]
df_order_reviews_anomalies.select(*anomaly_totals).show()

In [None]:
# Persist the three review outputs as parquet, replacing any previous run:
#   cleaned   -> de-duplicated, valid reviews
#   anomalies -> reviews with at least one flag
#   flagged   -> every review together with its flag columns
review_outputs = [
    (df_order_reviews_clean, "cleaned/order_reviews/"),
    (df_order_reviews_anomalies, "anomalies/anomaly_order_reviews/"),
    (df_order_reviews_flagged, "flagged/anomaly_orderreviews_full/"),
]
for reviews_frame, relative_path in review_outputs:
    reviews_frame.write.mode("overwrite").parquet(f"{path_head}{relative_path}")


In [4]:
# Shut down the Spark session once all outputs have been written
spark.stop()