In [0]:
spark

In [0]:
base = "dbfs:/FileStore/tables/"

# Loading Datasets from Databricks File Storage

In [0]:
customers = spark.read.option("header",True).csv(base + "olist_customers_dataset.csv")
geoloc = spark.read.option("header",True).csv(base + "olist_geolocation_dataset.csv")
items = spark.read.option("header",True).csv(base + "olist_order_items_dataset.csv")

payments = spark.read.option("header",True).csv(base + "olist_order_payments_dataset.csv")
reviews = spark.read.option("header",True).csv(base + "olist_order_reviews_dataset.csv")
orders = spark.read.option("header",True).csv(base + "olist_orders_dataset.csv")

products = spark.read.option("header",True).csv(base + "olist_products_dataset.csv")
sellers = spark.read.option("header",True).csv(base + "olist_sellers_dataset.csv")
cat_trans = spark.read.option("header",True).csv(base + "product_category_name_translation.csv")

# Clean & Prepare Data

In [0]:
from pyspark.sql.functions import to_timestamp

orders = orders.withColumn("order_purchase_timestamp", to_timestamp("order_purchase_timestamp"))


In [0]:
orders.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

# Converting Data Types

In [0]:
from pyspark.sql.functions import col, sum, count, avg, row_number,  when, datediff, max, min, lit

items = items.withColumn("price", col("price").cast("double")) \
             .withColumn("freight_value", col("freight_value").cast("double")) \
             .withColumn("order_item_id", col("order_item_id").cast("int"))

In [0]:

payments = payments.withColumn("payment_value", col("payment_value").cast("double"))

#Performing Joins 

In [0]:
products = products.join(cat_trans, "product_category_name", "left")

In [0]:
df = items.join(orders, "order_id", "inner") \
          .join(customers, "customer_id", "inner") \
          .join(products, "product_id", "left") \
          .join(sellers, "seller_id", "inner")


In [0]:
# df.show(5)
column_names = df.columns 
print(column_names)

['seller_id', 'product_id', 'customer_id', 'order_id', 'order_item_id', 'shipping_limit_date', 'price', 'freight_value', 'order_status', 'order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state', 'product_category_name', 'product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm', 'product_category_name_english', 'seller_zip_code_prefix', 'seller_city', 'seller_state']


#  Checking  for Duplicates

In [0]:
total_rows = df.count()
distinct_rows = df.dropDuplicates().count()
print(f"Total rows: {total_rows}, Distinct rows: {distinct_rows}")


Total rows: 112650, Distinct rows: 112650


In [0]:
from pyspark.sql.functions import col, count, isnan
 
numeric_cols = [field.name for field in df.schema.fields if str(field.dataType) in ['DoubleType', 'IntegerType']]
other_cols = [field.name for field in df.schema.fields if field.name not in numeric_cols]
 
numeric_nulls = df.select([
    count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in numeric_cols
])

# Count only nulls for string/timestamp columns
non_numeric_nulls = df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in other_cols
])
 
print("Null/NaN counts in numeric columns:")
numeric_nulls.show(truncate=False)

print("Null counts in non-numeric columns:")
non_numeric_nulls.show(truncate=False)


Null/NaN counts in numeric columns:
++
||
++
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
++
only showing top 20 rows

Null counts in non-numeric columns:
+---------+----------+-----------+--------+-------------+-------------------+-----+-------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+------------------+------------------------+-------------+--------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------------+----------------------+-----------+------------+
|seller_id|product_id|customer_id|order_id|order_item_id|shipping_limit_date|price|freight_value|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|customer_unique_id|customer_zip_code

In [0]:
df = df.na.fill({
    "product_photos_qty": 0,
    "product_name_lenght": 0,
    "product_description_lenght": 0
}) 


In [0]:
column_names = df.columns 
print(column_names)

['seller_id', 'product_id', 'customer_id', 'order_id', 'order_item_id', 'shipping_limit_date', 'price', 'freight_value', 'order_status', 'order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state', 'product_category_name', 'product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm', 'product_category_name_english', 'seller_zip_code_prefix', 'seller_city', 'seller_state']


# Top Selling Products per Region

In [0]:
from pyspark.sql.window import Window  # rank
from pyspark.sql.functions import col,desc

top_items = (df.withColumn("revenue", col("price") * col("order_item_id"))
              .groupBy("customer_state", "product_category_name_english")
              .agg(sum("order_item_id").alias("units_sold"), sum("revenue").alias("total_revenue"))
)

w = Window.partitionBy("customer_state").orderBy(desc("units_sold"))
top5 = top_items.withColumn("rank", row_number().over(w)).filter(col("rank") <= 5)

display(top5)


customer_state,product_category_name_english,units_sold,total_revenue,rank
AC,furniture_decor,28,2684.8400000000006,1
AC,computers_accessories,12,1350.18,2
AC,sports_leisure,9,1677.46,3
AC,health_beauty,8,1571.48,4
AC,bed_bath_table,6,767.5,5
AL,health_beauty,63,12780.259999999998,1
AL,computers_accessories,51,10035.58,2
AL,furniture_decor,49,4893.79,3
AL,watches_gifts,37,11922.27,4
AL,sports_leisure,34,3860.68,5


