# OpenLineage + Spark (Complex Transformations)

This notebook generates richer lineage by running a multi-stage Spark pipeline with:
- multiple source datasets
- deduplication with window functions
- multi-way joins
- derived metrics and ranking
- intermediate (silver) and final (gold) outputs


In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Configure the session under the demo's application name, then either
# attach to an already-running session or start a new one.
session_builder = SparkSession.builder.appName("openlineage-complex-demo")
spark = session_builder.getOrCreate()

# Trailing expression: the notebook displays the Spark version in use.
spark.version

In [None]:
# Build synthetic source data
# All demo data lives under one base directory, with one sub-folder per layer.
base_dir = "/home/jovyan/work/data/complex_demo"
source_dir = os.path.join(base_dir, "source")
silver_dir = os.path.join(base_dir, "silver")
gold_dir = os.path.join(base_dir, "gold")

for layer_dir in (source_dir, silver_dir, gold_dir):
    os.makedirs(layer_dir, exist_ok=True)

customers_csv = os.path.join(source_dir, "customers.csv")
orders_csv = os.path.join(source_dir, "orders.csv")
products_csv = os.path.join(source_dir, "products.csv")
fx_rates_csv = os.path.join(source_dir, "fx_rates.csv")

# Target path -> CSV lines (header first). Lines carry no trailing newline;
# one is appended to each when the file is written.
seed_files = {
    # customer_id 1 appears twice with different updated_at values.
    customers_csv: (
        "customer_id,full_name,country,segment,updated_at",
        "1,Ana Gomez,US,enterprise,2026-01-01 09:00:00",
        "1,Ana Gomez,US,enterprise,2026-01-03 11:00:00",
        "2,Leo Martins,BR,midmarket,2026-01-02 08:15:00",
        "3,Nina Kato,JP,smallbiz,2026-01-01 10:30:00",
        "4,Ivy Shaw,GB,midmarket,2026-01-04 12:45:00",
    ),
    # Order 1004 is the only row whose status is not "completed".
    orders_csv: (
        "order_id,customer_id,product_id,order_ts,quantity,unit_price,currency,discount_pct,status",
        "1001,1,P01,2026-01-03 13:10:00,2,120,USD,0.10,completed",
        "1002,2,P02,2026-01-03 13:25:00,1,300,BRL,0.00,completed",
        "1003,2,P01,2026-01-04 09:00:00,3,120,BRL,0.05,completed",
        "1004,3,P03,2026-01-04 18:42:00,1,5000,JPY,0.20,returned",
        "1005,1,P03,2026-01-05 10:05:00,1,5100,JPY,0.15,completed",
        "1006,4,P02,2026-01-05 14:20:00,4,290,GBP,0.08,completed",
    ),
    products_csv: (
        "product_id,product_name,category,is_active",
        "P01,Starter Analytics,analytics,true",
        "P02,Growth Integrations,integration,true",
        "P03,Enterprise Governance,governance,true",
    ),
    # One rate per currency, all sharing the same effective_date.
    fx_rates_csv: (
        "currency,rate_to_usd,effective_date",
        "USD,1.0,2026-01-01",
        "BRL,0.20,2026-01-01",
        "JPY,0.007,2026-01-01",
        "GBP,1.25,2026-01-01",
    ),
}

# Write (or overwrite) every seed file in declaration order.
for csv_path, csv_lines in seed_files.items():
    with open(csv_path, "w", encoding="utf-8") as out:
        out.writelines(f"{line}\n" for line in csv_lines)

# Trailing expression: the notebook displays the four source paths.
(customers_csv, orders_csv, products_csv, fx_rates_csv)

In [None]:
def _read_csv_with_header(path):
    """Load the CSV at *path* into a DataFrame, using its first line as column names."""
    return spark.read.option("header", True).csv(path)


# Load the four source files; no schema is supplied at read time.
customers_raw = _read_csv_with_header(customers_csv)
orders_raw = _read_csv_with_header(orders_csv)
products_raw = _read_csv_with_header(products_csv)
fx_raw = _read_csv_with_header(fx_rates_csv)

# Preview customers and orders without truncating cell values.
for raw_frame in (customers_raw, orders_raw):
    raw_frame.show(truncate=False)

