# DE1 — Lab 3: Physical Representations and Batch II Costs
> Author : Badr TAJINI - Data Engineering I - ESIEE 2025-2026
---

Execute all cells. Capture plans and Spark UI evidence.

## 0. Setup and explicit schema

In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
spark = SparkSession.builder.appName("de1-lab3").getOrCreate()

clicks_schema = T.StructType([
    T.StructField("prev_title", T.StringType(), True),
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("n", T.IntegerType(), True),
    T.StructField("ts", T.TimestampType(), True),
])
dim_schema = T.StructType([
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("curr_category", T.StringType(), True),
])


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/27 19:04:25 WARN Utils: Your hostname, OrdideJustine, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/27 19:04:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/27 19:04:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1. Ingest monthly CSVs (row format baseline)

In [2]:
base = "data/"
paths = [f"{base}lab3_clicks_2025-05.csv", f"{base}lab3_clicks_2025-06.csv", f"{base}lab3_clicks_2025-07.csv"]
row_df = (spark.read.schema(clicks_schema).option("header","true").csv(paths)
            .withColumn("year", F.year("ts")).withColumn("month", F.month("ts")))
row_df.cache()
print("Rows:", row_df.count())
row_df.printSchema()
row_df.show(5, truncate=False)


[Stage 0:>                                                          (0 + 3) / 3]