# Customer Lifetime Value (CLV)

In [0]:
from pyspark.sql.functions import countDistinct

user_stats = (df.groupBy("customer_unique_id")
                .agg(countDistinct("order_id").alias("num_orders"),
                     sum(col("price") * col("order_item_id")).alias("total_revenue"),
                     datediff(max("order_purchase_timestamp"), min("order_purchase_timestamp")).alias("days_active"))
                .filter(col("days_active") > 0)
                .withColumn("clv", (col("total_revenue")/col("num_orders")) * (col("num_orders")/col("days_active")) * 365)
)

display(user_stats.orderBy(desc("clv")).limit(10))


customer_unique_id,num_orders,total_revenue,days_active,clv
c8460e4251689ba205045f3ea17884a1,4,14280.0,1,5212200.0
58c1b085b54c03a1f1ab5f13d64c2b1c,2,2999.98,1,1094992.7
0f5ac8d5c31de21d2f25e24be15bbffb,2,8398.8,4,766390.4999999999
0341bbd5c969923a0f801b9e2d10a7b8,2,3656.88,2,667380.6
1f98d2384ff7a372e6a2d3bb75cbcd54,2,3510.0,2,640575.0
c7fb8ec1ea35af7e89f989b6e17e2bd8,2,1319.4,1,481581.00000000006
a40096fc0a3862e9e12bc55b5f8e6ab2,3,925.9,1,337953.5
2e7c511f54e9d94776c606007d672a22,2,799.6,1,291854.0
2516f43838c4892e066fec28143efa64,2,754.0,1,275210.0
4c4b5dc90eb4e33188153ca3e7999f92,2,1159.98,2,211696.35


# Purchase Funnel  

In [0]:
placed = orders.select("order_id").distinct().count()
paid = payments.select("order_id").distinct().count()
reviewed = reviews.select("order_id").distinct().count()

drop1 = (1 - paid / placed) * 100
drop2 = (1 - reviewed / paid) * 100

print(f"Placed Orders: {placed}")
print(f"Paid Orders: {paid} → Drop-off: {drop1:.2f}%")
print(f"Reviewed Orders: {reviewed} → Drop-off: {drop2:.2f}%")


Placed Orders: 99441
Paid Orders: 99440 → Drop-off: 0.00%
Reviewed Orders: 99743 → Drop-off: -0.30%


# Most Delayed Deliveries

In [0]:
df = df.withColumn("actual_delay", datediff(col("order_delivered_customer_date"), col("order_estimated_delivery_date")))
df.select("order_id", "customer_state", "actual_delay").filter(col("actual_delay") > 0).orderBy(col("actual_delay").desc()).show(5)


+--------------------+--------------+------------+
|            order_id|customer_state|actual_delay|
+--------------------+--------------+------------+
|1b3190b2dfa9d789e...|            RJ|         188|
|ca07593549f1816d2...|            ES|         181|
|47b40429ed8cce3ae...|            SP|         175|
|2fe324febf907e3ea...|            SP|         167|
|285ab9426d6982034...|            SE|         166|
+--------------------+--------------+------------+
only showing top 5 rows



# Revenue by Seller or Region

In [0]:
df.groupBy("seller_id").agg(sum("price").alias("total_seller_revenue")).orderBy(col("total_seller_revenue").desc()).show(10)
df.groupBy("customer_state").agg(sum("price").alias("state_revenue")).orderBy(col("state_revenue").desc()).show(10)


+--------------------+--------------------+
|           seller_id|total_seller_revenue|
+--------------------+--------------------+
|4869f7a5dfa277a7d...|   229472.6300000005|
|53243585a1d6dc264...|  222776.05000000002|
|4a3ca9315b744ce9f...|   200472.9200000013|
|fa1c13f2614d7b5c4...|   194042.0300000004|
|7c67e1448b00f6e96...|           187923.89|
|7e93a43ef30c4f03f...|  176431.86999999997|
|da8622b14eb17ae28...|   160236.5700000011|
|7a67c85e85bb2ce85...|  141745.53000000032|
|1025f0e2d44d7041d...|  138968.55000000022|
|955fee9216a65b617...|  135171.70000000024|
+--------------------+--------------------+
only showing top 10 rows

+--------------+------------------+
|customer_state|     state_revenue|
+--------------+------------------+
|            SP| 5202955.049999814|
|            RJ|1824092.6700000255|
|            MG|1585308.0300000103|
|            RS| 750304.0199999908|
|            PR| 683083.7599999931|
|            SC|520553.33999999723|
|            BA| 511349.9899999987

In [0]:
print(df.columns)

['seller_id', 'product_id', 'customer_id', 'order_id', 'order_item_id', 'shipping_limit_date', 'price', 'freight_value', 'order_status', 'order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state', 'product_category_name', 'product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm', 'product_category_name_english', 'seller_zip_code_prefix', 'seller_city', 'seller_state', 'actual_delay']


In [0]:
df.write.mode("overwrite").parquet("dbfs:/FileStore/cleaned_olist_data/")