In [1]:
import os
os.environ["PYSPARK_PYTHON"] = r"C:\Users\ranjan\Desktop\spark-olist-pipeline\venv\Scripts\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\ranjan\Desktop\spark-olist-pipeline\venv\Scripts\python.exe"
os.environ["spark.python.worker.reuse"] = "true"

In [2]:
import sys
from pyspark.sql.functions import broadcast
sys.path.append(os.path.abspath("../src")) 


In [3]:
from bronze import create_spark_session, ingest_csv
from silver import curate_sales
from gold import create_daily_sales_state
spark = create_spark_session("OlistPipeline")

In [4]:
# Check AQE
print("Adaptive Query Execution Enabled:", spark.conf.get("spark.sql.adaptive.enabled"))

Adaptive Query Execution Enabled: true


In [5]:
# Define paths
base_input = "../data/"
bronze_output = "../delta/bronze/"

In [6]:

# Bronze ingestion for all Olist tables
datasets = {
    "customers": "olist_customers_dataset.csv",
    "orders": "olist_orders_dataset.csv",
    "order_items": "olist_order_items_dataset.csv",
    "order_payments": "olist_order_payments_dataset.csv",
    "order_reviews": "olist_order_reviews_dataset.csv",
    "products": "olist_products_dataset.csv",
    "sellers": "olist_sellers_dataset.csv",
    "geolocation": "olist_geolocation_dataset.csv",
    "category_translation": "product_category_name_translation.csv"
}


In [7]:
for name, filename in datasets.items():
    input_path = f"{base_input}{filename}"
    output_path = f"{bronze_output}{name}"

    print(f"\n[INFO] Ingesting {name} → {output_path}")
    
    if name == "orders":  # partition only orders
        ingest_csv(spark, input_path, output_path, partition_col="order_purchase_month", target_file_rows=50000)
    else:
        ingest_csv(spark, input_path, output_path, target_file_rows=50000)

    # Read back from Delta & show
    df = spark.read.format("delta").load(output_path)
    print(f"[INFO] Showing {name} table (first 5 rows):")
    df.show(5, truncate=False)



[INFO] Ingesting customers → ../delta/bronze/customers
[INFO] Ingested ../data/olist_customers_dataset.csv → ../delta/bronze/customers
[INFO] Files written under ../delta/bronze/customers:
   part-00000-7cf98781-02c5-41c0-a7a1-00eeec53f1d0-c000.snappy.parquet → 3.36 MB
   part-00001-05470c5b-349c-48ff-a110-78355bde9509-c000.snappy.parquet → 3.35 MB
[INFO] Showing customers table (first 5 rows):
+--------------------------------+--------------------------------+------------------------+-------------+--------------+
|customer_id                     |customer_unique_id              |customer_zip_code_prefix|customer_city|customer_state|
+--------------------------------+--------------------------------+------------------------+-------------+--------------+
|15357da0aa538a1e5efabe63f0d71095|57f66dc16e5edbe9417e1e21c783b5a9|13010                   |campinas     |SP            |
|47aa66905ec3df607310af4988d34bca|58aaf012cc88d1de4b976f45e059ddf5|4844                    |sao paulo    |SP     

In [8]:
print("Bronze layer  ingestion completed successfully!")

Bronze layer  ingestion completed successfully!


In [9]:
silver_output = "../delta/silver/"


In [10]:
# Curate Silver Layer
curate_sales(spark, bronze_base=bronze_output, silver_base=silver_output)

 Silver layer sales curated and saved at ../delta/silver//sales


In [11]:
# Verify Silver
print("\n[INFO] Silver Layer Sales Table:")
sales_df = spark.read.format("delta").load(f"{silver_output}/sales")
sales_df.show(5, truncate=False)


[INFO] Silver Layer Sales Table:
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+-------------+-------------------+-----+-------------+-----------+------------------+------------+--------------------+-------------+--------------------------------+------------------------+--------------+--------------+----------------------+-----------+------------+---------------------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-------------------------------+
|product_id                      |seller_id                       |customer_id                     |order_id                        |order_status|order_purchase_timestamp|order_approved_at  |or

In [12]:
sales_df.select("seller_id", "seller_salt").distinct().show(10)

+--------------------+-----------+
|           seller_id|seller_salt|
+--------------------+-----------+
|52a52b9f656520e99...|          1|
|b0b346d3a89f5eb4c...|          0|
|41b39e28db005d973...|          1|
|0b90b6df587eb8360...|          4|
|0ea22c1cfbdc755f8...|          2|
|7142540dd4c91e223...|          1|
|b335c59ab742f751a...|          3|
|1430239a858e7682b...|          2|
|626ab1cd96932367f...|          1|
|a5ff20ff766e7f50b...|          0|
+--------------------+-----------+
only showing top 10 rows



