# Data Engineering with Spark

By Tom URBAN & Ethan SMADJA 

## Lab 3: Structured Streaming

### Prerequisites

- Connect to the [Databricks Community Edition](https://community.cloud.databricks.com/login.html)
- Upload the provided notebook

### Goals

- Stream the `events` dataset from files
- Use Spark Structured Streaming to define the streaming dataframes and process the stream
- Visualize how the aggregation results change as new data arrives
- Compare the code for dataframe analysis in batch and streaming modes

### Lab resources

- Notebook
- The data is part of the Databricks workspace: `/databricks-datasets/structured-streaming/events`

### Useful links

- [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)

### TO DO

1. Explore the dataset in batch mode
2. Run the streaming demo:
   - define the streaming dataframe
   - define the transformations
   - start the stream
   - observe how the results change
3. Using the code from the demo, implement a streaming example on another dataset


1. Explore the dataset in batch mode

In [0]:
events_path = "/databricks-datasets/structured-streaming/events/"

# display the files
display(dbutils.fs.ls(events_path))



Batch read and schema display

In [0]:
# Batch read: load every JSON file under events_path at once
events_batch = spark.read.json(events_path)

# Print the inferred schema
events_batch.printSchema()

# Show the first 5 rows without truncating the values
events_batch.show(5, truncate=False)
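
By default, `spark.read.json` expects JSON Lines: each line of a file holds one JSON object, which becomes one row. The pure-Python sketch below mimics that on an in-memory sample to show what the reader works with before it infers a schema. The sample lines are hypothetical and assume each event carries a `time` (epoch seconds) and an `action` field; compare them with the `printSchema()` output above. Spark's own inference is richer (it widens types across records and handles nested objects).

```python
import json
from typing import Dict, List

# Hypothetical sample: one JSON object per line (JSON Lines), as spark.read.json expects by default
SAMPLE_FILE = """{"time": 1469501107, "action": "Open"}
{"time": 1469501147, "action": "Open"}
{"time": 1469501202, "action": "Close"}"""


def read_json_lines(text: str) -> List[dict]:
    """Parse one record per non-empty line, like the default (multiLine=false) JSON reader."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def infer_schema(records: List[dict]) -> Dict[str, str]:
    """Rough inference: field name -> Python type name of its first non-null value, sorted by name."""
    schema: Dict[str, str] = {}
    for record in records:
        for field, value in record.items():
            if value is not None and field not in schema:
                schema[field] = type(value).__name__
    return dict(sorted(schema.items()))


records = read_json_lines(SAMPLE_FILE)
print(len(records))           # 3
print(infer_schema(records))  # {'action': 'str', 'time': 'int'}
```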


Aggregation in batch mode

In [0]:
# Batch read: all files are read at once
events_batch = spark.read.json(events_path)

# Transformation: number of events per action
events_by_action_batch = (
    events_batch
    .groupBy("action")
    .count()
    .orderBy("count", ascending=False)
)

# Display the final (static) result
display(events_by_action_batch)
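
In plain Python, the batch query above amounts to one pass over every record followed by a sort. The sketch below uses `collections.Counter` on a hypothetical list of events; it only illustrates the semantics of `groupBy("action").count().orderBy("count", ascending=False)` and says nothing about how Spark distributes the work.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# Hypothetical events standing in for the rows of events_batch
events = [
    {"time": 1, "action": "Open"},
    {"time": 2, "action": "Open"},
    {"time": 3, "action": "Close"},
    {"time": 4, "action": "Open"},
]


def count_by_action(rows: Iterable[dict]) -> List[Tuple[str, int]]:
    """groupBy("action").count() followed by a descending sort on the count."""
    counts = Counter(row["action"] for row in rows)
    return sorted(counts.items(), key=lambda item: item[1], reverse=True)


print(count_by_action(events))  # [('Open', 3), ('Close', 1)]
```

Because all files are read up front, the result is computed once and never changes; the streaming demo in part 2 updates the same counts incrementally instead.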


2. Streaming demo on the `events` dataset
a. Define the schema from the batch dataframe

In [0]:
schema = events_batch.schema
schema
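
Reusing `events_batch.schema` matters because file-based streaming sources do not infer a schema by default (see the `spark.sql.streaming.schemaInference` setting), so the stream needs one up front. With a fixed schema, the JSON reader in its default PERMISSIVE mode fills missing fields with null and drops fields that are not in the schema. The pure-Python sketch below imitates only that projection step; `SCHEMA_FIELDS` is an assumed field list to check against the schema printed above.

```python
from typing import List, Optional

# Assumed field names; check them against `schema` printed in the previous cell
SCHEMA_FIELDS: List[str] = ["action", "time"]


def apply_schema(record: dict, fields: List[str]) -> List[Optional[object]]:
    """Project a parsed record onto a fixed field list: missing -> None, extra fields dropped."""
    return [record.get(field) for field in fields]


print(apply_schema({"time": 10, "action": "Open"}, SCHEMA_FIELDS))     # ['Open', 10]
print(apply_schema({"time": 11}, SCHEMA_FIELDS))                       # [None, 11]
print(apply_schema({"action": "Close", "user": "x"}, SCHEMA_FIELDS))  # ['Close', None]
```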

In [0]:
events_stream = (
    spark.readStream
        .schema(schema)
        .option("maxFilesPerTrigger", 1)  # read one file per trigger so the updates are visible
        .json(events_path)
)
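
`maxFilesPerTrigger` caps how many new files each micro-batch picks up, which is why setting it to 1 makes the results change visibly from one trigger to the next. The function below is a toy model of that behaviour, not Spark's implementation: `next_batch` is a hypothetical helper that remembers which files were already read and takes unseen files in name order. Spark's file source by default processes the oldest files first by modification time, so the name ordering here is a simplification.

```python
from typing import List, Set


def next_batch(listing: List[str], processed: Set[str], max_files_per_trigger: int) -> List[str]:
    """Return the files one trigger would read: at most max_files_per_trigger unseen files."""
    # Simplification: unseen files are taken in name order (Spark uses modification time, oldest first)
    unseen = sorted(name for name in listing if name not in processed)
    batch = unseen[:max_files_per_trigger]
    processed.update(batch)  # remember them so later triggers skip them
    return batch


processed_files: Set[str] = set()
directory = ["file-0.json", "file-1.json"]  # hypothetical directory listing
print(next_batch(directory, processed_files, 1))  # ['file-0.json']
directory.append("file-2.json")             # a new file lands between two triggers
print(next_batch(directory, processed_files, 1))  # ['file-1.json']
print(next_batch(directory, processed_files, 1))  # ['file-2.json']
print(next_batch(directory, processed_files, 1))  # [] (nothing new to read)
```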


In [0]:
from pyspark.sql.functions import col

events_by_action_stream = (
    events_stream
    .groupBy("action")
    .count()
    .orderBy(col("count").desc())
)
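
Unlike the batch version, the streaming `groupBy("action").count()` keeps the counts as state between micro-batches: each trigger reads only its new rows and adds them to the running totals. Sorting a streaming aggregation is only supported in complete output mode, where the whole result table is re-emitted after every trigger. The toy model below reproduces that update loop in pure Python; `run_stream` and its micro-batches of `action` values are hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def run_stream(micro_batches: Iterable[List[str]]) -> List[List[Tuple[str, int]]]:
    """Return the complete, sorted result table emitted after each micro-batch."""
    state: Counter = Counter()  # running counts kept between triggers
    snapshots = []
    for batch in micro_batches:
        state.update(batch)  # only the new rows are processed
        # Complete mode: emit the whole table, sorted by count descending
        snapshots.append(sorted(state.items(), key=lambda item: item[1], reverse=True))
    return snapshots


for trigger, table in enumerate(run_stream([["Open", "Open"], ["Close"], ["Close", "Close"]])):
    print(trigger, table)
# 0 [('Open', 2)]
# 1 [('Open', 2), ('Close', 1)]
# 2 [('Close', 3), ('Open', 2)]
```

The last snapshot shows why the order in the displayed table can flip while the stream runs: a later file can push another action to the top.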


In [0]:
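# Unity Catalog volumes live inside a schema, so create the schemas first
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.raw_events")
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.checkpoints")

spark.sql("""
CREATE VOLUME IF NOT EXISTS workspace.raw_events.events_tmp_25_11_14
COMMENT 'Temporary raw events volume for the streaming demo'
""")

spark.sql("""
CREATE VOLUME IF NOT EXISTS workspace.checkpoints.events_by_action_demo
COMMENT 'Checkpoint storage for the streaming demo'
""")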

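The volumes created here are addressed in the file system as `/Volumes/<catalog>/<schema>/<volume>`, the same pattern the next cell uses for `raw_data_path` and `checkpoint_path`. The sketch below builds those paths and the dated volume name; `volume_path` and `dated_volume_name` are hypothetical helpers, and the fixed date (instead of `datetime.now()`) is an assumption made so the result is reproducible.

```python
from datetime import date


def dated_volume_name(prefix: str, day: date) -> str:
    """Build a name such as events_tmp_25_11_14 (two-digit year, month, day)."""
    return f"{prefix}_{day.strftime('%y_%m_%d')}"


def volume_path(catalog: str, schema: str, volume: str) -> str:
    """File-system path of a Unity Catalog volume."""
    return f"/Volumes/{catalog}/{schema}/{volume}"


raw_volume = dated_volume_name("events_tmp", date(2025, 11, 14))
print(raw_volume)                                                        # events_tmp_25_11_14
print(volume_path("workspace", "raw_events", raw_volume))                # /Volumes/workspace/raw_events/events_tmp_25_11_14
print(volume_path("workspace", "checkpoints", "events_by_action_demo"))  # /Volumes/workspace/checkpoints/events_by_action_demo
```

The hard-coded `events_tmp_25_11_14` above matches the dated name only on 2025-11-14; on any other day the next cell creates a second, differently named volume.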

In [0]:
from datetime import datetime

# --- 1. Catalog and schemas ---
catalog = "workspace"                     # your actual catalog
uc_schema_raw_events = "raw_events"       # schema that stores the source files
db_schema_checkpoints = "checkpoints"     # schema that holds the checkpoints
stream_name = "events_by_action_demo"     # logical name of the stream

# --- 2. Create the schemas if needed ---
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{uc_schema_raw_events}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db_schema_checkpoints}")

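# --- 3. Volumes: a dated one for today's raw data, plus the checkpoint volume ---
raw_events_volume_time = datetime.now()
raw_events_volume = f"events_tmp_{raw_events_volume_time.strftime('%y_%m_%d')}"

spark.sql(f"""
CREATE VOLUME IF NOT EXISTS {catalog}.{uc_schema_raw_events}.{raw_events_volume}
COMMENT 'Temporary raw events volume for streaming demo'
""")

# checkpoint_path points inside this volume, so it must exist before the stream starts
spark.sql(f"""
CREATE VOLUME IF NOT EXISTS {catalog}.{db_schema_checkpoints}.{stream_name}
COMMENT 'Checkpoint storage for the streaming demo'
""")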

# --- 4. Build the full paths ---
# Note: raw_data_path is only printed here; the stream below still reads from events_path
raw_data_path = f"/Volumes/{catalog}/{uc_schema_raw_events}/{raw_events_volume}"
checkpoint_path = f"/Volumes/{catalog}/{db_schema_checkpoints}/{stream_name}"

print("Raw data path:", raw_data_path)
print("Checkpoint path:", checkpoint_path)

# --- 5. Read the stream ---
events_stream = (
    spark.readStream
        .schema(schema)
        .option("maxFilesPerTrigger", 1)
        .json(events_path)
)

# --- 6. Transformation ---
events_by_action_stream = (
    events_stream
    .groupBy("action")
    .count()
)

# --- 7. Run the stream ---
# display() starts the streaming query; the checkpoint lets it resume where it stopped
display(
    events_by_action_stream,
    checkpointLocation=checkpoint_path
)
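
`checkpointLocation` is where the query records which input it has already processed and the current aggregation state, so that a restarted query resumes instead of recounting everything. The toy model below mimics that idea with a single JSON file in a temporary directory; `run_trigger` and the file layout are hypothetical and unrelated to Spark's real checkpoint format (offset logs, commit logs and state store files).

```python
import json
import os
import tempfile
from collections import Counter
from typing import Dict, List


def run_trigger(checkpoint_file: str, directory: Dict[str, List[str]], max_files: int = 1) -> Dict[str, int]:
    """Process up to max_files unseen files, then persist progress and counts to the checkpoint."""
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as fh:
            checkpoint = json.load(fh)
    else:
        checkpoint = {"processed": [], "counts": {}}

    processed = set(checkpoint["processed"])
    counts = Counter(checkpoint["counts"])
    for name in sorted(f for f in directory if f not in processed)[:max_files]:
        counts.update(directory[name])  # add this file's actions to the running state
        processed.add(name)

    with open(checkpoint_file, "w") as fh:
        json.dump({"processed": sorted(processed), "counts": dict(counts)}, fh)
    return dict(counts)


# Hypothetical files: name -> list of `action` values they contain
files = {"file-0.json": ["Open", "Open"], "file-1.json": ["Close"]}
with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "checkpoint.json")
    print(run_trigger(ckpt, files))  # {'Open': 2}
    # Simulated restart: nothing is kept in memory, only the checkpoint file survives
    print(run_trigger(ckpt, files))  # {'Open': 2, 'Close': 1}
    print(run_trigger(ckpt, files))  # {'Open': 2, 'Close': 1} (no file is counted twice)
```

For the same reason, a different query should get its own checkpoint path; changing the aggregation while reusing an existing checkpoint may fail on restart.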