In [None]:
# Standardize types + deduplicate customers on latest update
customer_id_as_int = F.col("customer_id").cast("int")
customers_typed = (
    customers_raw
    .withColumn("customer_id", customer_id_as_int)
    .withColumn("updated_at", F.to_timestamp("updated_at"))
)

# Rank each customer's rows newest-first and keep only the top-ranked row.
newest_first = F.col("updated_at").desc()
customer_latest_w = Window.partitionBy("customer_id").orderBy(newest_first)
customers_ranked = customers_typed.withColumn("rn", F.row_number().over(customer_latest_w))
customers_curated = customers_ranked.filter(F.col("rn") == 1).drop("rn")

# Cast the raw order columns in place, in a fixed order.
order_casts = (
    ("order_id", "int"),
    ("customer_id", "int"),
    ("quantity", "int"),
    ("unit_price", "double"),
    ("discount_pct", "double"),
)
orders_typed = orders_raw
for column_name, spark_type in order_casts:
    orders_typed = orders_typed.withColumn(column_name, F.col(column_name).cast(spark_type))

# Derived order columns; each one builds on columns defined before it.
gross_amount_expr = F.col("quantity") * F.col("unit_price")
discount_amount_expr = F.col("gross_amount") * F.col("discount_pct")
net_amount_expr = F.col("gross_amount") - F.col("discount_amount")
orders_typed = (
    orders_typed
    .withColumn("order_ts", F.to_timestamp("order_ts"))
    .withColumn("order_date", F.to_date("order_ts"))
    .withColumn("gross_amount", gross_amount_expr)
    .withColumn("discount_amount", discount_amount_expr)
    .withColumn("net_amount", net_amount_expr)
    # Only completed orders continue downstream.
    .filter(F.col("status") == F.lit("completed"))
)

products_typed = products_raw.withColumn("is_active", F.col("is_active").cast("boolean"))

rate_as_double = F.col("rate_to_usd").cast("double")
fx_typed = (
    fx_raw
    .withColumn("rate_to_usd", rate_as_double)
    .withColumn("effective_date", F.to_date("effective_date"))
)

# Trailing expression: runs two count actions and displays both results.
customers_curated.count(), orders_typed.count()

In [None]:
# Multi-way join for enriched order facts
# NOTE(review): the is_active filter runs after the left joins, so an order with
# no matching product row would be dropped as well - confirm this is intended.
# NOTE(review): fx rates are joined on currency alone; effective_date is not
# part of the join key - confirm there is only ever one rate per currency.
orders_joined = (
    orders_typed.alias("o")
    .join(customers_curated.alias("c"), on="customer_id", how="left")
    .join(products_typed.alias("p"), on="product_id", how="left")
    .join(fx_typed.alias("fx"), on="currency", how="left")
    .filter(F.col("p.is_active") == F.lit(True))
)

# Net amount converted to USD, rounded to 2 decimal places.
net_amount_usd_expr = F.round(F.col("net_amount") * F.col("rate_to_usd"), 2)

# Discounts of 15% or more are labelled "high"; everything else is "standard".
discount_bucket_expr = (
    F.when(F.col("discount_pct") >= 0.15, F.lit("high"))
    .otherwise(F.lit("standard"))
)

# Map country to a sales region; any country not listed falls through to EMEA.
region_expr = (
    F.when(F.col("country").isin("US", "CA"), F.lit("NA"))
    .when(F.col("country").isin("BR"), F.lit("LATAM"))
    .when(F.col("country").isin("JP"), F.lit("APAC"))
    .otherwise(F.lit("EMEA"))
)

orders_enriched = (
    orders_joined
    .withColumn("net_amount_usd", net_amount_usd_expr)
    .withColumn("discount_bucket", discount_bucket_expr)
    .withColumn("region", region_expr)
)

# Per-customer window ordered by order timestamp, oldest first.
oldest_first = F.col("order_ts").asc()
customer_spend_w = Window.partitionBy("customer_id").orderBy(oldest_first)

running_spend_expr = F.round(F.sum("net_amount_usd").over(customer_spend_w), 2)
order_rank_expr = F.row_number().over(customer_spend_w)
orders_scored = (
    orders_enriched
    .withColumn("customer_running_spend_usd", running_spend_expr)
    .withColumn("order_rank_for_customer", order_rank_expr)
)