In [13]:
# Unique city/state samples for customers & sellers
silver_path = "../delta/silver/"
customers_df = spark.read.format("delta").load(silver_path + "customers")
sellers_df = spark.read.format("delta").load(silver_path + "sellers")

print("Customer city samples:", [row.customer_city for row in customers_df.select("customer_city").distinct().limit(5).collect()])
print("Customer state samples:", [row.customer_state for row in customers_df.select("customer_state").distinct().limit(5).collect()])

print("Seller city samples:", [row.seller_city for row in sellers_df.select("seller_city").distinct().limit(5).collect()])
print("Seller state samples:", [row.seller_state for row in sellers_df.select("seller_state").distinct().limit(5).collect()])

Customer city samples: ['igrejinha', 'aguas de sao pedro', 'camacari', 'arapiraca', 'pote']
Customer state samples: ['pi', 'pr', 'rj', 'pb', 'ro']
Seller city samples: ['igrejinha', 'brusque', 'buritama', 'sao joao de meriti', 'garca']
Seller state samples: ['pi', 'pr', 'rj', 'pb', 'ro']


In [14]:
print("Duplicate customers:", customers_df.count() - customers_df.dropDuplicates(["customer_id"]).count())
print("Duplicate sellers:", sellers_df.count() - sellers_df.dropDuplicates(["seller_id"]).count())

Duplicate customers: 0
Duplicate sellers: 0


In [15]:
sales_df = spark.read.format("delta").load(silver_path + "sales")

print("Sample seller_id_salted:")
sales_df.select("seller_salt").distinct().show(10, truncate=False)

Sample seller_id_salted:
+-----------+
|seller_salt|
+-----------+
|1          |
|3          |
|4          |
|2          |
|0          |
+-----------+



In [16]:
gold_base = "../delta/gold/"

In [17]:
# Run Gold marts
create_daily_sales_state(spark, silver_path, gold_base)

Gold table created at ../delta/gold//daily_sales_state (customer + seller state revenue)


In [18]:
# Verify Gold table
df_gold = spark.read.format("delta").load(f"{gold_base}/daily_sales_state")
print("Sample Gold table:")
df_gold.show(5, truncate=False)


Sample Gold table:
+----------+--------------+------------+------------------+
|order_date|customer_state|seller_state|daily_revenue     |
+----------+--------------+------------+------------------+
|2017-02-27|al            |sp          |58.39             |
|2017-02-27|ba            |sp          |361.98            |
|2017-02-27|mg            |mg          |72.5              |
|2017-02-27|mg            |sp          |437.03000000000003|
|2017-02-27|mt            |es          |242.10000000000002|
+----------+--------------+------------+------------------+
only showing top 5 rows



In [20]:
# Load sales table
sales = spark.read.format("delta").load(f"{silver_path}/sales")

In [22]:
import time
# AQE OFF
spark.conf.set("spark.sql.adaptive.enabled", "false")
start = time.time()
sales.groupBy("customer_state").sum("price").show()
print("AQE OFF runtime:", time.time() - start)

# AQE ON
spark.conf.set("spark.sql.adaptive.enabled", "true")
start = time.time()
sales.groupBy("customer_state").sum("price").show()
print("AQE ON runtime:", time.time() - start)

+--------------+------------------+
|customer_state|        sum(price)|
+--------------+------------------+
|            pi| 90156.87999999996|
|            pr| 685703.6699999984|
|            rj|1849589.9499999874|
|            pb|120634.06000000007|
|            ro| 45176.64999999999|
|            ba| 526261.0800000001|
|          NULL|439617.68000000075|
|            ms|116679.15999999999|
|            mg|1592170.1999999825|
|            go| 301968.0600000005|
|            sc| 516302.6900000002|
|            es|273704.54000000056|
|            rs| 759279.2699999973|
|            rn| 92823.27000000002|
|            pe| 263488.0700000005|
|            ce|235778.63000000032|
|            ap|           13454.3|
|            sp|  5270066.25000016|
|            al| 80083.78999999996|
|            ac|          16146.34|
+--------------+------------------+
only showing top 20 rows

AQE OFF runtime: 4.383326768875122
+--------------+------------------+
|customer_state|        sum(price)|
+--

In [24]:
customers = spark.read.format("delta").load(f"{silver_path}/customers")

# Shuffle Join (default)
df1 = sales.join(customers, "customer_id")
df1.explain(True)



