Set up Spark & Delta Lake

In [None]:
from pathlib import Path
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, datediff
from pyspark.sql.functions import sum as _sum

# Build the SparkSession with Delta Lake enabled:
# - spark.sql.extensions registers Delta's SQL extension;
# - spark.sql.catalog.spark_catalog makes the default catalog Delta's catalog.
# The app name identifies this job in the Spark UI: it reads the bronze layer
# and writes the silver layer.
builder = SparkSession.builder \
    .appName("Silver Layer Transformation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip (from delta-spark) prepares the builder so the
# Delta Lake jars are available to the session; getOrCreate reuses an already
# running session in this kernel instead of starting a second one.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

Read the bronze-layer Delta tables into DataFrames

In [None]:
bronze_path = Path("../delta/bronze")
silver_path = Path("../delta/silver")


def read_bronze_table(table_name: str):
    """Load one bronze-layer Delta table by its directory name.

    Args:
        table_name: Name of the table directory under ``bronze_path``.

    Returns:
        A Spark DataFrame backed by the Delta table at ``bronze_path / table_name``.
    """
    # Build the path with Path's "/" operator, the same way silver paths are built on write.
    return spark.read.format("delta").load(str(bronze_path / table_name))


customers = read_bronze_table("olist_customers_dataset")
orders = read_bronze_table("olist_orders_dataset")
order_items = read_bronze_table("olist_order_items_dataset")
payments = read_bronze_table("olist_order_payments_dataset")
reviews = read_bronze_table("olist_order_reviews_dataset")
products = read_bronze_table("olist_products_dataset")
sellers = read_bronze_table("olist_sellers_dataset")

Remove Duplicates & Handle Nulls

In [None]:
# Cleaning rules per table. Wherever a table is deduplicated on a key column,
# nulls are dropped FIRST: dropDuplicates(subset) keeps an arbitrary row per
# key, so deduplicating first could keep an incomplete row, after which dropna
# would discard that key even though a complete duplicate row existed.

# customers: dimension table — one row per customer_id, no nulls in any column.
customers_clean = customers.dropna().dropDuplicates(["customer_id"])

# orders: deduplicated on order_id. The delivery-date columns are excluded from
# the null check because they may legitimately be empty depending on the
# order's status.
nullable_cols = ["order_delivered_carrier_date",
                 "order_delivered_customer_date"]
non_nullable_cols = [column_name for column_name in orders.columns
                     if column_name not in nullable_cols]

orders_clean = orders.dropna(subset=non_nullable_cols) \
    .dropDuplicates(["order_id"])

# order_items: remove fully identical rows and rows with a null in any column.
order_items_clean = order_items.dropna().dropDuplicates()

# payments: fact table with one or more payment records per order; remove
# fully identical rows and rows with a null in any column.
payments_clean = payments.dropna().dropDuplicates()

# reviews: some review_ids repeat across rows whose order_id differs while all
# other columns match. Since every such order_id is valid, those rows are kept;
# only fully identical rows are removed. The free-text comment columns are
# excluded from the null check, so reviews without a title or message are kept.
cols_to_check = [column_name for column_name in reviews.columns
                 if column_name not in ["review_comment_title",
                                        "review_comment_message"]]
reviews_clean = reviews.dropna(subset=cols_to_check).dropDuplicates()

# products: dimension table — one row per product_id, no nulls in any column.
products_clean = products.dropna().dropDuplicates(["product_id"])

# sellers: dimension table — one row per seller_id, no nulls in any column.
sellers_clean = sellers.dropna().dropDuplicates(["seller_id"])

Join necessary datasets to create enriched tables

In [None]:
# Get customers info and add them to order table
orders_with_customer_info = orders_clean.alias("a") \
    .join(
        customers_clean.select(
            "customer_id",
            "customer_zip_code_prefix",
            "customer_city",
            "customer_state"
        ).alias("b"),
        on=col("a.customer_id") == col("b.customer_id"),
        how="left"
).drop(col("b.customer_id"))

# Join order_items with products to get product attributes
order_items_enriched = order_items_clean.alias("oi") \
    .join(
        products_clean.select(
            "product_id",
            "product_category_name",
            "product_name_lenght",
            "product_description_lenght",
            "product_photos_qty",
            "product_weight_g",
            "product_length_cm",
            "product_height_cm",
            "product_width_cm"
        ).alias("p"),
        on=col("oi.product_id") == col("p.product_id"),
        how="left"
).drop(col("p.product_id"))

# Then join with sellers to get seller attributes
order_items_enriched = order_items_enriched.alias("oi") \
    .join(
        sellers_clean.select(
            "seller_id",
            "seller_zip_code_prefix",
            "seller_city",
            "seller_state"
        ).alias("s"),
        on=col("oi.seller_id") == col("s.seller_id"),
        how="left"
).drop(col("s.seller_id"))

Derive calculated columns

In [None]:
# Per-item price metrics. withColumn replaces an existing column of the same
# name, so these lines are safe to re-run.
# NOTE(review): "profit_margin" is computed as price minus freight_value and
# involves no cost column — confirm this is the intended definition.
order_items_enriched = order_items_enriched \
    .withColumn("total_price", col("price") + col("freight_value")) \
    .withColumn("profit_margin", col("price") - col("freight_value"))

# Delivery time in days, from purchase to delivery to the customer; null while
# order_delivered_customer_date is null.
orders_with_customer_info = orders_with_customer_info \
    .withColumn(
        "days",
        when(
            col("order_delivered_customer_date").isNotNull(),
            datediff(col("order_delivered_customer_date"),
                     col("order_purchase_timestamp"))
        )
    )

# Total payment_installments per order across all of its payment records.
# NOTE(review): summing installments over several payment rows of one order may
# not be the intended measure (e.g. max could fit better) — confirm.
payments_agg = payments_clean.groupBy("order_id") \
    .agg(_sum("payment_installments").alias("payment_installments"))

# Bring payment_installments into the orders table. Any payment_installments
# column left over from a previous run of this cell is dropped first (drop is a
# no-op when the column is absent); without this, re-running the cell would add
# a second, ambiguous payment_installments column and break the later write.
orders_with_customer_info = orders_with_customer_info \
    .drop("payment_installments") \
    .alias("o") \
    .join(
        payments_agg.alias("p"),
        on=col("o.order_id") == col("p.order_id"),
        how="left"
    ).drop(col("p.order_id"))

Write each DataFrame to the silver layer as a Delta table

In [None]:
# Destination directory (under silver_path) for each silver DataFrame, in the
# order the tables are written.
# NOTE(review): order items are written to "orders_items" (not "order_items");
# kept as-is because downstream readers may depend on this path.
silver_tables = {
    "customers": customers_clean,
    "orders": orders_with_customer_info,
    "orders_items": order_items_enriched,
    "payments": payments_clean,
    "reviews": reviews_clean,
    "products": products_clean,
    "sellers": sellers_clean,
}

for table_dir, frame in silver_tables.items():
    # mode("overwrite") replaces whatever a previous run wrote at this path.
    target = str(silver_path / table_dir)
    frame.write.format("delta").mode("overwrite").save(target)