# ESIEE Paris — Data Engineering I — Assignment 2
> Author : Couzinet Lorenzo & Rabahi Enzo 

**Academic year:** 2025–2026  
**Program:** Data & Applications - Engineering - (FD)   
**Course:** Data Engineering I  

---


## Data inputs
Define your input paths. Use small CSV/JSON/Parquet files so the notebook runs locally. If your dataset requires credentials, create a **sample subset** and commit only that.

**Paths to set:**
- `SOURCE_A_PATH` (fact‑like dataset)
- `SOURCE_B_PATH` (dimension‑like dataset)
- `OUTPUT_BASE` (directory for Parquet output)


In [40]:
# Read carefully the helper to review what is missing here

SOURCE_A_PATH = 'events.csv'
SOURCE_B_PATH = 'product.csv'
OUTPUT_BASE   = 'output'

print(f"Source A: {SOURCE_A_PATH}")
print(f"Source B: {SOURCE_B_PATH}")
print(f"Output:   {OUTPUT_BASE}")


Source A: events.csv
Source B: product.csv
Output:   output


## Pipeline API (implementations required)
Implement the following functions. Keep signatures stable. Use explicit schemas when possible. Log counts at each stage.


In [41]:
from typing import Optional, Tuple
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T

def ingest(spark, path_a: str, path_b: str) -> Tuple[DataFrame, DataFrame]:
    """Load SOURCE_A and SOURCE_B. Apply explicit schemas where possible.
    Return two DataFrames with uniform column naming.
    """
    # write some code here
    df_a = spark.read.option("header", "true").option("inferSchema", "true").csv(path_a)
    df_b = spark.read.option("header", "true").option("inferSchema", "true").csv(path_b)

    print(f"Ingested A: {df_a.count()} rows")
    print(f"Ingested B: {df_b.count()} rows")

    return df_a, df_b

def transform(df_a: DataFrame, df_b: DataFrame) -> DataFrame:
    """Clean, deduplicate, and normalize. Add parsed timestamps.
    Drop obvious null records. Prepare keys for join.
    """
    # write some code here
    df_clean = df_a.dropDuplicates()
    
    if "event_time" in df_clean.columns:
        df_clean = df_clean.withColumn("event_time", F.to_timestamp("event_time"))
        df_clean = df_clean.withColumn("date", F.to_date("event_time"))
    
    required_cols = [c for c in ["session_id", "product_id"] if c in df_clean.columns]
    df_clean = df_clean.dropna(subset=required_cols)
    
    return df_clean

def join_and_aggregate(df: DataFrame, dim: DataFrame) -> DataFrame:
    """Join with dim table. Handle potential skew (hint: salting or AQE).
    Compute business aggregates with window or groupBy.
    """
    # write some code here
    common_cols = list(set(df.columns) & set(dim.columns))
    
    if not common_cols:
        print("Warning: No common columns found for join.")
        return df
        
    join_col = common_cols[0] 
    
    df_joined = df.join(F.broadcast(dim), on=join_col, how="inner")
    
    if "brand" in df_joined.columns and "price" in df_joined.columns and "date" in df_joined.columns:
        res = (df_joined
               .groupBy("date", "brand")
               .agg(
                   F.count("*").alias("total_events"),
                   F.sum("price").alias("total_sales")
               )
               .orderBy("date", F.desc("total_sales"))
              )
        return res
    
    return df_joined

def write_out(df: DataFrame, base: str, partitions: list[str]) -> None:
    """Write Parquet, overwrite mode, partitioned by `partitions`.
    Optimize small files if needed (coalesce).
    """
    # write some code here
    writer = df.coalesce(1).write.mode("overwrite")
    
    valid_partitions = [p for p in partitions if p in df.columns]
    if valid_partitions:
        writer = writer.partitionBy(*valid_partitions)
        
    writer.parquet(base)
    print(f"Data written to {base} partitioned by {valid_partitions}")


## Tasks
1. **Ingest**: read `SOURCE_A_PATH`, `SOURCE_B_PATH`. Provide explicit schemas. Count rows and malformed records.
2. **Transform**: standardize column names, cast types, parse timestamps into UTC, deduplicate using keys.
3. **Join + Aggregate**: explain your join strategy. Mitigate skew. Produce a tidy table with daily metrics.
4. **Store**: write partitioned Parquet to `OUTPUT_BASE`, e.g., partition by `date` and one categorical column.
5. **Explain plans**: capture `df.explain(mode='formatted')` for transform, join, and final write.
6. **Quality gates**: implement three checks (row count non‑zero, null rate thresholds, referential coverage). Abort if a gate fails.
7. **Reproducibility**: document your Spark config and any seeds. Describe how to re‑run.


In [42]:
from pyspark.sql import SparkSession
import findspark
# Orchestration
## Replace raises with your implementation, then run this driver.
if spark is not None:
    # write some code here
    # df_a, df_b = ingest(spark, SOURCE_A_PATH, SOURCE_B_PATH)
    # stg = transform(df_a, df_b)
    # out = join_and_aggregate(stg, df_b)
    # print('Final count:', out.count())
    # print('Plan:')
    # out.explain(mode='formatted')

    if SOURCE_A_PATH and SOURCE_B_PATH:
        print("Starting Pipeline execution...")
        
        df_a, df_b = ingest(spark, SOURCE_A_PATH, SOURCE_B_PATH)
        
        stg = transform(df_a, df_b)
        
        out = join_and_aggregate(stg, df_b)
        
        print('Final count (Aggregated rows):', out.count())
        print('Plan:')
        out.explain(mode='formatted')
        
        write_out(out, OUTPUT_BASE, partitions=["date"])
    
else:
    print("Les fichiers sources n'existent pas encore.")
    print("Exécuter la section '3. The Extract in ETL' plus bas dans le notebook")
    print("pour extraire les CSV depuis PostgreSQL, puis revenir exécuter cette cellule.")


Starting Pipeline execution...


