jupyter:
  jupytext:
    formats: ipynb,py:light
    text_representation:
      extension: .py
      format_name: light
      format_version: '1.5'
      jupytext_version: 1.18.1
  kernelspec:
    display_name: Python 3 (ipykernel)
    language: python
    name: python3

In [39]:
import sys, os
sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), "..")))

In [40]:
if "__file__" in globals():
    # Running from a script (e.g., src/01_bronze_ingestion.py)
    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
else:
    # Running from a notebook inside /notebooks
    project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))

In [41]:
from config.spark_config import create_spark
from pyspark.sql import functions as F, DataFrame

In [42]:
bronze_path = os.path.join(project_root, "delta", "bronze") + "/"
silver_path = os.path.join(project_root, "delta", "silver") + "/"

In [43]:
def read_bronze_tables(spark):
    """Load all Bronze Delta tables into a dictionary."""
    return {
        "orders": spark.read.format("delta").load(f"{bronze_path}olist_orders_dataset"),
        "order_items": spark.read.format("delta").load(f"{bronze_path}olist_order_items_dataset"),
        "payments": spark.read.format("delta").load(f"{bronze_path}olist_order_payments_dataset"),
        "customers": spark.read.format("delta").load(f"{bronze_path}olist_customers_dataset"),
        "products": spark.read.format("delta").load(f"{bronze_path}olist_products_dataset"),
        "sellers": spark.read.format("delta").load(f"{bronze_path}olist_sellers_dataset"),
        "reviews": spark.read.format("delta").load(f"{bronze_path}olist_order_reviews_dataset"),
        "translations": spark.read.format("delta").load(f"{bronze_path}product_category_name_translation")
    }

In [44]:
def clean_dataframe(df: DataFrame, subset_cols: list[str]) -> DataFrame:
    """Remove duplicates and null values based on subset columns."""
    return df.dropDuplicates().dropna(subset=subset_cols)

In [45]:
def add_derived_columns(orders: DataFrame, order_items: DataFrame, payments: DataFrame):
    """Add calculated columns for total price, profit margin, delivery time, and payment count."""
    order_items = (
        order_items
        .withColumn("total_price", F.col("price") + F.col("freight_value"))
        .withColumn("profit_margin", F.col("price") - F.col("freight_value"))
    )

    orders = orders.withColumn(
        "delivery_time_days",
        F.datediff(F.col("order_delivered_customer_date"), F.col("order_purchase_timestamp"))
    )

    payments_agg = (
        payments
        .groupBy("order_id")
        .agg(F.sum("payment_installments").alias("payment_count"))
    )

    return orders, order_items, payments_agg

In [46]:
def create_silver_tables(orders, customers, order_items, payments_agg, reviews, products, sellers, translations):
    """Join datasets to create enriched Silver tables."""
    silver_orders = (
        orders
        .join(customers, "customer_id", "left")
        .join(payments_agg, "order_id", "left")
        .join(reviews.select("order_id", "review_score"), "order_id", "left")
    )

    silver_order_items = (
        order_items
        .join(products, "product_id", "left")
        .join(sellers, "seller_id", "left")
        .join(translations, "product_category_name", "left")
    )

    return silver_orders, silver_order_items

In [47]:
def write_delta(df: DataFrame, name: str):
    """Write DataFrame as a Delta table to the Silver path."""
    path = f"{silver_path}{name}"
    df.write.format("delta").mode("overwrite").save(path)
    print(f" Written {name} to {path}")

In [48]:
def main():
    """Pipeline entrypoint for Silver transformation."""
    spark = create_spark("Silver transformations")
    bronze = read_bronze_tables(spark)

    orders = clean_dataframe(bronze["orders"], ["order_id"])
    order_items = clean_dataframe(bronze["order_items"], ["order_id", "price"])
    payments = clean_dataframe(bronze["payments"], ["order_id"])
    customers = clean_dataframe(bronze["customers"], ["customer_id"])
    reviews = clean_dataframe(bronze["reviews"], ["order_id"])
    products = clean_dataframe(bronze["products"], ["product_id"])
    sellers = clean_dataframe(bronze["sellers"], ["seller_id"])
    translations = clean_dataframe(bronze["translations"], ["product_category_name"])

    orders, order_items, payments_agg = add_derived_columns(orders, order_items, payments)

    silver_orders, silver_order_items = create_silver_tables(
        orders, customers, order_items, payments_agg, reviews, products, sellers, translations
    )

    write_delta(silver_orders, "orders_enriched")
    write_delta(silver_order_items, "order_items_enriched")
    orders = spark.read.format("delta").load("../delta/silver/orders_enriched")
    order_items = spark.read.format("delta").load("../delta/silver/order_items_enriched")

    common_orders = orders.join(order_items, "order_id", "inner")
    print("Matched orders count:", common_orders.count())

    orders_count = orders.count()
    items_count = order_items.count()
    print(f"Orders: {orders_count}, Items: {items_count}, Matched: {common_orders.count()}")


    spark.stop()
    print("Silver transformation completed successfully.")

In [49]:
if __name__ == "__main__":
    main()

                                                                                

 Written orders_enriched to /home/arber/lufthansa-data-project/delta/silver/orders_enriched


                                                                                

 Written order_items_enriched to /home/arber/lufthansa-data-project/delta/silver/order_items_enriched
Matched orders count: 113314
Orders: 99992, Items: 112650, Matched: 113314
Silver transformation completed successfully.
