# Streaming & Incremental Data

## Scenario
We will simulate a real-time data ingestion pipeline using Auto Loader and Delta Lake.
You will build a streaming pipeline that ingests JSON files, handles schema evolution, and performs real-time aggregations.

**Task:**
1. Configure Auto Loader to ingest files.
2. Handle schema evolution with "Rescue Mode".
3. Perform windowed aggregations with watermarking.
4. Join streaming data with static tables.

**Time:** 30 minutes

## User Isolation
This notebook is designed to be run in a shared environment.
To avoid conflicts, we will use a unique `catalog` and `schema` for your user.
The `00_setup` script will automatically configure these for you.

In [0]:
%run ../00_setup

## Environment Configuration
We will configure the environment variables and paths used in this workshop.

In [0]:
import time
from pyspark.sql.functions import col, current_timestamp, window, expr

# --- CONFIGURATION ---
# We will use a temporary directory to simulate data arrival
STREAM_SOURCE_PATH = f"{DATASET_BASE_PATH}/workshop/stream/orders"
SIMULATION_PATH = f"{DATASET_BASE_PATH}/workshop/simulation/incoming_orders"
CHECKPOINT_PATH = f"{DATASET_BASE_PATH}/workshop/simulation/checkpoints"
SCHEMA_PATH = f"{DATASET_BASE_PATH}/workshop/simulation/schemas"

# Clean up previous runs
dbutils.fs.rm(SIMULATION_PATH, True)
dbutils.fs.rm(CHECKPOINT_PATH, True)
dbutils.fs.rm(SCHEMA_PATH, True)
dbutils.fs.mkdirs(SIMULATION_PATH)

print(f"Source Data: {STREAM_SOURCE_PATH}")
print(f"Simulation Path: {SIMULATION_PATH}")

## Data Simulation Setup

In a real scenario, files would land in S3/ADLS automatically.
For this workshop, we will manually "drop" files into our simulation folder to trigger processing.

Let's check our source files.

In [0]:
STREAM_BASIC_FILE = "/Volumes/ecommerce_platform_trainer/default/datasets/workshop/SalesOrderDetail.csv"

In [0]:
# Read the source CSV file as a Spark DataFrame
df = spark.read.csv(STREAM_BASIC_FILE, header=True, inferSchema=True)

# Randomly split into 20 roughly equal batches (20 weights of 0.05 sum to 1.0)
splits = df.randomSplit([0.05] * 20, seed=42)


In [0]:
# Helper function to simulate data arrival
def arrive_batch(batch_id):
    filename = f"orders_stream_{batch_id:03d}.json"
    target = f"{SIMULATION_PATH}/{filename}"
    try:
        splits[batch_id].coalesce(1).write.mode("overwrite").json(target)
        print(f"✅ Batch {batch_id} arrived: {filename}")
    except Exception as e:
        print(f"❌ Error writing batch {batch_id}: {e}")

# Note: if the write fails (for example, an invalid path or missing permissions),
# arrive_batch prints the error instead of raising it.

# Simulate arrival of the first batch
arrive_batch(1)

## Auto Loader (cloudFiles)

**Auto Loader** is the standard way to ingest files incrementally in Databricks. It detects new files as they arrive and tracks which files it has already processed, so each file is ingested only once.
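
To make "detects new files and tracks state" concrete, here is a minimal pure-Python sketch of the idea. It is a conceptual illustration only, not how Auto Loader is implemented: the helper `discover_new_files` and the `state.json` file are hypothetical stand-ins for Auto Loader's file discovery and its checkpoint.

```python
import json
import tempfile
from pathlib import Path


def discover_new_files(source_dir, state_file):
    """Return file names not seen on earlier calls and record them as seen."""
    seen = set(json.loads(state_file.read_text())) if state_file.exists() else set()
    new_files = sorted(p.name for p in source_dir.glob("*.json") if p.name not in seen)
    # Persist the updated state so the next call skips these files.
    state_file.write_text(json.dumps(sorted(seen | set(new_files))))
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "incoming"   # hypothetical landing folder
    source.mkdir()
    state = Path(tmp) / "state.json"  # hypothetical "checkpoint"

    (source / "orders_001.json").write_text("{}")
    first_run = discover_new_files(source, state)

    (source / "orders_002.json").write_text("{}")
    second_run = discover_new_files(source, state)

    third_run = discover_new_files(source, state)  # nothing new arrived
```

Each call returns only the files that arrived since the previous call, which is why deleting the checkpoint (as the cleanup cell does) causes everything to be re-ingested.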

### Configure Auto Loader

Create a streaming DataFrame `df_stream` that reads from `SIMULATION_PATH`.

**Requirements:**
- Format: `cloudFiles`
- File Format: `json`
- Schema Location: `SCHEMA_PATH`
- Infer Column Types: `true`

**Hint:**
```python
spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", path) \
    .load(path)
```

In [0]:
# TODO: Configure Auto Loader
df_stream = (
    spark.readStream
    .format("___") # Use Auto Loader format (cloudFiles)
    .option("cloudFiles.format", "___") # Source file format
    .option("cloudFiles.schemaLocation", SCHEMA_PATH)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(___) # Path to data
)

### Write Stream to Delta Table

Write the stream to a Delta table named `orders_bronze`.
Use `trigger(availableNow=True)` to process all data that is currently available and then stop (useful for testing).

**Hint:**
```python
df.writeStream \
    .format("delta") \
    .option("checkpointLocation", path) \
    .trigger(availableNow=True) \
    .toTable("catalog.schema.table")
```

In [0]:
table_name = f"{CATALOG}.{BRONZE_SCHEMA}.orders_bronze"
checkpoint_dir = f"{CHECKPOINT_PATH}/orders_bronze"

# TODO: Write stream
query = (
    df_stream.writeStream
    .format("___") # Target format
    .option("checkpointLocation", ___)
    .trigger(availableNow=True)
    .toTable(table_name)
)

query.awaitTermination()
print(f"Processed batch 1 into {table_name}")

In [0]:
# Verify data
display(spark.table(table_name))

## Schema Evolution & Rescue Data

Data changes. New columns appear. Types change.
Auto Loader handles this with **Schema Evolution** and **Rescued Data**.

### Simulate "Bad" Data

We will inject a file with:
1.  A new column (`customer_mood`).
2.  A malformed record (string in an integer field).

In [0]:
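import json

# Create a file with unexpected data
bad_data = [
    {"order_id": 9001, "total_amount": 50.0, "customer_mood": "happy"}, # New column
    {"order_id": 9002, "total_amount": "INVALID", "customer_mood": "angry"} # Type mismatch
]
# Write raw JSON lines: spark.createDataFrame cannot infer a single schema here
# because total_amount mixes a float and a string.
bad_json = "\n".join(json.dumps(row) for row in bad_data)
dbutils.fs.put(f"{SIMULATION_PATH}/bad_data_batch.json", bad_json, True)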

print("⚠️ Bad data batch arrived!")

### Configure Rescue Mode

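Re-configure the stream with `cloudFiles.schemaEvolutionMode` set to `rescue`.
In this mode the schema never evolves and the stream does not fail on schema changes: new columns and values that do not match the expected type are captured in the `_rescued_data` column.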
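
The routing of unexpected data can be sketched in plain Python. This is a simplified model under stated assumptions, not Auto Loader's actual logic or output format: the `rescue` helper and `EXPECTED_SCHEMA` are hypothetical, and the real `_rescued_data` column may carry additional metadata.

```python
import json

# Hypothetical schema that was fixed before the bad batch arrived.
EXPECTED_SCHEMA = {"order_id": int, "total_amount": float}


def rescue(raw_line, schema=EXPECTED_SCHEMA):
    """Parse one JSON line; move unknown or mistyped fields into _rescued_data."""
    record = json.loads(raw_line)
    row = {name: None for name in schema}
    rescued = {}
    for key, value in record.items():
        expected_type = schema.get(key)
        if expected_type is not None and isinstance(value, expected_type):
            row[key] = value      # known column with the expected type
        else:
            rescued[key] = value  # new column or type mismatch
    row["_rescued_data"] = json.dumps(rescued, sort_keys=True) if rescued else None
    return row


good = rescue('{"order_id": 9000, "total_amount": 12.5}')
bad = rescue('{"order_id": 9002, "total_amount": "INVALID", "customer_mood": "angry"}')
```

In this model the good row has an empty `_rescued_data`, while the bad row keeps `order_id`, gets a null `total_amount`, and carries both `customer_mood` and the invalid amount in `_rescued_data`.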

In [0]:
# TODO: Re-configure stream with rescue mode
df_stream_rescue = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", SCHEMA_PATH)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "___") # Enable Rescue Mode
    .load(SIMULATION_PATH)
)

# Write again
query_rescue = (
    df_stream_rescue.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir)
    .option("mergeSchema", "___") # Allow Delta table schema to evolve
    .trigger(availableNow=True)
    .toTable(table_name)
)

query_rescue.awaitTermination()

In [0]:
# Check the table - look for _rescued_data column
display(spark.table(table_name).filter("_rescued_data IS NOT NULL"))

## Real-Time Aggregations (Watermarking)

We want to count orders every 30 seconds.
Since streams are infinite, we need **Watermarking** to tell Spark when to close a window and discard old state.

### Define Windowed Aggregation

1.  Add a timestamp column `processing_time`.
2.  Define a watermark (e.g., "1 minute").
3.  Group by `window("processing_time", "30 seconds")`.
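
Before filling in the cell, it may help to see the windowing and watermark logic in plain Python. The sketch below is a simplified model, not Spark's implementation: it advances the watermark after every event, whereas Spark updates it between micro-batches. The names `window_start` and `simulate` are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=30)    # tumbling window size
THRESHOLD = timedelta(minutes=1)  # watermark delay
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Start of the 30-second tumbling window that contains ts."""
    return ts - (ts - EPOCH) % WINDOW


def simulate(event_times):
    """Count events per window, closing windows that fall behind the watermark."""
    open_counts, closed, dropped = Counter(), {}, 0
    max_seen = None
    for ts in event_times:
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - THRESHOLD
        if ts < watermark:  # too late: its window may already be closed
            dropped += 1
            continue
        open_counts[window_start(ts)] += 1
        # Close every window that ends at or before the watermark and free its state.
        for start in [s for s in open_counts if s + WINDOW <= watermark]:
            closed[start] = open_counts.pop(start)
    return open_counts, closed, dropped


base = datetime(2024, 1, 1, 12, 0, 0)
events = [base + timedelta(seconds=s) for s in (5, 20, 40, 130, 10)]
open_counts, closed, dropped = simulate(events)
```

Here the event at +130 s moves the watermark to +70 s, which closes the first two windows. The final event at +10 s is older than the watermark, so it is dropped rather than added to a closed window.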

In [0]:
# Prepare stream with timestamp
df_with_time = df_stream_rescue.withColumn("processing_time", current_timestamp())

# TODO: Define aggregation
orders_count = (
    df_with_time
    .withWatermark("___", "___") # Column, threshold (e.g. "1 minute")
    .groupBy(
        window("___", "___") # Column, window duration (e.g. "30 seconds")
    )
    .count()
)

In [0]:
# Start aggregation stream (Output Mode: Update)
# We use a processingTime trigger to run a micro-batch every 5 seconds
query_agg = (
    orders_count.writeStream
    .format("memory") # Writing to memory for easy visualization
    .queryName("orders_dashboard")
    .outputMode("update")
    .trigger(processingTime="5 seconds")
    .start()
)

print("Dashboard stream started...")

In [0]:
# Simulate more data arrival to see updates
for i in range(2, 5):
    arrive_batch(i)
    time.sleep(5)

In [0]:
# View the dashboard
display(spark.sql("SELECT * FROM orders_dashboard ORDER BY window DESC"))

In [0]:
query_agg.stop()

## Stream-Static Join

Enrich the incoming orders with Customer data (which is a static Delta table).

**Note:** Stream-static joins are stateless. In each micro-batch, Spark joins the new streaming rows against the latest version of the static Delta table, so changes to that table are visible to later micro-batches.
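
The behavior described in the note can be modeled in a few lines of plain Python. This is only an illustration under stated assumptions: `enrich_batch`, `load_customers`, and the sample rows are hypothetical, and a Python list stands in for the static Delta table.

```python
def enrich_batch(orders, load_customers):
    """Left-join one micro-batch of orders against a fresh read of the customers."""
    customers = {c["CustomerID"]: c for c in load_customers()}  # re-read per batch
    enriched = []
    for order in orders:
        match = customers.get(order["customer_id"])
        enriched.append({
            "order_id": order["order_id"],
            # A left join keeps unmatched orders with null customer fields.
            "FullName": match["FullName"] if match else None,
        })
    return enriched


customers_table = [{"CustomerID": 1, "FullName": "Ada Example"}]


def load_customers():
    """Return the current contents of the stand-in static table."""
    return list(customers_table)


batch_1 = enrich_batch(
    [{"order_id": 10, "customer_id": 1}, {"order_id": 11, "customer_id": 2}],
    load_customers,
)

# The static table changes between micro-batches.
customers_table.append({"CustomerID": 2, "FullName": "Bo Example"})
batch_2 = enrich_batch([{"order_id": 12, "customer_id": 2}], load_customers)
```

Order 11 stays unmatched because customer 2 did not exist when its batch ran, and no state is kept to fix it later. Order 12 arrives after the table changed and is enriched.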

In [0]:
# Load static customers table
customers_table = f"{CATALOG}.{SILVER_SCHEMA}.customers_silver"
df_customers = spark.read.table(customers_table)

# TODO: Join Stream (df_stream_rescue) with Static (df_customers)
# Join on customer_id
df_enriched = (
    df_stream_rescue.alias("o")
    .join(
        df_customers.alias("c"),
        col("___") == col("___"), # Join condition
        "___" # Join type (left)
    )
    .select("o.order_id", "o.total_amount", "c.FullName", "c.EmailAddress")
)

In [0]:
# Write enriched stream
query_join = (
    df_enriched.writeStream
    .format("delta")
    .option("checkpointLocation", f"{CHECKPOINT_PATH}/enriched_orders")
    .trigger(availableNow=True)
    .toTable(f"{CATALOG}.{BRONZE_SCHEMA}.orders_enriched")
)

query_join.awaitTermination()

display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.orders_enriched"))

## Cleanup

Stop all streams and remove temporary files.

In [0]:
# Stop any active streams
for q in spark.streams.active:
    q.stop()

# Optional: Remove simulation files
# dbutils.fs.rm(SIMULATION_PATH, True)
# dbutils.fs.rm(CHECKPOINT_PATH, True)

# Solution

The complete code is below.

In [0]:
# ============================================================
# FULL SOLUTION - Workshop 4: Streaming
# ============================================================


df_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", SCHEMA_PATH)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(SIMULATION_PATH)
)

# --- Task 2.2: Write Stream ---
query = (
    df_stream.writeStream
    .format("delta")
    .option("checkpointLocation", f"{CHECKPOINT_PATH}/orders_bronze")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(f"{CATALOG}.{BRONZE_SCHEMA}.orders_bronze")
)

# Wait for the availableNow run to finish before reading the table
query.awaitTermination()

In [0]:
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.orders_bronze"))

In [0]:
# --- Task 4.1: Aggregation ---
df_with_time = df_stream.withColumn("processing_time", current_timestamp())

orders_count = (
    df_with_time
    .withWatermark("processing_time", "1 minute")
    .groupBy(window("processing_time", "30 seconds"))
    .count()
)

# --- Task 5: Stream-Static Join ---
df_customers = spark.read.table(f"{CATALOG}.{SILVER_SCHEMA}.customers_silver")

df_enriched = (
    df_stream.alias("o")
    .join(df_customers.alias("c"), col("o.customer_id") == col("c.CustomerID"), "left")
    .select("o.order_id", "o.total_amount", "c.FullName", "c.EmailAddress")
)