Rows: 15000
root
 |-- prev_title: string (nullable = true)
 |-- curr_title: string (nullable = true)
 |-- type: string (nullable = true)
 |-- n: integer (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+-----------------------------+--------------+----+---+-------------------+----+-----+
|prev_title                   |curr_title    |type|n  |ts                 |year|month|
+-----------------------------+--------------+----+---+-------------------+----+-----+
|ETL                          |PySpark       |link|431|2025-06-01 02:57:00|2025|6    |
|Data_engineering             |Broadcast_join|link|347|2025-06-15 13:40:00|2025|6    |
|Python_(programming_language)|MapReduce     |link|39 |2025-06-07 15:14:00|2025|6    |
|ETL                          |Data_warehouse|link|401|2025-06-07 04:59:00|2025|6    |
|Python_(programming_language)|Dataframe     |link|155|2025-06-06 06:40:00|2025|6    |
+-------------------

                                                                                

In [3]:
import requests
import pandas as pd
import os
from datetime import datetime

def export_lab3_metrics(spark, run_id, query, representation="", notes="", output_csv="lab3_metrics_log.csv"):
    """
    Export selected Spark metrics to a CSV for Lab 3.
    - spark: active SparkSession
    - run_id: identifier for the run (ex: 'r1')
    - query: query name (Q1, Q2, Q3, Join)
    - representation: 'row' or 'column', or 'normal'/'broadcast' for Join
    - notes: optional comment
    - output_csv: path to CSV file
    """
    ui_url = spark.sparkContext.uiWebUrl
    app_id = spark.sparkContext.applicationId
    
    if not ui_url:
        print("No Spark UI detected. Make sure a job is running.")
        return
    
    try:
        stages_url = f"{ui_url}/api/v1/applications/{app_id}/stages"
        stages = requests.get(stages_url).json()
    except Exception as e:
        print(f"Error accessing Spark API: {e}")
        return
    
    data = []
    for s in stages:
        input_bytes = s.get("inputBytes", 0)
        shuffle_read = s.get("shuffleReadBytes", 0)
        shuffle_write = s.get("shuffleWriteBytes", 0)
        files_read = s.get("inputRecords", 0)
        
        data.append({
            "run_id": run_id,
            "query": query,
            "representation": representation,
            "files_read": files_read,
            "input_size_bytes": input_bytes,
            "shuffle_read_bytes": shuffle_read,
            "shuffle_write_bytes": shuffle_write,
            "notes": notes,
            "timestamp": datetime.now().isoformat()
        })
    
    df = pd.DataFrame(data)
    if df.empty:
        print(f"No metrics found for {query}-{representation}")
        return
    
    if os.path.exists(output_csv):
        df.to_csv(output_csv, mode="a", header=False, index=False)
    else:
        df.to_csv(output_csv, index=False)
    
    print(f"{len(df)} rows exported to {output_csv}")


### Evidence: row representation plan

In [4]:
# Query Q1: top transitions per month for 'link'
q1_row = (row_df.filter(F.col("type")=="link")
           .groupBy("year","month","prev_title","curr_title")
           .agg(F.sum("n").alias("n"))
           .orderBy(F.desc("n"))
           .limit(50))
q1_row.explain("formatted")
export_lab3_metrics(spark, "r1", "Q1", representation="row")


import pathlib, datetime as _dt
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_row.txt","w") as f:
    f.write(str(_dt.datetime.now())+"\n")
    f.write(q1_row._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_row.txt")


== Physical Plan ==
AdaptiveSparkPlan (11)
+- TakeOrderedAndProject (10)
   +- HashAggregate (9)
      +- Exchange (8)
         +- HashAggregate (7)
            +- Project (6)
               +- Filter (5)
                  +- InMemoryTableScan (1)
                        +- InMemoryRelation (2)
                              +- * Project (4)
                                 +- Scan csv  (3)


(1) InMemoryTableScan
Output [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]
Arguments: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6], [isnotnull(type#2), (type#2 = link)]

(2) InMemoryRelation
Arguments: [prev_title#0, curr_title#1, type#2, n#3, ts#4, year#6, month#7], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan csv 
Output [5]: [prev_title#0, curr_title#1, type#2, n#3, ts#4]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/data/lab3_clicks_2025-05.csv, ... 2 entries]
ReadSchema: struct<prev_title:string,cur

## 2. Column representation: Parquet with partitioning and optional sort

In [5]:
from pyspark.sql.types import IntegerType
col_base = "outputs/columnar"
# Write columnar
(row_df
 .write.mode("overwrite")
 .partitionBy("year","month")
 .parquet(f"{col_base}/clicks_parquet"))

# Re‑read columnar for fair comparison
col_df = spark.read.schema(clicks_schema.add("year",IntegerType()).add("month",IntegerType())).parquet(f"{col_base}/clicks_parquet")
col_df.cache()
print("Columnar rows:", col_df.count())


                                                                                

Columnar rows: 15000


### Evidence: column representation plan

In [6]:
q1_col = (col_df.filter(F.col("type")=="link")
           .groupBy("year","month","prev_title","curr_title")
           .agg(F.sum("n").alias("n"))
           .orderBy(F.desc("n"))
           .limit(50))
q1_col.explain("formatted")
export_lab3_metrics(spark, "r1", "Q1", representation="column")

with open("proof/plan_column.txt","w") as f:
    from datetime import datetime as _dt
    f.write(str(_dt.now())+"\n")
    f.write(q1_col._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_column.txt")


== Physical Plan ==
AdaptiveSparkPlan (11)
+- TakeOrderedAndProject (10)
   +- HashAggregate (9)
      +- Exchange (8)
         +- HashAggregate (7)
            +- Project (6)
               +- Filter (5)
                  +- InMemoryTableScan (1)
                        +- InMemoryRelation (2)
                              +- * ColumnarToRow (4)
                                 +- Scan parquet  (3)


(1) InMemoryTableScan
Output [6]: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657]
Arguments: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657], [isnotnull(type#654), (type#654 = link)]

(2) InMemoryRelation
Arguments: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan parquet 
Output [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3

## 3. Join strategy: normal vs broadcast

In [7]:
dim = spark.read.schema(dim_schema).option("header","true").csv("data/lab3_dim_curr_category.csv")
# Non‑broadcast join
j1 = (col_df.join(dim, "curr_title", "left")
      .groupBy("curr_category")
      .agg(F.sum("n").alias("total_n"))
      .orderBy(F.desc("total_n")))
j1.explain("formatted")
export_lab3_metrics(spark, "r1", "Join", representation="normal")


# Broadcast join
from pyspark.sql.functions import broadcast
j2 = (col_df.join(broadcast(dim), "curr_title", "left")
      .groupBy("curr_category")
      .agg(F.sum("n").alias("total_n"))
      .orderBy(F.desc("total_n")))
j2.explain("formatted")
export_lab3_metrics(spark, "r1", "Join", representation="broadcast")


# Save one plan for evidence
with open("proof/plan_broadcast.txt","w") as f:
    from datetime import datetime as _dt
    f.write(str(_dt.now())+"\n")
    f.write(j2._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_broadcast.txt")


== Physical Plan ==
AdaptiveSparkPlan (15)
+- Sort (14)
   +- Exchange (13)
      +- HashAggregate (12)
         +- Exchange (11)
            +- HashAggregate (10)
               +- Project (9)
                  +- BroadcastHashJoin LeftOuter BuildRight (8)
                     :- InMemoryTableScan (1)
                     :     +- InMemoryRelation (2)
                     :           +- * ColumnarToRow (4)
                     :              +- Scan parquet  (3)
                     +- BroadcastExchange (7)
                        +- Filter (6)
                           +- Scan csv  (5)


(1) InMemoryTableScan
Output [2]: [curr_title#653, n#655]
Arguments: [curr_title#653, n#655]

(2) InMemoryRelation
Arguments: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan parquet 
Output [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
Batched: true
Location: InMemoryF

## 4. Additional queries for metrics

In [8]:
# Q2: daily GMV‑like metric (sum of n) for a specific title window
q2_row = (row_df.filter((F.col("type")=="link") & F.col("curr_title").isin("Apache_Spark","PySpark"))
           .groupBy("year","month","curr_title").agg(F.sum("n").alias("n")).orderBy("year","month","curr_title"))
q2_col = (col_df.filter((F.col("type")=="link") & F.col("curr_title").isin("Apache_Spark","PySpark"))
           .groupBy("year","month","curr_title").agg(F.sum("n").alias("n")).orderBy("year","month","curr_title"))

# Trigger
_ = q2_row.count(); _ = q2_col.count()
export_lab3_metrics(spark, "r1", "Q2", representation="row")
export_lab3_metrics(spark, "r1", "Q2", representation="column")

# Q3: heavy cardinality grouping
q3_row = row_df.groupBy("prev_title","curr_title").agg(F.sum("n").alias("n")).orderBy(F.desc("n")).limit(100)
q3_col = col_df.groupBy("prev_title","curr_title").agg(F.sum("n").alias("n")).orderBy(F.desc("n")).limit(100)
_ = q3_row.count(); _ = q3_col.count()
export_lab3_metrics(spark, "r1", "Q3", representation="row")
export_lab3_metrics(spark, "r1", "Q3", representation="column")

print("Open Spark UI at http://localhost:4040 while each job runs and record metrics into lab3_metrics_log.csv")


22 rows exported to lab3_metrics_log.csv
22 rows exported to lab3_metrics_log.csv
28 rows exported to lab3_metrics_log.csv
28 rows exported to lab3_metrics_log.csv
Open Spark UI at http://localhost:4040 while each job runs and record metrics into lab3_metrics_log.csv


## 5. Save sample outputs

In [14]:
import pathlib, pandas as pd
pathlib.Path("outputs").mkdir(parents=True, exist_ok=True)
q1_row.limit(10).toPandas().to_csv("outputs/q1_row_top10.csv", index=False)
q1_col.limit(10).toPandas().to_csv("outputs/q1_col_top10.csv", index=False)
j2.limit(20).toPandas().to_csv("outputs/j2_broadcast_sample.csv", index=False)
print("Saved sample outputs in outputs")


Saved sample outputs in outputs


## 6. Cleanup

In [9]:
spark.stop()
print("Spark session stopped.")


Spark session stopped.