# Preview a subset of the scored columns.
preview_columns = (
    "order_id",
    "customer_id",
    "region",
    "net_amount_usd",
    "customer_running_spend_usd",
)
orders_scored.select(*preview_columns).show(truncate=False)

In [None]:
# Write silver layer outputs
customers_silver_path = os.path.join(silver_dir, "customers_curated")
orders_silver_path = os.path.join(silver_dir, "orders_scored")

# Persist each silver DataFrame as Parquet, replacing any previous output.
silver_outputs = (
    (customers_curated, customers_silver_path),
    (orders_scored, orders_silver_path),
)
for silver_frame, silver_path in silver_outputs:
    silver_frame.write.mode("overwrite").parquet(silver_path)

# Trailing expression: the notebook displays both silver paths.
customers_silver_path, orders_silver_path

In [None]:
# Build gold marts
# Aggregate expressions shared by more than one mart.
total_revenue_usd = F.round(F.sum("net_amount_usd"), 2)
distinct_order_count = F.countDistinct("order_id")

# Revenue, order count and distinct customers per order date and region.
daily_region_revenue = orders_scored.groupBy("order_date", "region").agg(
    total_revenue_usd.alias("revenue_usd"),
    distinct_order_count.alias("orders"),
    F.countDistinct("customer_id").alias("active_customers"),
)

# Revenue, units sold and average discount per product, highest revenue first.
product_totals = orders_scored.groupBy("product_id", "product_name", "category").agg(
    total_revenue_usd.alias("revenue_usd"),
    F.sum("quantity").alias("units_sold"),
    F.round(F.avg("discount_pct"), 4).alias("avg_discount_pct"),
)
product_performance = product_totals.orderBy(F.col("revenue_usd").desc())

# Per-customer order count, lifetime value and most recent order timestamp.
customer_totals = orders_scored.groupBy(
    "customer_id", "full_name", "country", "segment", "region"
).agg(
    distinct_order_count.alias("orders"),
    total_revenue_usd.alias("lifetime_value_usd"),
    F.max("order_ts").alias("last_order_ts"),
)

# Tier by lifetime value: >= 2000 platinum, >= 500 gold, otherwise standard.
lifetime_value = F.col("lifetime_value_usd")
customer_tier_expr = (
    F.when(lifetime_value >= 2000, F.lit("platinum"))
    .when(lifetime_value >= 500, F.lit("gold"))
    .otherwise(F.lit("standard"))
)
customer_360 = customer_totals.withColumn("customer_tier", customer_tier_expr)

daily_region_revenue_path = os.path.join(gold_dir, "daily_region_revenue")
product_performance_path = os.path.join(gold_dir, "product_performance")
customer_360_path = os.path.join(gold_dir, "customer_360")

gold_outputs = (
    (daily_region_revenue, daily_region_revenue_path),
    (product_performance, product_performance_path),
    (customer_360, customer_360_path),
)

# Write all three marts as Parquet first, replacing any previous output...
for gold_frame, gold_path in gold_outputs:
    gold_frame.write.mode("overwrite").parquet(gold_path)

# ...then preview each one without truncating cell values.
for gold_frame, _ in gold_outputs:
    gold_frame.show(truncate=False)

In [None]:
# Group the output paths defined above by layer.
silver_paths = {
    "customers_curated": customers_silver_path,
    "orders_scored": orders_silver_path,
}
gold_paths = {
    "daily_region_revenue": daily_region_revenue_path,
    "product_performance": product_performance_path,
    "customer_360": customer_360_path,
}

# Trailing expression: the notebook displays the nested summary.
{"silver": silver_paths, "gold": gold_paths}

## What lineage you should see

You should observe a graph with multiple inputs and outputs, including:
- source files (`customers.csv`, `orders.csv`, `products.csv`, `fx_rates.csv`)
- silver datasets (`customers_curated`, `orders_scored`)
- gold datasets (`daily_region_revenue`, `product_performance`, `customer_360`)

Because the OpenLineage Spark listener is enabled in `spark-defaults.conf`, each Spark action or write emits events to your configured OpenLineage endpoint.
