In [2]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Session builder with the Delta Lake SQL extension and Delta catalog registered.
builder = (
    SparkSession.builder
    .appName("Olist Silver Layer")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Let the delta pip package finish configuring the builder (the startup log shows
# it adding delta-spark as a dependency), then start or reuse the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/home/saraballkoci/miniconda3/envs/data_pipeline/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/saraballkoci/.ivy2.5.2/cache
The jars for the packages stored in: /home/saraballkoci/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b9b0d512-2177-4960-adde-a9775d6e1e76;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 419ms :: artifacts dl 22ms
	:: modules in use:
	io.delta#delta-spark_2.13;4.0.0 from central in [default]
	io.delta#delta-storage;4.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.13.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            mo

In [3]:
def _read_delta(path):
    """Read the Delta table stored at ``path`` into a DataFrame."""
    return spark.read.format("delta").load(path)


# Bronze-layer inputs: one DataFrame per source table.
orders = _read_delta("../delta/bronze/orders")
order_items = _read_delta("../delta/bronze/order_items")
customers = _read_delta("../delta/bronze/customers")
products = _read_delta("../delta/bronze/products")
sellers = _read_delta("../delta/bronze/sellers")
payments = _read_delta("../delta/bronze/payments")
reviews = _read_delta("../delta/bronze/reviews")
geolocations = _read_delta("../delta/bronze/geolocations")
# The variable name is singular, but the table directory is "product_translations".
product_translation = _read_delta("../delta/bronze/product_translations")

In [4]:
# Quick sanity check: print the first five rows of two of the Bronze tables.
orders.show(n=5)
customers.show(n=5)

25/06/26 19:57:50 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+----+-----+---+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|year|month|day|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+----+-----+---+
|0efd0bc268d34da3f...|95446917717bb58d5...|     shipped|     2016-10-07 15:53:31|2016-10-07 16:12:22|         2016-10-11 16:12:23|                         NULL|          2016-12-13 00:00:00|2016|   10|  7|
|5c973d2b4652e1dec...|8a863458d761a9b40...|   delivered|     2016-10-07 10:45:48|2016-10-07 12:13:51|         2016-10-11 12:13:52|          2016-10-14 12:13:52|          2016-1

                                                                                

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows


In [7]:
# Uniform basic cleaning for every Bronze table: discard rows that contain a
# null in any column, then discard exact duplicate rows.
# NOTE(review): dropna() without a subset is strict - e.g. the "shipped" order
# in the preview output has a NULL order_delivered_customer_date and is removed
# here. Confirm that only fully populated rows are wanted downstream.
(
    orders,
    customers,
    order_items,
    products,
    sellers,
    payments,
    reviews,
    geolocations,
    product_translation,
) = [
    bronze_df.dropna().dropDuplicates()
    for bronze_df in (
        orders,
        customers,
        order_items,
        products,
        sellers,
        payments,
        reviews,
        geolocations,
        product_translation,
    )
]

In [12]:
from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp, year, month, dayofmonth
from pyspark.sql.functions import sum, col, datediff



# Build the enriched Silver DataFrame: one row per order item, widened with
# product, seller, customer, payment and review attributes plus derived metrics.
# Spark functions are referenced through the `F` namespace so this cell does
# not depend on the bare `sum` name, which shadows Python's builtin.

# Ensure the purchase timestamp is a real timestamp before deriving dates from it.
orders = orders.withColumn(
    "order_purchase_timestamp", F.to_timestamp("order_purchase_timestamp")
)

# Step 1: Join orders with order_items (inner: orders without items are dropped).
silver_df = orders.join(order_items, on="order_id", how="inner")

# Step 2: Join with products
silver_df = silver_df.join(products, on="product_id", how="left")

# Step 3: Join with sellers
silver_df = silver_df.join(sellers, on="seller_id", how="left")

# Step 4: Join with customers
silver_df = silver_df.join(customers, on="customer_id", how="left")

# Step 5: Collapse payments to one row per order, then join.
# NOTE(review): "payment_count" holds the SUM of payment_installments, not a
# count of payment rows - confirm the intended meaning of this column.
payments_agg = payments.groupBy("order_id").agg(
    F.sum("payment_installments").alias("payment_count")
)
silver_df = silver_df.join(payments_agg, on="order_id", how="left")

# Step 6: Collapse reviews to one row per order, then join.
# Nothing guarantees a single review row per order_id; joining the raw rows
# would repeat every item row once per review and inflate price totals. The
# per-order score is the rounded mean, cast back to the column's original type
# so the output schema stays the same.
# NOTE(review): assumes review_score is numeric; confirm whether the rounded
# mean or e.g. the most recent review should represent an order.
review_score_type = reviews.schema["review_score"].dataType
reviews_agg = reviews.groupBy("order_id").agg(
    F.round(F.avg("review_score")).cast(review_score_type).alias("review_score")
)
silver_df = silver_df.join(reviews_agg, on="order_id", how="left")

# Step 7: Add calculated columns.
# year/month/day already exist on `orders`; they are overwritten here with
# values derived from the purchase timestamp.
silver_df = (
    silver_df
    .withColumn("total_price", F.col("price") + F.col("freight_value"))
    .withColumn("profit_margin", F.col("price") - F.col("freight_value"))
    .withColumn(
        "delivery_time",
        # Whole days between purchase and delivery to the customer.
        F.datediff("order_delivered_customer_date", "order_purchase_timestamp"),
    )
    .withColumn("year", F.year("order_purchase_timestamp"))
    .withColumn("month", F.month("order_purchase_timestamp"))
    .withColumn("day", F.dayofmonth("order_purchase_timestamp"))
)

In [13]:
# Step 8: Clean data.
# Rows with a null in any column are removed (this includes rows where one of
# the left joins found no match), followed by exact duplicate rows.
silver_df = silver_df.dropna()
silver_df = silver_df.dropDuplicates()

# Step 9: Write to Delta (Silver Layer), replacing any existing data and
# partitioning the files by purchase year/month/day.
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .save("../delta/silver/orders_enriched")
)


                                                                                

In [14]:
# Print the column names and types of the enriched Silver DataFrame.
silver_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: i

In [18]:
# Show the first ten enriched rows with full (untruncated) cell values.
silver_df.show(n=10, truncate=False)




+--------------------------------+--------------------------------+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+----+-----+---+-------------+-------------------+-----+-------------+-----------------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+----------------+------------+--------------------------------+------------------------+-----------------------+--------------+-------------+------------+-----------+-------------------+-------------+
|order_id                        |customer_id                     |seller_id                       |product_id                      |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_carrier_date|order_delivered_customer_date|order_estim

                                                                                