# DE1 — Lab 2: PostgreSQL → Star Schema ETL
> Author : Badr TAJINI - Data Engineering I - ESIEE 2025-2026
---


Execute all cells in order, from top to bottom. Attach the evidence files saved under `proof/` and fill in the metrics in `lab2_metrics_log.csv`.

## 0. Setup and schemas

In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
# Single Spark session for the whole lab.
spark = (
    SparkSession.builder
    .appName("de1-lab2")
    .getOrCreate()
)

# Prefix prepended to every CSV file name in the ingestion cell (empty = bare relative file names).
base = ""

# Explicit schemas, one per operational table. Only the primary key of each
# table is declared non-nullable; every other column may be NULL.
customers_schema = (
    T.StructType()
    .add("customer_id", T.IntegerType(), False)
    .add("name", T.StringType(), True)
    .add("email", T.StringType(), True)
    .add("created_at", T.TimestampType(), True)
)

brands_schema = (
    T.StructType()
    .add("brand_id", T.IntegerType(), False)
    .add("brand_name", T.StringType(), True)
)

categories_schema = (
    T.StructType()
    .add("category_id", T.IntegerType(), False)
    .add("category_name", T.StringType(), True)
)

# brand_id / category_id are the foreign keys towards brands / categories.
products_schema = (
    T.StructType()
    .add("product_id", T.IntegerType(), False)
    .add("product_name", T.StringType(), True)
    .add("brand_id", T.IntegerType(), True)
    .add("category_id", T.IntegerType(), True)
    .add("price", T.DoubleType(), True)
)

orders_schema = (
    T.StructType()
    .add("order_id", T.IntegerType(), False)
    .add("customer_id", T.IntegerType(), True)
    .add("order_date", T.TimestampType(), True)
)

# One row per product line of an order.
order_items_schema = (
    T.StructType()
    .add("order_item_id", T.IntegerType(), False)
    .add("order_id", T.IntegerType(), True)
    .add("product_id", T.IntegerType(), True)
    .add("quantity", T.IntegerType(), True)
    .add("unit_price", T.DoubleType(), True)
)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/21 20:31:18 WARN Utils: Your hostname, Rana, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/21 20:31:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/21 20:31:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1. Ingest operational tables (from CSV exports)

In [2]:
def _read_export(file_name, schema):
    """Read one headered CSV export found at `base + file_name` with an explicit schema."""
    reader = spark.read.schema(schema).option("header", "true")
    return reader.csv(base + file_name)


customers = _read_export("lab2_customers.csv", customers_schema)
brands = _read_export("lab2_brands.csv", brands_schema)
categories = _read_export("lab2_categories.csv", categories_schema)
products = _read_export("lab2_products.csv", products_schema)
orders = _read_export("lab2_orders.csv", orders_schema)
order_items = _read_export("lab2_order_items.csv", order_items_schema)

# Print one "<table> <row count>" line per ingested table; count() is an action,
# so this is where the files are actually read.
tables = {
    "customers": customers,
    "brands": brands,
    "categories": categories,
    "products": products,
    "orders": orders,
    "order_items": order_items,
}
for name, df in tables.items():
    print(name, df.count())


                                                                                

customers 24
brands 8
categories 9
products 60
orders 220
order_items 638


### Evidence: ingestion plan

In [3]:
from datetime import datetime as _dt
import pathlib

# Distinct ids of orders that have at least one matching line item.
ingest = (
    orders
    .join(order_items, "order_id")
    .select("order_id")
    .distinct()
)
ingest.explain("formatted")