Py4JJavaError: An error occurred while calling o1316.csv.
: org.apache.spark.SparkException: [INTERNAL_ERROR] The "head" action failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:643)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:656)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
	at org.apache.spark.sql.classic.Dataset.head(Dataset.scala:1379)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2810)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:121)
	at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:72)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:63)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:219)
	at scala.Option.orElse(Option.scala:477)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:216)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:422)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
	at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
	at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
	at scala.collection.immutable.List.foldLeft(List.scala:79)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80)
	at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$1(Dataset.scala:115)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:113)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:109)
	at org.apache.spark.sql.classic.DataFrameReader.load(DataFrameReader.scala:58)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:392)
	at org.apache.spark.sql.classic.DataFrameReader.csv(DataFrameReader.scala:259)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:833)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
		at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:643)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:656)
		at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
		at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
		at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
		at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
		at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
		at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
		at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
		at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
		at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
		at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
		at org.apache.spark.sql.classic.Dataset.head(Dataset.scala:1379)
		at org.apache.spark.sql.Dataset.take(Dataset.scala:2810)
		at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:121)
		at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:72)
		at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:63)
		at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:219)
		at scala.Option.orElse(Option.scala:477)
		at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:216)
		at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:422)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
		at scala.Option.getOrElse(Option.scala:201)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
		at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
		at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
		at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
		at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
		at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
		at scala.collection.immutable.List.foldLeft(List.scala:79)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
		at scala.collection.immutable.List.foreach(List.scala:334)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
		at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
		at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
		at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
		at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
		at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
		at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
		at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
		at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
		at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
		at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
		at scala.util.Try$.apply(Try.scala:217)
		at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
		at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
		at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
		... 22 more
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.classic.SparkSession.sparkContext()" because the return value of "org.apache.spark.sql.execution.SparkPlan.session()" is null
	at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:68)
	at org.apache.spark.sql.execution.CollectLimitExec.readMetrics$lzycompute(limit.scala:68)
	at org.apache.spark.sql.execution.CollectLimitExec.readMetrics(limit.scala:67)
	at org.apache.spark.sql.execution.CollectLimitExec.metrics$lzycompute(limit.scala:69)
	at org.apache.spark.sql.execution.CollectLimitExec.metrics(limit.scala:69)
	at org.apache.spark.sql.execution.SparkPlan.resetMetrics(SparkPlan.scala:147)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2233)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
	at org.apache.spark.sql.classic.Dataset.head(Dataset.scala:1379)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2810)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:121)
	at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:72)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:63)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:219)
	at scala.Option.orElse(Option.scala:477)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:216)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:422)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:61)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:112)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:45)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:43)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
	at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
	at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
	at scala.collection.immutable.List.foldLeft(List.scala:79)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:231)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:231)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:340)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:234)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:336)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:299)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:201)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:190)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)
	at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
	at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
	... 22 more


# Assignment 2: ETL


## 1. Querying the Operational Database

Let's run a query to verify that the operational database has been properly restored and that we can issue a query to PostgreSQL:

In [49]:
!brew services stop postgresql@17