== Parsed Logical Plan ==
'Join UsingJoin(Inner, [customer_id])
:- Relation [product_id#40264,seller_id#40265,customer_id#40266,order_id#40267,order_status#40268,order_purchase_timestamp#40269,order_approved_at#40270,order_delivered_carrier_date#40271,order_delivered_customer_date#40272,order_estimated_delivery_date#40273,order_purchase_month#40274,order_item_id#40275,shipping_limit_date#40276,price#40277,freight_value#40278,seller_salt#40279,payment_sequential#40280,payment_type#40281,payment_installments#40282,payment_value#40283,customer_unique_id#40284,customer_zip_code_prefix#40285,customer_city#40286,customer_state#40287,... 12 more fields] parquet
+- Relation [customer_id#40654,customer_unique_id#40655,customer_zip_code_prefix#40656,customer_city#40657,customer_state#40658] parquet

== Analyzed Logical Plan ==
customer_id: string, product_id: string, seller_id: string, order_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, orde

In [25]:
# Broadcast Join
df2 = sales.join(broadcast(customers), "customer_id")
df2.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [customer_id])
:- Relation [product_id#40264,seller_id#40265,customer_id#40266,order_id#40267,order_status#40268,order_purchase_timestamp#40269,order_approved_at#40270,order_delivered_carrier_date#40271,order_delivered_customer_date#40272,order_estimated_delivery_date#40273,order_purchase_month#40274,order_item_id#40275,shipping_limit_date#40276,price#40277,freight_value#40278,seller_salt#40279,payment_sequential#40280,payment_type#40281,payment_installments#40282,payment_value#40283,customer_unique_id#40284,customer_zip_code_prefix#40285,customer_city#40286,customer_state#40287,... 12 more fields] parquet
+- ResolvedHint (strategy=broadcast)
   +- Relation [customer_id#40654,customer_unique_id#40655,customer_zip_code_prefix#40656,customer_city#40657,customer_state#40658] parquet

== Analyzed Logical Plan ==
customer_id: string, product_id: string, seller_id: string, order_id: string, order_status: string, order_purchase_timestamp: times

In [27]:
from pyspark.sql.functions import col, floor, rand

num_salts = 10
# Add salt
sales_salted = sales.withColumn("salt", floor(rand()*num_salts))
customers_salted = customers.withColumn("salt", floor(rand()*num_salts))

# Rename customer_state from customers to avoid ambiguity
customers_salted = customers_salted.withColumnRenamed("customer_state", "cust_state")

# Join on customer_id + salt
df_salted = sales_salted.join(customers_salted, ["customer_id", "salt"])

# Group by renamed column
df_salted.groupBy("cust_state").sum("price").show()

+----------+------------------+
|cust_state|        sum(price)|
+----------+------------------+
|        pi|           6172.67|
|        pr| 74134.08999999997|
|        rj|193331.46000000046|
|        pb|          10218.29|
|        ro|           3191.06|
|        ba| 53124.19999999996|
|        ms|7291.6500000000015|
|        mg|144852.08000000045|
|        go|28965.059999999987|
|        sc| 48323.62999999996|
|        es|36546.059999999976|
|        rs|          74071.88|
|        rn|           7571.64|
|        pe|26286.319999999992|
|        ce|24599.549999999992|
|        ap|            903.31|
|        sp| 540125.6899999973|
|        al|10693.050000000001|
|        ac|           2071.07|
|        mt|          12384.07|
+----------+------------------+
only showing top 20 rows



In [28]:
df = sales.join(customers, "customer_id")
df.explain("formatted")  # Detailed plan


== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
   +- BroadcastHashJoin Inner BuildRight (6)
      :- Filter (2)
      :  +- Scan parquet  (1)
      +- BroadcastExchange (5)
         +- Filter (4)
            +- Scan parquet  (3)


(1) Scan parquet 
Output [36]: [product_id#40264, seller_id#40265, customer_id#40266, order_id#40267, order_status#40268, order_purchase_timestamp#40269, order_approved_at#40270, order_delivered_carrier_date#40271, order_delivered_customer_date#40272, order_estimated_delivery_date#40273, order_purchase_month#40274, order_item_id#40275, shipping_limit_date#40276, price#40277, freight_value#40278, seller_salt#40279, payment_sequential#40280, payment_type#40281, payment_installments#40282, payment_value#40283, customer_unique_id#40284, customer_zip_code_prefix#40285, customer_city#40286, customer_state#40287, seller_zip_code_prefix#40288, seller_city#40289, seller_state#40290, product_category_name#40291, product_name_lenght#40292, product_description_

In [None]:
spark.stop()