# Persist a timestamped copy of the physical plan as lab evidence.
pathlib.Path("proof").mkdir(exist_ok=True)
plan_header = str(_dt.now()) + "\n"
with open("proof/plan_ingest.txt", "w") as f:
    # _jdf is the underlying JVM DataFrame; its executedPlan() is rendered as text.
    f.write(plan_header + ingest._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_ingest.txt")


== Physical Plan ==
AdaptiveSparkPlan (11)
+- HashAggregate (10)
   +- Exchange (9)
      +- HashAggregate (8)
         +- Project (7)
            +- BroadcastHashJoin Inner BuildLeft (6)
               :- BroadcastExchange (3)
               :  +- Filter (2)
               :     +- Scan csv  (1)
               +- Filter (5)
                  +- Scan csv  (4)


(1) Scan csv 
Output [1]: [order_id#13]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/rania/OneDrive/Documents/Lab02/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>

(2) Filter
Input [1]: [order_id#13]
Condition : isnotnull(order_id#13)

(3) BroadcastExchange
Input [1]: [order_id#13]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=221]

(4) Scan csv 
Output [1]: [order_id#17]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/rania/OneDrive/Documents/Lab02/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id)]

## 2. Surrogate key function

In [4]:
def sk(cols):
    """Build a surrogate-key column by hashing one or more natural-key columns.

    Parameters
    ----------
    cols : str or iterable of str
        Name(s) of the natural-key column(s). A single name may be passed as a
        plain string; it is treated as a one-column key instead of being
        iterated character by character.

    Returns
    -------
    pyspark.sql.Column
        ``abs(xxhash64(*cols))``. The value depends only on the key values, so
        the same key yields the same surrogate key in every table.

    Raises
    ------
    ValueError
        If no column name is given.

    Notes
    -----
    * ``abs`` folds a hash ``h`` and ``-h`` onto the same key.
    * NOTE(review): Spark hash functions skip NULL inputs, so a NULL natural
      key presumably maps to a constant rather than NULL. Callers that need a
      NULL key for a NULL input must guard for it themselves.
    * NOTE(review): the ``abs`` of the smallest 64-bit value cannot be
      represented as a positive long; confirm how the Spark version in use
      handles that corner case.
    """
    if isinstance(cols, str):
        # A bare string would otherwise be unpacked into one column per character.
        cols = [cols]
    cols = list(cols)
    if not cols:
        raise ValueError("sk() needs at least one natural-key column name")
    return F.abs(F.xxhash64(*[F.col(c) for c in cols]))


## 3. Build dimensions

In [5]:
# Each dimension keeps its natural key next to the surrogate key produced by sk(),
# so rows can be traced back to the operational tables.
dim_customer = customers.select(
    sk(["customer_id"]).alias("customer_sk"),
    "customer_id", "name", "email", "created_at",
)

dim_brand = brands.select(
    sk(["brand_id"]).alias("brand_sk"),
    "brand_id", "brand_name",
)

dim_category = categories.select(
    sk(["category_id"]).alias("category_sk"),
    "category_id", "category_name",
)

# brand_id and category_id are nullable in products_schema. Spark's hash
# functions skip NULL inputs, so hashing a missing key would produce a constant,
# non-NULL foreign key that matches no row in dim_brand / dim_category.
# F.when(...) without otherwise() yields NULL, which keeps "no brand" / "no
# category" visible as a NULL foreign key instead.
dim_product = products.select(
    sk(["product_id"]).alias("product_sk"),
    "product_id", "product_name",
    F.when(F.col("brand_id").isNotNull(), sk(["brand_id"])).alias("brand_sk"),
    F.when(F.col("category_id").isNotNull(), sk(["category_id"])).alias("category_sk"),
    "price",
)


In [6]:
# Row count of every dimension, printed as "<label> <count>".
dimension_frames = [
    ("dim_customer:", dim_customer),
    ("dim_brand:", dim_brand),
    ("dim_category:", dim_category),
    ("dim_product:", dim_product),
]
for label, frame in dimension_frames:
    print(label, frame.count())



dim_customer: 24
dim_brand: 8
dim_category: 9
dim_product: 60


## 4. Build date dimension

In [7]:
from pyspark.sql import Window as W
# Distinct calendar days on which an order was placed (time of day is dropped).
dates = orders.select(F.to_date("order_date").alias("date")).distinct()

# Derive the calendar attributes column by column, then fix the final column order.
dim_date = (
    dates
    .withColumn("date_sk", sk(["date"]))
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .withColumn("day", F.dayofmonth("date"))
    .withColumn("dow", F.date_format("date", "E"))  # "E" = abbreviated day-of-week name
    .select("date_sk", "date", "year", "month", "day", "dow")
)


## 5. Build fact_sales with broadcast joins where appropriate

In [10]:
# `F` is imported in the setup cell and `sk` is defined in section 2. Both are
# reused here instead of being redefined, so the dimensions and the fact table
# always derive their keys from one and the same function.
import pathlib
from datetime import datetime as _dt

# --- Aliases: keep the same-named key columns of the four tables unambiguous ---
# products and customers are lookup tables, so they carry an explicit broadcast
# hint instead of relying on spark.sql.autoBroadcastJoinThreshold.
oi = order_items.alias("oi")
p = F.broadcast(products).alias("p")
o = orders.alias("o")
c = F.broadcast(customers).alias("c")

# --- Joins ---
# Inner joins: a line item is kept only if its product, its order, and the
# order's customer all exist.
fact_joined = (
    oi
    .join(p, F.col("oi.product_id") == F.col("p.product_id"), "inner")
    .join(o, F.col("oi.order_id") == F.col("o.order_id"), "inner")
    .join(c, F.col("o.customer_id") == F.col("c.customer_id"), "inner")
)

# --- Keep only the needed columns, under non-ambiguous names ---
# product_id is taken from order_items, customer_id and order_id from orders.
fact_natural = fact_joined.select(
    F.col("o.order_id").alias("order_id"),
    F.col("o.customer_id").alias("customer_id"),
    F.col("oi.product_id").alias("product_id"),
    F.to_date(F.col("o.order_date")).alias("date"),
    F.col("oi.quantity").alias("quantity"),
    F.col("oi.unit_price").alias("unit_price"),
)

# --- Surrogate keys + measures ---
# quantity (int) and unit_price (double) are already typed by order_items_schema,
# so no cast is needed. year/month are the partition columns used when writing.
df_fact = fact_natural.select(
    "order_id",
    sk(["date"]).alias("date_sk"),
    sk(["customer_id"]).alias("customer_sk"),
    sk(["product_id"]).alias("product_sk"),
    "quantity",
    "unit_price",
    (F.col("quantity") * F.col("unit_price")).alias("subtotal"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
)

# --- Explain + save plan (required evidence) ---
df_fact.explain("formatted")

# Create the evidence folder here as well, so this cell does not depend on the
# ingestion-plan cell having been run first.
proof_dir = pathlib.Path("proof")
proof_dir.mkdir(exist_ok=True)
with open(proof_dir / "plan_fact_join.txt", "w", encoding="utf-8") as f:
    f.write(str(_dt.now()) + "\n")
    f.write(df_fact._jdf.queryExecution().executedPlan().toString())

print("Saved proof/plan_fact_join.txt")



== Physical Plan ==
AdaptiveSparkPlan (19)
+- Project (18)
   +- Project (17)
      +- BroadcastHashJoin Inner BuildRight (16)
         :- Project (12)
         :  +- BroadcastHashJoin Inner BuildRight (11)
         :     :- Project (7)
         :     :  +- BroadcastHashJoin Inner BuildRight (6)
         :     :     :- Filter (2)
         :     :     :  +- Scan csv  (1)
         :     :     +- BroadcastExchange (5)
         :     :        +- Filter (4)
         :     :           +- Scan csv  (3)
         :     +- BroadcastExchange (10)
         :        +- Filter (9)
         :           +- Scan csv  (8)
         +- BroadcastExchange (15)
            +- Filter (14)
               +- Scan csv  (13)


(1) Scan csv 
Output [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/rania/OneDrive/Documents/Lab02/lab2_order_items.csv]
PushedFilters: [IsNotNull(product_id), IsNotNull(order_id)]
ReadSchema: struct<order_id:int,p

In [11]:
# Number of rows in the fact table (one per order line that survived the inner joins).
# count() is an action, so this executes the join plan built above.
df_fact.count()


638

## 6. Write Parquet outputs (partitioned by year, month)

In [12]:
base_out = "outputs/lab2"

# Dimensions: one unpartitioned Parquet dataset each, replaced on every run.
dimension_outputs = {
    "dim_customer": dim_customer,
    "dim_brand": dim_brand,
    "dim_category": dim_category,
    "dim_product": dim_product,
    "dim_date": dim_date,
}
for table_name, table_df in dimension_outputs.items():
    table_df.write.mode("overwrite").parquet(f"{base_out}/{table_name}")

# Fact table: partitioned on disk by the year and month columns.
fact_writer = df_fact.write.mode("overwrite").partitionBy("year", "month")
fact_writer.parquet(f"{base_out}/fact_sales")

print("Parquet written under outputs/lab2/")


                                                                                

Parquet written under outputs/lab2/


## 7. Plan comparison: projection and layout

In [13]:
# Case A: join and then project
# Case A: join the full tables first, and let the aggregation pick the columns it needs.
joined_full = orders.join(order_items, "order_id").join(products, "product_id")
gmv_a = F.sum(F.col("quantity") * F.col("price")).alias("gmv")

# Daily GMV = sum(quantity * catalog price), grouped by order day.
a = joined_full.groupBy(F.to_date("order_date").alias("d")).agg(gmv_a)
a.explain("formatted")

# Run the query so that a job appears in the Spark UI; the count itself is discarded.
_ = a.count()




== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
   +- Exchange (14)
      +- HashAggregate (13)
         +- Project (12)
            +- BroadcastHashJoin Inner BuildRight (11)
               :- Project (7)
               :  +- BroadcastHashJoin Inner BuildLeft (6)
               :     :- BroadcastExchange (3)
               :     :  +- Filter (2)
               :     :     +- Scan csv  (1)
               :     +- Filter (5)
               :        +- Scan csv  (4)
               +- BroadcastExchange (10)
                  +- Filter (9)
                     +- Scan csv  (8)


(1) Scan csv 
Output [2]: [order_id#13, order_date#15]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/rania/OneDrive/Documents/Lab02/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>

(2) Filter
Input [2]: [order_id#13, order_date#15]
Condition : isnotnull(order_id#13)

(3) BroadcastExchange
Input [2]: [order_id#13, order_date

In [14]:
# Case B: project early
# Case B: trim each table to the columns the query needs before joining.
orders_slim = orders.select("order_id", "order_date")
items_slim = order_items.select("order_id", "product_id", "quantity")
products_slim = products.select("product_id", "price")

joined_slim = orders_slim.join(items_slim, "order_id").join(products_slim, "product_id")
gmv_b = F.sum(F.col("quantity") * F.col("price")).alias("gmv")

# Same daily GMV aggregation as case A, on the pre-projected inputs.
b = joined_slim.groupBy(F.to_date("order_date").alias("d")).agg(gmv_b)
b.explain("formatted")

# Run the query so that a job appears in the Spark UI; the count itself is discarded.
_ = b.count()

print("Record Spark UI metrics for both runs in lab2_metrics_log.csv")

== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
   +- Exchange (14)
      +- HashAggregate (13)
         +- Project (12)
            +- BroadcastHashJoin Inner BuildRight (11)
               :- Project (7)
               :  +- BroadcastHashJoin Inner BuildLeft (6)
               :     :- BroadcastExchange (3)
               :     :  +- Filter (2)
               :     :     +- Scan csv  (1)
               :     +- Filter (5)
               :        +- Scan csv  (4)
               +- BroadcastExchange (10)
                  +- Filter (9)
                     +- Scan csv  (8)


(1) Scan csv 
Output [2]: [order_id#13, order_date#15]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/rania/OneDrive/Documents/Lab02/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>

(2) Filter
Input [2]: [order_id#13, order_date#15]
Condition : isnotnull(order_id#13)

(3) BroadcastExchange
Input [2]: [order_id#13, order_date

## 8. Cleanup

In [15]:
# Shut down the Spark session created in the setup cell; any later cell that
# uses `spark` needs the setup cell to be run again first.
spark.stop()
print("Spark session stopped.")


Spark session stopped.