Stopping `postgresql@17`... (might take a while)
[34m==>[0m [1mSuccessfully stopped `postgresql@17` (label: homebrew.mxcl.postgresql@17)[0m


In [73]:
!brew services start postgresql@17

[34m==>[0m [1mSuccessfully started `postgresql@17` (label: homebrew.mxcl.postgresql@17)[0m


In [74]:
!/opt/homebrew/opt/postgresql@17/bin/dropdb --if-exists esiee_full
!/opt/homebrew/opt/postgresql@17/bin/createdb esiee_full
!/opt/homebrew/opt/postgresql@17/bin/pg_restore   -d esiee_full   --jobs 4 --clean --if-exists   --no-owner --no-acl   retail_schema_20250826.dump

In [None]:
# First cell won't run
# !PGPORT=5433 psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT COUNT(DISTINCT user_id) AS number_users FROM retail.user;"

# Solution 1
# !sudo -u postgres env PGPORT=5433 psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT COUNT(DISTINCT user_id) AS number_users FROM retail.user;"

# Solution 2
# 1- Run this in your terminal to create a read-only user for the database :
!/opt/homebrew/opt/postgresql@17/bin/psql -d esiee_full -p 5432 -c "CREATE ROLE esiee_reader LOGIN PASSWORD 'azerty123'; GRANT CONNECT ON DATABASE esiee_full TO esiee_reader; GRANT USAGE ON SCHEMA retail TO esiee_reader; GRANT SELECT ON ALL TABLES IN SCHEMA retail TO esiee_reader; ALTER DEFAULT PRIVILEGES IN SCHEMA retail GRANT SELECT ON TABLES TO esiee_reader;"

CREATE ROLE
GRANT
GRANT
GRANT
ALTER DEFAULT PRIVILEGES


In [86]:
# 2- Then, in your notebook, set the environment variables like this:
import os
os.environ['PGHOST'] = '127.0.0.1'   # force TCP (not Unix socket)
os.environ['PGPORT'] = '5432'
os.environ['PGUSER'] = 'esiee_reader'
os.environ['PGPASSWORD'] = 'azerty123'

In [89]:
# 3- then
!psql -d esiee_full -c 'SELECT COUNT(DISTINCT user_id) AS number_users FROM retail."user";'

 number_users 
--------------
      3022290
(1 row)



**The correct answer should be 3022290.**

If running the cell above gives you the same answer, the everything should be in order.

If you're getting an error, fix it before moving on.

In [92]:
!psql -d esiee_full -c "select count(*) from retail.events"

  count   
----------
 42418541
(1 row)



In [90]:
!psql -d esiee_full -c "\dt retail.*"

            List of relations
 Schema |     Name     | Type  |  Owner  
--------+--------------+-------+---------
 retail | brand        | table | lorenzo
 retail | category     | table | lorenzo
 retail | events       | table | lorenzo
 retail | product      | table | lorenzo
 retail | product_name | table | lorenzo
 retail | session      | table | lorenzo
 retail | user         | table | lorenzo
(7 rows)



In [91]:
!psql -d esiee_full -c "select * from retail.events limit 10"

       event_time       | event_type |              session_id              | product_id |  price   
------------------------+------------+--------------------------------------+------------+----------
 2019-10-01 04:17:42+02 | view       | 521147bb-459e-4874-8153-3ab2fa290ad3 |   13700159 |   470.85
 2019-10-01 04:17:42+02 | view       | 34253b2e-b8eb-4e0a-b758-f376a41e96df |    4803710 |    25.71
 2019-10-01 04:17:42+02 | view       | dfc31a56-7292-4f70-b098-d0a2812fbdc6 |   13700226 | 18173.00
 2019-10-01 04:17:42+02 | view       | 4d74b49e-3838-43cc-ac2e-a0490942b464 |   13103842 |   391.26
 2019-10-01 04:17:42+02 | view       | 92d24e81-856a-4157-8689-d38a4a8210bf |    1004669 |    74.39
 2019-10-01 04:17:43+02 | view       | d5de76ec-7ef2-4d78-bf16-220f2ca3fcdb |    1004247 |   809.72
 2019-10-01 04:17:43+02 | view       | fd492d02-c6d6-4575-a967-e7136853eb0a |   18000964 |    19.30
 2019-10-01 04:17:43+02 | view       | d4fbe676-9a7e-48d0-9c38-a8e54a7408f5 |    9101345 |    72.1

As a warmup exercise, write SQL queries against the operational database to answer the following questions and report the answers.
Place both your SQL queries and answers in the following cell, replacing the placeholder texts that exist there.
Each question needs to be answered by a _single_ SQL query (that is, it is not acceptable to run multiple SQL queries and then compute the answer yourself).

1. For `session_id` `789d3699-028e-4367-b515-b82e2cb5225f`, what was the purchase price?
2. How many products are sold by the brand "sokolov"?
3. What is the average purchase price of items purchased from the brand "febest"?
4. What is average number of events per user? (Report answer to two digits after the decimal point, i.e., XX.XX)

**write some code here**

// qcell_1b76x2 (keep this id for tracking purposes)

**Q1 SQL:**

SELECT price
FROM retail.events 
WHERE session_id = '789d3699-028e-4367-b515-b82e2cb5225f' 
AND event_type = 'purchase';

**Q1 answer:**

 100.39

**Q2 SQL:**

SELECT COUNT(DISTINCT e.product_id) 
FROM retail.events e
JOIN retail.product p ON e.product_id= p.product_id
WHERE brand = 'sokolov';

**Q2 answer:**

1601

**Q3 SQL:**

SELECT AVG(e.price) 
FROM retail.events e
JOIN retail.product p ON e.product_id= p.product_id
WHERE p.brand = 'febest' 
AND e.event_type = 'purchase';

**Q3 answer:**

20.3934782608695652

**Q4 SQL:**

SELECT ROUND(COUNT(*)::numeric / COUNT(DISTINCT session_id), 2) 
FROM retail.events;  

**Q4 answer:**

4.59


In [93]:
# Q1 answer :

!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT price \
FROM retail.events \
WHERE session_id = '789d3699-028e-4367-b515-b82e2cb5225f' \
AND event_type = 'purchase';"

# or

!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT * \
FROM retail.events \
WHERE session_id = '789d3699-028e-4367-b515-b82e2cb5225f' \
AND event_type = 'purchase';"

 price  
--------
 100.39
(1 row)

       event_time       | event_type |              session_id              | product_id | price  
------------------------+------------+--------------------------------------+------------+--------
 2019-10-01 04:46:44+02 | purchase   | 789d3699-028e-4367-b515-b82e2cb5225f |   28401176 | 100.39
(1 row)



In [99]:
# Q2 answer :

!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT COUNT(DISTINCT e.product_id) \
FROM retail.events e\
JOIN retail.product p ON e.product_id= p.product_id\
WHERE brand = 'sokolov' \
;"

 count 
-------
  1601
(1 row)



In [100]:
# Q3 answer :

!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT AVG(e.price) \
FROM retail.events e\
JOIN retail.product p ON e.product_id= p.product_id\
WHERE p.brand = 'febest' \
AND e.event_type = 'purchase' \
;"

         avg         
---------------------
 20.3934782608695652
(1 row)



In [101]:
# Q4 answer :

!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT ROUND(COUNT(*)::numeric / COUNT(DISTINCT session_id), 2) \
FROM retail.events\
;"   

 round 
-------
  4.59
(1 row)



## 2. Setup

The following cell contains setup to measure wall clock time and memory usage. (Don't worry about the details, just run the cell)

In [2]:
# !brew install numpy pandas pyarrow matplotlib scipy
import sys, subprocess
try:
    import psutil  # noqa: F401
except Exception:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"])
print("psutil is installed.")


from IPython.core.magic import register_cell_magic
import time, os, platform

# Try to import optional modules
try:
    import psutil
except Exception:
    psutil = None

try:
    import resource  # not available on Windows
except Exception:
    resource = None


def _rss_bytes():
    """Resident Set Size in bytes (cross-platform via psutil if available)."""
    if psutil is not None:
        return psutil.Process(os.getpid()).memory_info().rss
    # Fallback: unknown RSS → 0 
    return 0


def _peak_bytes():
    """
    Best-effort peak memory in bytes.
    - Windows: psutil peak working set (peak_wset)
    - Linux:   resource.ru_maxrss (KB → bytes)
    - macOS:   resource.ru_maxrss (bytes)
    Fallback to current RSS if unavailable.
    """
    sysname = platform.system()

    # Windows path: use psutil peak_wset if present
    if sysname == "Windows" and psutil is not None:
        mi = psutil.Process(os.getpid()).memory_info()
        peak = getattr(mi, "peak_wset", None)  # should be available on Windows
        if peak is not None:
            return int(peak)
        return int(mi.rss)

    # POSIX path: resource may be available
    if resource is not None:
        try:
            ru = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            # On Linux ru_maxrss is in kilobytes; on macOS/BSD it is bytes
            if sysname == "Linux":
                return int(ru) * 1024
            else:
                return int(ru)
        except Exception:
            pass

    # Last resort
    return _rss_bytes()


@register_cell_magic
def timemem(line, cell):
    """
    Measure wall time and memory around the execution of this cell.

        %%timemem
        <your code>

    Notes:
    - RSS = resident memory after the cell.
    - Peak is OS-dependent (see _peak_bytes docstring).
    """
    ip = get_ipython()

    rss_before  = _rss_bytes()
    peak_before = _peak_bytes()
    t0 = time.perf_counter()

    # Execute the cell body
    result = ip.run_cell(cell)

    t1 = time.perf_counter()
    rss_after  = _rss_bytes()
    peak_after = _peak_bytes()

    wall = t1 - t0
    rss_delta_mb  = (rss_after  - rss_before)  / (1024 * 1024)
    peak_delta_mb = (peak_after - peak_before) / (1024 * 1024)

    print("======================================")
    print(f"Wall time: {wall:.3f} s")
    print(f"RSS Δ: {rss_delta_mb:+.2f} MB")
    print(f"Peak memory Δ: {peak_delta_mb:+.2f} MB (OS-dependent)")
    print("======================================")

    return result

psutil is installed.


## 3. The "Extract" in ETL

The operational database comprises the tables described in the helper.

For the "Extract" in ETL, we're going to extract the following CSV files, each corresponding to a table in the operational database:

- **user.csv**: `user_id, gender, birthdate`
- **session.csv**: `session_id, user_id`
- **product.csv**: `product_id, brand, category, product_name`
- **product_name.csv**: `category, product_name, description`
- **events.csv**: `event_time, event_type, session_id, product_id, price`
- **category.csv**: `category, description`
- **brand.csv**: `brand, description`

From these files, we'll build a data warehouse organized in a standard star schema that has the following tables:

- Dimension tables: `dim_user`, `dim_age`, `dim_brand`, `dim_category`, `dim_product`, `dim_date`, `dim_session`
- The main fact table `fact_events` with foreign keys: `date_key, user_key, age_key, product_key, brand_key, category_key, session_key`


Let's specify a "base directory":

In [5]:
# Change to path on your local machine.
BASE_DIR = "/Users/lorenzo/Documents/Cours/E4FD/Data-Engineering/Lab2/lab2Assignment"

These are the commands that perform the "extraction":

In [None]:
!psql esiee_full -v ON_ERROR_STOP=1 -c '\copy "retail"."user"         TO '\''{BASE_DIR}/user.csv'\''         WITH (FORMAT csv, HEADER true)'
!psql esiee_full -v ON_ERROR_STOP=1 -c '\copy "retail"."session"      TO '\''{BASE_DIR}/session.csv'\''      WITH (FORMAT csv, HEADER true)'
!psql esiee_full -v ON_ERROR_STOP=1 -c '\copy "retail"."category"     TO '\''{BASE_DIR}/category.csv'\''     WITH (FORMAT csv, HEADER true)'
!psql esiee_full -v ON_ERROR_STOP=1 -c '\copy "retail"."brand"        TO '\''{BASE_DIR}/brand.csv'\''        WITH (FORMAT csv, HEADER true)'
!psql esiee_full -v ON_ERROR_STOP=1 -c '\copy "retail"."product_name" TO '\''{BASE_DIR}/product_name.csv'\'' WITH (FORMAT csv, HEADER true)'
!psql esiee_full -v ON_ERROR_STOP=1 -c '\copy "retail"."product"      TO '\''{BASE_DIR}/product.csv'\''      WITH (FORMAT csv, HEADER true)'
!psql esiee_full -v ON_ERROR_STOP=1 -c '\copy "retail"."events"       TO '\''{BASE_DIR}/events.csv'\''       WITH (FORMAT csv, HEADER true)'

COPY 3022290
COPY 9244421
COPY 13
COPY 3444
COPY 127
COPY 166794
COPY 42418541


(Note that the quote style above will _not_ work for Windows machines. Please adjust accordingly.)

After the extraction, you should have 7 CSV files, each corresponding to a table in the operational database.

The CSV files should be stored in `BASE_DIR`.

The following code snippet should "just work" to initialize Spark.

In [3]:
import findspark, os, sys

# Change to path on your local machine.
os.environ["SPARK_HOME"] = "/Users/lorenzo/miniconda3/envs/de1-env/lib/python3.11/site-packages/pyspark"
findspark.init()

import os
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col

py = sys.executable  # the Python of this notebook (e.g., .../envs/yourenv/bin/python)
os.environ["PYSPARK_DRIVER_PYTHON"] = py
os.environ["PYSPARK_PYTHON"] = py

spark = SparkSession.getActiveSession() or (
    SparkSession.builder
    .appName("A2")
    .master("local[*]")
    .config("spark.driver.memory", "8g")           # or 12g+
    .config("spark.sql.shuffle.partitions","400")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.pyspark.driver.python", py)
    .config("spark.pyspark.python", py)
    .config("spark.executorEnv.PYSPARK_PYTHON", py)
    .getOrCreate()
)

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/29 15:36:59 WARN Utils: Your hostname, MacBook-Air-de-Lorenzo.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.58 instead (on interface en0)
25/12/29 15:36:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/29 15:37:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


At this point, Spark should be initialized.

Let's then load in CSV files into DataFrames.

write some code here

In [6]:
%%timemem
# codecell_30z8le (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.
# Chargement des DataFrames
df_user = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{BASE_DIR}/user.csv")
df_session = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{BASE_DIR}/session.csv")
df_product = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{BASE_DIR}/product.csv")
df_product_name = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{BASE_DIR}/product_name.csv")
df_events = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{BASE_DIR}/events.csv")
df_category = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{BASE_DIR}/category.csv")
df_brand = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{BASE_DIR}/brand.csv")

# By the time we get to here, we've loaded each of the CSV files into a corresponding dataframe.
# Let's count the number of records in each:

print(f"user: {df_user.count()}")
print(f"session: {df_session.count()}")
print(f"product: {df_product.count()}")
print(f"product_name: {df_product_name.count()}")
print(f"events: {df_events.count()}")
print(f"category: {df_category.count()}")
print(f"brand: {df_brand.count()}")

                                                                                

user: 3022290
session: 9244421
product: 166794
product_name: 127


                                                                                

events: 42418541
category: 13
brand: 3444
Wall time: 18.115 s
RSS Δ: +0.27 MB
Peak memory Δ: +0.27 MB (OS-dependent)


<ExecutionResult object at 1070d0410, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 1070d0510, raw_cell="# codecell_30z8le (keep this id for tracking purpo.." transformed_cell="# codecell_30z8le (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=None>

How do you know if you've done everything correctly?

Well, issue the SQL query `select count(*) from retail.user;` to count the number of rows in the `user` table in the operational database.
It should match the output of `df_user.count()`; same for the other tables.
If the counts match, then you know everything is in order.

In [7]:
%%bash
# Ce script interroge la base de données pour compter les lignes de chaque table et vérifier si elles correspondent aux DataFrames chargés.
psql esiee_full -c "SELECT 'user' as table_name, count(*) FROM retail.user;"
psql esiee_full -c "SELECT 'session' as table_name, count(*) FROM retail.session;"
psql esiee_full -c "SELECT 'product' as table_name, count(*) FROM retail.product;"
psql esiee_full -c "SELECT 'product_name' as table_name, count(*) FROM retail.product_name;"
psql esiee_full -c "SELECT 'events' as table_name, count(*) FROM retail.events;"
psql esiee_full -c "SELECT 'category' as table_name, count(*) FROM retail.category;"
psql esiee_full -c "SELECT 'brand' as table_name, count(*) FROM retail.brand;"

 table_name |  count  
------------+---------
 user       | 3022290
(1 row)

 table_name |  count  
------------+---------
 session    | 9244421
(1 row)

 table_name | count  
------------+--------
 product    | 166794
(1 row)

  table_name  | count 
--------------+-------
 product_name |   127
(1 row)

 table_name |  count   
------------+----------
 events     | 42418541
(1 row)

 table_name | count 
------------+-------
 category   |    13
(1 row)

 table_name | count 
------------+-------
 brand      |  3444
(1 row)



## 4. Build the Dimensions Tables

### 4.1 The `user` Dimension Table

Build the `dim_user` dimension table.
This table should include `user_key`, `user_id`, `gender`, `birthdate`, and `generation`. 

Set `generation` to one of the following values based on the birth year: 
- "Traditionalists": born 1925 to 1945
- "Boomers": born 1946 to 1964
- "GenX": born 1965 to 1980
- "Millennials": born 1981 to 2000
- "GenZ": born 2001 to 2020

**write some code here**

In [8]:
%%timemem
# codecell_41ax14 (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.

from pyspark.sql.functions import col, year, when, monotonically_increasing_id

# 1. Définition de la logique pour les générations
generation_expr = (
    when((year(col("birthdate")).between(1925, 1945)), "Traditionalists")
    .when((year(col("birthdate")).between(1946, 1964)), "Boomers")
    .when((year(col("birthdate")).between(1965, 1980)), "GenX")
    .when((year(col("birthdate")).between(1981, 2000)), "Millennials")
    .when((year(col("birthdate")).between(2001, 2020)), "GenZ")
    .otherwise("Unknown") 
)

# 2. Construction du DataFrame final
dim_user = df_user.withColumn("generation", generation_expr) \
                  .withColumn("user_key", monotonically_increasing_id()) \
                  .select("user_key", "user_id", "gender", "birthdate", "generation")
# By the time we get to here, "dim_user" should hold the user dimensions table according to the specification above.

dim_user.count()

3022290

Wall time: 0.235 s
RSS Δ: +0.06 MB
Peak memory Δ: +0.05 MB (OS-dependent)


<ExecutionResult object at 103fdd0d0, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 103fdf810, raw_cell="# codecell_41ax14 (keep this id for tracking purpo.." transformed_cell="# codecell_41ax14 (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=3022290>

**The correct answer should be 3022290.**

### 4.2 The `age` Dimension Table

Even though `birthdate` exists in `dim_user`, a separate `dim_age` is helpful because it:
- Simplifies analysis with ready-made bands.
- Ensures consistency across all queries.
- Improves performance via small surrogate keys.
- Preserves history by fixing age at event time.
- Adds flexibility to adjust bands without changing facts.

We're going to build a `dim_age` table that has 4 columns:
- `age_key`: (INT, surrogate PK)
- `age_band`: (STRING) following the age band rules below
- `min_age`: (INT)
- `max_age`: (INT)

Bands:
- "<18": min_age = NULL, max_age = 17
- "18-24": 18, 24
- "25-34": 25, 34
- "35-44": 35, 44
- "45-54": 45, 54
- "55-64": 55, 64
- "65-74": 65, 74
- "75-84": 75, 84
- "85-94": 85, 94
- "unknown": NULL, NULL

The construction of this table is a bit tricky, so we're going to show you how to do it, as follows:

In [12]:
%%timemem
from pyspark.sql.window import Window

# Static age bands
age_band_rows = [
    ("<18",   None, 17),
    ("18-24", 18, 24),
    ("25-34", 25, 34),
    ("35-44", 35, 44),
    ("45-54", 45, 54),
    ("55-64", 55, 64),
    ("65-74", 65, 74),
    ("75-84", 75, 84),
    ("85-94", 85, 94),
    ("unknown", None, None),
]
dim_age = spark.createDataFrame(age_band_rows, ["age_band", "min_age", "max_age"])

w_age = Window.orderBy(F.col("age_band"))
dim_age = dim_age.withColumn("age_key", F.dense_rank().over(w_age))

dim_age.count()

10

Wall time: 0.212 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)


<ExecutionResult object at 1075d4390, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 1075d4d90, raw_cell="from pyspark.sql.window import Window

# Static ag.." transformed_cell="from pyspark.sql.window import Window

# Static ag.." store_history=False silent=False shell_futures=True cell_id=None> result=10>

**The correct answer should be 10.**

### 4.3 The `brand`, `product`, and `category` Dimension Tables

Build the following dimension tables:

**`dim_brand`:**
- `brand_key` (INT, surrogate PK)
- `brand_code` (STRING) 
- `brand_desc` (STRING)

**`dim_category`:**
- `category_key` (INT, surrogate PK)
- `category_code` (STRING) 
- `category_desc` (STRING)

**`dim_product`:**
- `product_key`  (INT, surrogate PK)
- `product_id`   (STRING)
- `product_desc` (STRING)
- `brand_key`   (INT, FK → `dim_brand`)  
- `category_key`(INT, FK → `dim_category`)

The Learning goals of `dim_product` is to keep all products in `product`, and add details from `product_names`, then join the results with `brand` and `category` dimension tables.

**write some code here**

In [17]:
%%timemem
# codecell_43k3n9 (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.

# --- 1. Création de dim_brand ---
w_brand = Window.orderBy("brand")
dim_brand = df_brand.withColumn("brand_key", F.row_number().over(w_brand)) \
    .select(
        F.col("brand_key"),
        F.col("brand").alias("brand_code"),
        F.col("brand").alias("brand_desc")
    )

# --- 2. Création de dim_category ---
w_cat = Window.orderBy("category")
# On garde le dataframe complet temporairement pour générer les clés
dim_category_full = df_category.withColumn("category_key", F.row_number().over(w_cat))

# La dimension finale propre
dim_category = dim_category_full.select(
    F.col("category_key"),
    F.col("category").alias("category_code"),
    F.col("description").alias("category_desc")
)

# --- 3. Création de dim_product ---

# --- PRÉPARATION ---
# A. Nettoyage product_name : on renomme category pour éviter conflit
df_product_name_clean = df_product_name.withColumnRenamed("category", "cat_link")

# B. Nettoyage category pour la jointure :
# On ne garde QUE ce qui est nécessaire pour la jointure (le nom et la clé)
# On ne prend pas 'description' ici pour qu'elle ne rentre pas en conflit avec la description du produit
dim_category_for_join = dim_category_full.select("category", "category_key")


# --- JOINTURES ---
# Étape A : jointure sur product_name
df_p_enriched = df_product.join(df_product_name_clean, on="product_name", how="left")

# Étape B : Joindre avec dim_brand
df_p_brand = df_p_enriched.join(
    dim_brand, 
    df_p_enriched["brand"] == dim_brand["brand_code"], 
    how="left"
)

# Étape C : Joindre avec la version allégée de category
df_p_final = df_p_brand.join(
    dim_category_for_join, 
    df_p_brand["cat_link"] == dim_category_for_join["category"], 
    how="left"
)

# --- FINALISATION ---
# Étape D : Générer product_key final
w_prod = Window.orderBy("product_id")

dim_product = df_p_final.withColumn("product_key", F.row_number().over(w_prod)) \
    .select(
        F.col("product_key"),
        F.col("product_id"),
        # Maintenant "description" est unique (celle du produit), donc plus d'erreur
        F.coalesce(F.col("description"), F.col("product_name")).alias("product_desc"),
        F.col("brand_key"),
        F.col("category_key")
    )

# By the time we get to here, "dim_brand", "dim_category", and "dim_product" should hold 
# the dimension tables according to the specifications above.

print(f"Number of rows in dim_brand: {dim_brand.count()}")
print(f"Number of rows in dim_category: {dim_category.count()}")
print(f"Number of rows in dim_product: {dim_product.count()}")

Number of rows in dim_brand: 3444
Number of rows in dim_category: 13
Number of rows in dim_product: 166794
Wall time: 0.347 s
RSS Δ: +0.11 MB
Peak memory Δ: +0.00 MB (OS-dependent)


<ExecutionResult object at 1070c44d0, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 1070c7610, raw_cell="# codecell_43k3n9 (keep this id for tracking purpo.." transformed_cell="# codecell_43k3n9 (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=None>

**Correct answers:**

+ Number of rows in `dim_brand`: 3444
+ Number of rows in `dim_category`: 13
+ Number of rows in `dim_product`: 166794

### 4.4  The `date` Dimension Table

This table is expected to have one row per calendar date. 

**`dim_date`:**
- `date_key`     (INT, surrogate PK; format YYYYMMDD)
- `date`         (DATE, the actual calendar date)
- `day`          (INT, 1–31)
- `day_of_week`  (INT, 1=Mon … 7=Sun)
- `day_name`     (STRING, e.g., Monday)
- `is_weekend`   (BOOLEAN)
- `week_of_year` (INT, 1–53, ISO week)
- `month`        (INT, 1–12)
- `month_name`   (STRING, e.g., January)
- `quarter`      (INT, 1–4)
- `year`         (INT)


There are 2025 years, each with 365 days. Do we need to have a table that big? 
We can, but we do not have to! 

Instead, follow these instructions to create only as many rows as we need:

1. Determine the date range (from the min and max `event_date` in `df_events`).
2. Generate all dates in that range with `F.sequence()`.
3. Derive attributes (`day`, `day_of_week`, ...).
4. Create `date_key` = `year * 10000 + month * 100 + day` (i.e., YYYYMMDD).
5. Assign `date_key` as the surrogate PK.

Build the `dim_date` table conforming to the specifications above.

**write some code here**

In [21]:
%%timemem
# codecell_44qm5c (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.

# 1. Déterminer la plage de dates (Min et Max)
min_max = df_events.select(
    F.min(F.to_date(F.col("event_time"))).alias("min_date"),
    F.max(F.to_date(F.col("event_time"))).alias("max_date")
).collect()[0]

start_date = min_max["min_date"]
end_date = min_max["max_date"]

# 2. Générer la séquence de dates
df_date_sequence = spark.range(1).select(
    F.explode(
        F.sequence(F.lit(start_date), F.lit(end_date))
    ).alias("date")
)

# 3. Calculer tous les attributs demandés
dim_date = df_date_sequence.select(
    F.col("date"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day")
).withColumn(
    # Clé technique YYYYMMDD
    "date_key", 
    F.col("year") * 10000 + F.col("month") * 100 + F.col("day")
).withColumn(
    # weekday renvoie 0 (Lun) -> 6 (Dim). On ajoute 1 pour avoir 1 -> 7.
    "day_of_week", 
    F.weekday("date") + 1
).withColumn(
    "day_name", 
    F.date_format("date", "EEEE")
).withColumn(
    # Week-end si Samedi (6) ou Dimanche (7)
    "is_weekend", 
    F.col("day_of_week").isin([6, 7])
).withColumn(
    "week_of_year", 
    F.weekofyear("date")
).withColumn(
    "month_name", 
    F.date_format("date", "MMMM")
).withColumn(
    "quarter", 
    F.quarter("date")
).select(
    "date_key", "date", "day", "day_of_week", "day_name", 
    "is_weekend", "week_of_year", "month", "month_name", 
    "quarter", "year"
)

# By the time we get to here, "dim_date" should hold the dates dimension table according to the specification above.
# Affichage du résultat
print(f"Nombre de jours générés : {dim_date.count()}")
dim_date.show(5)



Nombre de jours générés : 32
+--------+----------+---+-----------+---------+----------+------------+-----+----------+-------+----+
|date_key|      date|day|day_of_week| day_name|is_weekend|week_of_year|month|month_name|quarter|year|
+--------+----------+---+-----------+---------+----------+------------+-----+----------+-------+----+
|20191001|2019-10-01|  1|          2|  Tuesday|     false|          40|   10|   October|      4|2019|
|20191002|2019-10-02|  2|          3|Wednesday|     false|          40|   10|   October|      4|2019|
|20191003|2019-10-03|  3|          4| Thursday|     false|          40|   10|   October|      4|2019|
|20191004|2019-10-04|  4|          5|   Friday|     false|          40|   10|   October|      4|2019|
|20191005|2019-10-05|  5|          6| Saturday|      true|          40|   10|   October|      4|2019|
+--------+----------+---+-----------+---------+----------+------------+-----+----------+-------+----+
only showing top 5 rows
Wall time: 8.646 s
RSS Δ: +0.

                                                                                

<ExecutionResult object at 104011450, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 104013ed0, raw_cell="# codecell_44qm5c (keep this id for tracking purpo.." transformed_cell="# codecell_44qm5c (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=None>

**The correct answer should be 32.**

If you reach here, congratulations!
You have created all the dimension tables!

In [22]:
%%timemem

print(f"dim_user: {dim_user.count()}")
print(f"dim_age: {dim_age.count()}")
print(f"dim_brand: {dim_brand.count()}")
print(f"dim_category: {dim_category.count()}")
print(f"dim_product: {dim_product.count()}")
print(f"dim_date: {dim_date.count()}")

dim_user: 3022290
dim_age: 10
dim_brand: 3444
dim_category: 13
dim_product: 166794
dim_date: 32
Wall time: 0.660 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)


<ExecutionResult object at 10802c410, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 10802f890, raw_cell="
print(f"dim_user: {dim_user.count()}")
print(f"di.." transformed_cell="print(f"dim_user: {dim_user.count()}")
print(f"dim.." store_history=False silent=False shell_futures=True cell_id=None> result=None>

**Correct answers:**

- `dim_user`: 3022290
- `dim_age`: 10
- `dim_brand`: 3444
- `dim_category`: 13
- `dim_product`: 166794
- `dim_date`: 32

## 5. Build the Fact Table

Now it's time to build the fact table!

Our goal in this step is to create a clean `fact_events` table that joins the events from the operational database to the dimension tables you've just built above.
Along the way, we're going to enforce data quality and do a bit of data cleaning.

### 5.1 Clean Events

Create `events_clean` by removing any record that "does not make sense".
Specifically:

- Start from the `df_events` DataFrame.
- Keep only rows with non-null timestamps, `session_id`, and `product_id`.
- Cast price to double; keep `NULL` prices (views/carts can be price-less) and non-negative values only.
- Drop dates in the future.
- Restrict to valid event types: `view`, `cart`, `purchase`, `remove`.

**write some code here**

In [23]:
%%timemem
# codecell_51ep7v (keep this id for tracking purposes)

from pyspark.sql import functions as F
from functools import reduce
from operator import and_ as AND

valid_types = ["view", "cart", "purchase", "remove"]

# TODO: Write your code below, but do not remove any lines already in this cell.

# 1. Conversion du prix en double (pour pouvoir comparer les valeurs négatives)
# On le fait au début pour simplifier la condition suivante
df_typed = df_events.withColumn("price", F.col("price").cast("double"))

# 2. Définition des conditions de nettoyage
# A. Champs obligatoires non nuls
cond_not_null = (
    F.col("event_time").isNotNull() & 
    F.col("session_id").isNotNull() & 
    F.col("product_id").isNotNull()
)

# B. Prix valide : Soit NULL (pour les vues/paniers), soit positif ou nul (>= 0)
cond_valid_price = (F.col("price").isNull()) | (F.col("price") >= 0)

# C. Pas de dates dans le futur (inférieures ou égales à "maintenant")
cond_not_future = F.col("event_time") <= F.current_timestamp()

# D. Types d'événements valides (définis dans la liste valid_types plus haut)
cond_valid_type = F.col("event_type").isin(valid_types)

# 3. Application de tous les filtres
events_clean = df_typed.filter(
    cond_not_null & 
    cond_valid_price & 
    cond_not_future & 
    cond_valid_type
)

# By the time we get to here, "events_clean" should conform to the specification above.

# Affichage pour vérification
print(f"Nombre de lignes restantes : {events_clean.count()}")

                                                                                

Nombre de lignes restantes : 42418541


                                                                                

42418541

Wall time: 18.409 s
RSS Δ: -32.27 MB
Peak memory Δ: +0.00 MB (OS-dependent)


<ExecutionResult object at 107fd75d0, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 107fd5110, raw_cell="# codecell_51ep7v (keep this id for tracking purpo.." transformed_cell="# codecell_51ep7v (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=42418541>

### 5.2 Cap Silly Prices

Next, let us check some statistics about prices and then decide what we want to do.

What is the minimum, maximum, and average price in this database?

**write some code here**

In [25]:
%%timemem
# codecell_52hg6x (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.

# On sélectionne les agrégats en une seule passe pour être efficace
stats = events_clean.select(
    F.min("price").alias("min_p"),
    F.max("price").alias("max_p"),
    F.avg("price").alias("avg_p")
).collect()[0] # On récupère la première (et unique) ligne de résultat

# On assigne les valeurs aux variables demandées
minimum = stats["min_p"]
maximum = stats["max_p"]
average = stats["avg_p"]

# By the time we get to here, "minimum", "maximum", and "average" should conform to the specification above.

print(f"minimum: {minimum}")
print(f"maximum: {maximum}")
print(f"average: {average}")



minimum: 0.0
maximum: 257407.0
average: 864.2732006942867
Wall time: 9.350 s
RSS Δ: +0.02 MB
Peak memory Δ: +0.00 MB (OS-dependent)


                                                                                

<ExecutionResult object at 107f4cd50, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 107f4ebd0, raw_cell="# codecell_52hg6x (keep this id for tracking purpo.." transformed_cell="# codecell_52hg6x (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=None>

Wait, something's not right! 
The average price is 864.27 but the maximum seems suss...
It is possible these high prices are just errors.

For simplicity, let us assume a threshold value equal to 100x the average, and remove anything more than that.
Filter `events_clean` as described.

write some code here

In [26]:
%%timemem
# codecell_52bf5d (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.

# 1. Calcul du seuil limite
threshold = 100 * average

# 2. Application du filtre
# On garde : (Prix <= seuil) OU (Prix est NULL)
events_clean = events_clean.filter(
    (F.col("price") <= threshold) | 
    (F.col("price").isNull())
)

print(f"Seuil appliqué : {threshold:.2f}")

# By the time we get to here, "events_clean" should conform to the specification above.

events_clean.count()

Seuil appliqué : 86427.32


                                                                                

42351862

Wall time: 9.156 s
RSS Δ: +0.02 MB
Peak memory Δ: +0.00 MB (OS-dependent)


<ExecutionResult object at 108034310, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 108035a90, raw_cell="# codecell_52bf5d (keep this id for tracking purpo.." transformed_cell="# codecell_52bf5d (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=42351862>

Good, we still have about 42.4M records, but we've done some basic data cleaning.
Let us continue...

### 5.3 Build Tiny Lookup Tables (LKPs)

Create lookup tables that help us connect `events_clean` with the dimension tables we created:

- `user_lkp`: (`user_id` → `user_key`) from `dim_user`.
- `prod_lkp`: (`product_id` → `product_key`, `brand_key`, `category_key`) from `dim_product`.
- `date_lkp`: (`date` → `date_key`) from `dim_date`.
- session-to-user bridge: use the raw `df_session` (`session_id`, `user_id`) CSV (not a dimension) to pull `user_id`.

**Hint:** These LKPs are just calling `select` from the right sources with the right parameters.

**write some code here**

In [27]:
%%timemem
# codecell_53l2kp (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.

# 1. User Lookup : lien entre user_id (naturel) et user_key (surrogate)
user_lkp = dim_user.select("user_id", "user_key")

# 2. Product Lookup : lien entre product_id et toutes les clés dimensionnelles associées
prod_lkp = dim_product.select("product_id", "product_key", "brand_key", "category_key")

# 3. Date Lookup : lien entre la date calendaire et le date_key (YYYYMMDD)
date_lkp = dim_date.select("date", "date_key")

# 4. Session Bridge : lien entre session_id et user_id
# On utilise df_session (la table raw) et on s'assure qu'elle est unique
session_bridge = df_session.select("session_id", "user_id").distinct()

# By the time we get to here, the following variables should conform to the specification above.

print(session_bridge.count(), user_lkp.count(), prod_lkp.count(), date_lkp.count())

                                                                                

9244421 3022290 166794 32
Wall time: 5.284 s
RSS Δ: -3.52 MB
Peak memory Δ: +0.00 MB (OS-dependent)


<ExecutionResult object at 107fe4c50, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 107fe62d0, raw_cell="# codecell_53l2kp (keep this id for tracking purpo.." transformed_cell="# codecell_53l2kp (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=None>

### 5.4 Join Everything Together

Finally, join everything together to create `fact_events`.
Follow the following steps:

- Start from `clean events` with these columns: (`event_time`, `event_type`, `session_id`, `product_id`, `price`, `date`).
- Join sessions first (to get `user_id`).
- Then join product, date, and user.
- Join with `dim_user` to find out the birthdate and compute user age at the day of the event in `age_on_event`.
- Join with `dim_age` to find the age band based on `age_on_event`.

**Hints:**

- You built the LKPs for a reason... use them.
- Left, right, or natural joins?

The final part above is a bit tricky, so we'll just give you the answer. But you'll need to figure out how it integrates with everything above.

```
        .withColumn("age_on_event", F.floor(F.months_between(F.col("date"), F.to_date("birthdate"))/12))
        .join(
           dim_age.select("age_key", "age_band", "min_age", "max_age"),
           (
               ((F.col("age_on_event") > F.col("min_age"))) &
               ((F.col("age_on_event") <= F.col("max_age")))
           ),
           "left"
       )
```

The final result (`fact_events`) should include the following columns:

- `date_key`
- `user_key`
- `age_key`
- `product_key`
- `brand_key`
- `category_key`
- `session_id`
- `event_time`
- `event_type`
- `price`

**write some code here**

In [28]:
%%timemem
# codecell_54aaaa (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.

# 1. Préparation de la base
# On ajoute une colonne 'date' (type Date) à events_clean pour pouvoir joindre avec dim_date
base_events = events_clean.withColumn("date", F.to_date("event_time"))

# 2. Chaînage des jointures
fact_events_temp = base_events \
    .join(session_bridge, on="session_id", how="left") \
    .join(prod_lkp, on="product_id", how="left") \
    .join(date_lkp, on="date", how="left") \
    .join(dim_user, on="user_id", how="left") # On joint dim_user (pas user_lkp) pour récupérer 'birthdate'

# 3. Calcul de l'âge et jointure avec dim_age
# (On utilise le snippet fourni dans l'énoncé)
fact_events_w_age = fact_events_temp \
    .withColumn("age_on_event", F.floor(F.months_between(F.col("date"), F.to_date("birthdate"))/12)) \
    .join(
        dim_age.select("age_key", "age_band", "min_age", "max_age"),
        (
            ((F.col("age_on_event") > F.col("min_age"))) &
            ((F.col("age_on_event") <= F.col("max_age")))
        ),
        "left"
    )

# 4. Sélection finale des colonnes (mise en forme)
fact_events = fact_events_w_age.select(
    F.col("date_key"),
    F.col("user_key"),
    F.col("age_key"),
    F.col("product_key"),
    F.col("brand_key"),
    F.col("category_key"),
    F.col("session_id"),
    F.col("event_time"),
    F.col("event_type"),
    F.col("price")
)

# By the time we get to here, "fact_events" should conform to the specification above.

# Affichage pour contrôle
print(f"Lignes finales dans la table de faits : {fact_events.count()}")
fact_events.show(5)

25/12/29 16:01:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 1

Lignes finales dans la table de faits : 42351862


25/12/29 16:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 1

+--------+-----------+-------+-----------+---------+------------+--------------------+-------------------+----------+------+
|date_key|   user_key|age_key|product_key|brand_key|category_key|          session_id|         event_time|event_type| price|
+--------+-----------+-------+-----------+---------+------------+--------------------+-------------------+----------+------+
|20191031|42949886615|      4|      14079|     2555|           3|0b63f487-9146-4ec...|2019-10-31 14:05:12|      view| 38.35|
|20191018|17180086555|      1|      22906|     3244|           3|40d01ba8-f8cc-4a9...|2019-10-18 05:30:27|      view|  51.4|
|20191008|42949945300|      2|       1197|     NULL|           8|38609af5-1ece-422...|2019-10-08 20:03:41|      view|771.94|
|20191018|17180053625|   NULL|        223|     2702|           8|22f1098e-f80f-40f...|2019-10-18 05:30:28|      view| 344.9|
|20191002|     324788|      4|     149294|     NULL|        NULL|1a3583db-bbe0-40e...|2019-10-02 11:07:43|      view| 72.64|


<ExecutionResult object at 10801ab90, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 10801ad10, raw_cell="# codecell_54aaaa (keep this id for tracking purpo.." transformed_cell="# codecell_54aaaa (keep this id for tracking purpo.." store_history=False silent=False shell_futures=True cell_id=None> result=None>

Congrats, you've done it!
You've created the fact table successfuly! 🚀

Here is the summary of the schema:

- `date_key` (FK → `dim_date`)
- `user_key` (FK → `dim_user`)
- `age_key`  (FK → `dim_age`)
- `product_key` (FK → `dim_product`)
- `brand_key` (FK → `dim_brand`)
- `category_key` (FK → `dim_category`)
- `session_id` (STRING, business key, kept directly in this table)
- `event_time` (TIMESTAMP)
- `event_tpe` (STRING)
- `price` (DOUBLE)


## 6. Export the Fact Table

You now have a shiny `fact_events` table!
But how should you store it?
(Remember our discussion in class about row vs. column representations?)

Let's store `fact_events` in a few different ways and compare data sizes.

First, let's try writing out as CSV files, both compressed and uncompressed, per below.

Note that in Spark, we specify the output _directory_, which is then populated with many "part" files.

In [29]:
fact_events.write.mode("overwrite").option("header", True).csv(BASE_DIR + "/fact_events.csv")
fact_events.write.mode("overwrite").option("header", True).option("compression", "snappy").csv(BASE_DIR + "/fact_events.csv.snappy")

25/12/29 16:04:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:04:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:04:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:04:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:04:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:04:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 1

Let's then try Parquet:

In [30]:
fact_events.write.mode("overwrite").parquet(BASE_DIR + "/fact_events.parquet")

25/12/29 16:08:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:08:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:08:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:08:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:08:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 16:08:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/29 1

Let's compare the output sizes using the following bit of code:

In [31]:
import os
for f in [BASE_DIR + "/fact_events.csv", BASE_DIR + "/fact_events.csv.snappy", BASE_DIR + "/fact_events.parquet"]:
    try:
        size = sum(os.path.getsize(os.path.join(dp, fn))
                   for dp, dn, filenames in os.walk(f)
                   for fn in filenames)
        print(f"{f}: {size/(1024*1024*1024):.1f} GB")
    except FileNotFoundError:
        pass

/Users/lorenzo/Documents/Cours/E4FD/Data-Engineering/Lab2/lab2Assignment/fact_events.csv: 4.4 GB
/Users/lorenzo/Documents/Cours/E4FD/Data-Engineering/Lab2/lab2Assignment/fact_events.csv.snappy: 1.2 GB
/Users/lorenzo/Documents/Cours/E4FD/Data-Engineering/Lab2/lab2Assignment/fact_events.parquet: 1.0 GB


**your answers below!**

// qcell_6a9876 (keep this id for tracking purposes)

- **Size of CSV output, no compression:** 4.4  GB
- **Size of CSV output, Snappy compression:** 1.2 GB
- **Size of Parquet output:** 1.0 GB 

**Answer the following question:**

Q6.1 Why is columnar storage (Parquet) usually much smaller?

Q6.2 Which format is better for analytical queries and why?

**your answers below!**

// qcell_6b1234 (keep this id for tracking purposes)

**Q6.1 Answer:**

Le stockage colonnaire est beaucoup plus compact car il regroupe les données par colonne, créant des blocs de valeurs homogènes. Contrairement au stockage par ligne où les types varient constamment, cette avantage permet aux algorithmes de compression (comme Snappy ou RLE) de réduire l'espace disque, par exemple en ne stockant qu'une seule fois une valeur qui se répète 1000 fois de suite.

**Q6.2 Answer:**

Le format Parquet est le meilleur pour l'analytique car il minimise les lectures disques (I/O) grâce à la "projection de colonnes", qui permet de ne charger en mémoire que les colonnes utiles à la requête. Cela permet aux métadonnées d'ignorer des blocs entiers de fichiers ne correspondant pas aux filtres. Cela permet à Spark de traiter des agrégations sur des milliards de lignes très rapidement en évitant de lire des téraoctets de données inutiles, ce qui est impossible avec un format ligne comme le CSV.

## 7. Submission

Details about the Submission of this assignment are outlined in the helper.

In [32]:
%%timemem
spark.stop()

Wall time: 4.429 s
RSS Δ: +1.92 MB
Peak memory Δ: +0.00 MB (OS-dependent)


<ExecutionResult object at 107fe4550, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 107fe4350, raw_cell="spark.stop()
" transformed_cell="spark.stop()
" store_history=False silent=False shell_futures=True cell_id=None> result=None>

## Deliverables
- This notebook with all code cells executed.
- A brief `REPORT.md` with: inputs, assumptions, plan screenshots, quality results, and performance choices.
- Output folder with Parquet sample (≤20 MB).

## Evaluation
- Correctness and clarity of pipeline (40%).
- Data‑quality gates and rationale (20%).
- Performance reasoning and plan analysis (20%).
- Reproducibility and organization (20%).


## Performance notes
- Record `spark.sql.shuffle.partitions` and justify your value.
- Show one example of avoiding UDFs by using built‑ins.
- If you use broadcast join, explain why it is safe.


## Reproducibility checklist
- List Spark version and key configs.
- Fix time zone to UTC.
- Control randomness if used.
- Provide exact commands to run the notebook end‑to‑end.
