# LAB 05: Streaming & Auto Loader

**Duration:** ~35 min | **Day:** 2 | **Difficulty:** Intermediate-Advanced
**After module:** M05: Incremental Data Processing

> *"Set up Auto Loader for streaming JSON ingestion into the Bronze layer with exactly-once guarantees."*

## Setup

In [None]:
%run ../../setup/00_setup

In [None]:
# Prepare landing zone and checkpoint paths
landing_path = f"{DATASET_PATH}/orders/stream"
checkpoint_path = f"/tmp/{CATALOG}/lab05/checkpoint"
schema_path = f"/tmp/{CATALOG}/lab05/schema"
target_table = f"{CATALOG}.{BRONZE_SCHEMA}.orders_stream"

# Clean up from previous runs
spark.sql(f"DROP TABLE IF EXISTS {target_table}")
dbutils.fs.rm(checkpoint_path, True)
dbutils.fs.rm(schema_path, True)

print(f"Landing path: {landing_path}")
print(f"Target table: {target_table}")
print(f"Files available: {[f.name for f in dbutils.fs.ls(landing_path)]}")

---
## Task 1: COPY INTO (Batch Ingestion)

Use `COPY INTO` to load the JSON files from the landing zone into the target table.

In [None]:
# First create the target table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {target_table}
    (order_id STRING, customer_id STRING, product_id STRING, 
     quantity INT, total_price DOUBLE, order_date STRING, 
     payment_method STRING, store_id STRING)
""")

# TODO: Use COPY INTO to load JSON files
spark.sql(f"""
    COPY INTO {target_table}
    FROM '{landing_path}'
    FILEFORMAT = ________
    FORMAT_OPTIONS ('mergeSchema' = 'true')
""")

count_after_copy = spark.table(target_table).count()
print(f"Rows after COPY INTO: {count_after_copy}")

In [None]:
# -- Validation --
assert count_after_copy > 0, "COPY INTO should have loaded data"
print(f"Task 1 OK: {count_after_copy} rows loaded via COPY INTO")

---
## Task 2: Verify COPY INTO Idempotency

Run COPY INTO again on the same files. How many new rows are loaded?

In [None]:
# TODO: Re-run the same COPY INTO
spark.sql(f"""
    COPY INTO {target_table}
    FROM '{landing_path}'
    FILEFORMAT = JSON
    FORMAT_OPTIONS ('mergeSchema' = 'true')
""")

count_after_rerun = spark.table(target_table).count()
print(f"Rows after re-run: {count_after_rerun} (was {count_after_copy})")
print(f"New rows loaded: {count_after_rerun - count_after_copy}")

In [None]:
# -- Validation --
assert count_after_rerun == count_after_copy, "COPY INTO should be idempotent - no new rows!"
print("Task 2 OK: COPY INTO is idempotent. 0 new rows on re-run.")

---
## Task 3: Auto Loader - Configure Stream

Set up Auto Loader (`cloudFiles`) to read JSON files from the landing zone.

Key options:
- `cloudFiles.format` = json
- `cloudFiles.schemaLocation` = path for inferred schema
- `cloudFiles.inferColumnTypes` = true

In [None]:
# Reset target for Auto Loader test
al_target = f"{CATALOG}.{BRONZE_SCHEMA}.orders_autoloader"
spark.sql(f"DROP TABLE IF EXISTS {al_target}")
dbutils.fs.rm(checkpoint_path, True)
dbutils.fs.rm(schema_path, True)

In [None]:
# TODO: Configure Auto Loader readStream
df_stream = (
    spark.readStream
    .format(________)
    .option("cloudFiles.format", ________)
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(landing_path)
)

In [None]:
# -- Validation --
assert df_stream.isStreaming, "Should be a streaming DataFrame"
print(f"Task 3 OK: Streaming DataFrame configured with schema: {df_stream.schema.fieldNames()}")

---
## Task 4: Write Stream with trigger(availableNow=True)

Write the stream to a Delta table using `trigger(availableNow=True)`.

This processes all available files and stops automatically.
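
To make the behaviour concrete, here is a toy model in plain Python (not Spark). A hypothetical `run_available_now` helper drains whatever backlog exists at start-up in bounded batches and then returns, which mirrors how an `availableNow` query may use several micro-batches before it stops. The function name, the file names, and the `max_files_per_batch` parameter are illustrative assumptions, not Databricks APIs.

```python
def run_available_now(backlog: list[str], max_files_per_batch: int) -> list[list[str]]:
    """Drain a fixed backlog in bounded micro-batches, then stop."""
    snapshot = list(backlog)  # files that exist when the run starts
    batches = []
    for start in range(0, len(snapshot), max_files_per_batch):
        batches.append(snapshot[start:start + max_files_per_batch])
    return batches  # returning models the query terminating on its own


# Hypothetical backlog of five files waiting in the landing zone.
files = [f"orders_{i:03d}.json" for i in range(1, 6)]
batches = run_available_now(files, max_files_per_batch=2)
print(batches)
```

Every file in the backlog is processed exactly once across the batches, and nothing keeps running afterwards, which is why `query.awaitTermination()` returns in this task.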

In [None]:
# TODO: Write stream to Delta table
query = (
    df_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(________=________)
    .toTable(al_target)
)

query.awaitTermination()
print(f"Stream completed. Rows loaded: {spark.table(al_target).count()}")

In [None]:
# -- Validation --
al_count = spark.table(al_target).count()
assert al_count > 0, "Auto Loader should have loaded data"
print(f"Task 4 OK: {al_count} rows loaded via Auto Loader")

---
## Task 5: Incremental Processing

Re-run the stream. Since no new files arrived, 0 new rows should be processed.

This proves the checkpoint tracks processed files.
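
The bookkeeping idea behind the zero-row re-run can be pictured with a small plain-Python model. This is only a sketch of the concept, not how Auto Loader is implemented: the `run_once` helper and the JSON checkpoint file are hypothetical stand-ins for the stream and its checkpoint directory.

```python
import json
import tempfile
from pathlib import Path


def run_once(landing_dir: Path, checkpoint_file: Path) -> list[str]:
    """Process files not yet recorded in the checkpoint; return their names."""
    # Load the set of files handled by earlier runs (empty on the first run).
    seen = set(json.loads(checkpoint_file.read_text())) if checkpoint_file.exists() else set()
    new_files = sorted(p.name for p in landing_dir.iterdir() if p.name not in seen)
    # Persist progress so the next run skips everything processed so far.
    checkpoint_file.write_text(json.dumps(sorted(seen | set(new_files))))
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    landing = Path(tmp) / "landing"
    landing.mkdir()
    checkpoint = Path(tmp) / "checkpoint.json"

    (landing / "orders_001.json").write_text("{}")
    (landing / "orders_002.json").write_text("{}")

    first_run = run_once(landing, checkpoint)    # both files are new
    second_run = run_once(landing, checkpoint)   # nothing new arrived

    (landing / "orders_003.json").write_text("{}")
    third_run = run_once(landing, checkpoint)    # only the late file

print(first_run, second_run, third_run)
```

Deleting the checkpoint file in this model would make the next run reprocess everything, which is the same reason the lab removes `checkpoint_path` before starting a fresh Auto Loader test.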

In [None]:
# Re-run the stream (same checkpoint = incremental)
df_stream2 = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(landing_path)
)

query2 = (
    df_stream2
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(al_target)
)

query2.awaitTermination()
al_count2 = spark.table(al_target).count()
print(f"Rows after re-run: {al_count2} (was {al_count})")
print(f"New rows: {al_count2 - al_count}")

In [None]:
# -- Validation --
assert al_count2 == al_count, f"Should be 0 new rows, but got {al_count2 - al_count}"
print("Task 5 OK: Incremental processing verified. 0 new rows on re-run (checkpoint works!)")

---
## Task 6: Metadata Columns

Add metadata columns to the streaming data: `_processing_time` (the time the record was processed) and `_source_file` (the source file path, taken from `_metadata`).

These columns are essential in production pipelines for debugging and auditing.

**TODO:** Fill in `current_timestamp()` and `_metadata.file_path`.

In [None]:
from pyspark.sql.functions import current_timestamp, col

# Target table for metadata-enriched stream
metadata_target = f"{catalog}.{schema}.orders_with_metadata"
metadata_checkpoint = f"{base_path}/checkpoints/orders_metadata"

# TODO: Configure Auto Loader with metadata columns
df_with_metadata = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{base_path}/schema_metadata")
    .load(source_path)
    .withColumn("_processing_time", ________)       # hint: current_timestamp()
    .withColumn("_source_file", ________)            # hint: col("_metadata.file_path")
)

# Write enriched stream
(
    df_with_metadata
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", metadata_checkpoint)
    .trigger(availableNow=True)
    .toTable(metadata_target)
    .awaitTermination()
)

print(f"Written to: {metadata_target}")
spark.table(metadata_target).display()

In [None]:
# -- Validation --
meta_cols = spark.table(metadata_target).columns
assert "_processing_time" in meta_cols, "Missing '_processing_time' — did you use current_timestamp()?"
assert "_source_file" in meta_cols, "Missing '_source_file' — did you use _metadata.file_path?"
print(f"Task 6 OK: Metadata columns added — {meta_cols}")

> **Exam Tip:** `_metadata` is a hidden column available in Auto Loader streams. It contains `file_path`, `file_name`, `file_size`, `file_modification_time`.
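
Why these two columns help with auditing can be shown with a plain-Python sketch. It does not use Spark or the real `_metadata` column. The `read_with_metadata` helper, the file names, and the sample orders are hypothetical, and the timestamp here is taken per record rather than per micro-batch.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def read_with_metadata(landing_dir: Path) -> list[dict]:
    """Read JSON-lines files and tag each record with audit columns."""
    rows = []
    for path in sorted(landing_dir.glob("*.json")):
        for line in path.read_text().splitlines():
            record = json.loads(line)
            record["_processing_time"] = datetime.now(timezone.utc)  # when it was read
            record["_source_file"] = str(path)                       # where it came from
            rows.append(record)
    return rows


with tempfile.TemporaryDirectory() as tmp:
    landing = Path(tmp)
    (landing / "orders_001.json").write_text('{"order_id": "O-1"}\n{"order_id": "O-2"}\n')
    (landing / "orders_002.json").write_text('{"order_id": "O-3"}\n')
    rows = read_with_metadata(landing)

# Audit question: which file did order O-3 arrive in?
source_of_o3 = Path(next(r["_source_file"] for r in rows if r["order_id"] == "O-3")).name
print(source_of_o3)
```

With the source path stored on every row, a bad record in the Bronze table can be traced back to the exact file that delivered it.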

---
## Task 7: Schema Evolution — Rescued Data

Configure Auto Loader with `schemaEvolutionMode = "rescue"` and a **partial schema** (only `order_id` and `customer_id`). All other columns should land in the `_rescued_data` column.

> **Exam Tip:** `_rescued_data` captures new columns, type mismatches, and unexpected fields. Values for `cloudFiles.schemaEvolutionMode`: `addNewColumns`, `rescue`, `failOnNewColumns`, `none`.
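
The effect of a partial schema in rescue mode can be sketched in plain Python. This is a simplified model rather than Auto Loader's actual behaviour (the real column can carry extra details such as the source file path). The `apply_schema_with_rescue` helper and the sample records are hypothetical.

```python
import json


def apply_schema_with_rescue(record: dict, expected_columns: list[str]) -> dict:
    """Keep expected columns; pack everything else into a JSON string."""
    row = {name: record.get(name) for name in expected_columns}
    extras = {k: v for k, v in record.items() if k not in expected_columns}
    # No unexpected fields means nothing to rescue, so the column stays null.
    row["_rescued_data"] = json.dumps(extras, sort_keys=True) if extras else None
    return row


partial_columns = ["order_id", "customer_id"]
raw = {"order_id": "O-1", "customer_id": "C-9", "quantity": 2, "store_id": "S-4"}

rescued_row = apply_schema_with_rescue(raw, partial_columns)
clean_row = apply_schema_with_rescue({"order_id": "O-2", "customer_id": "C-3"}, partial_columns)
print(rescued_row)
print(clean_row)
```

The table keeps only the declared columns, yet no data is lost: the extra fields can still be parsed out of `_rescued_data` later.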

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

# Reset
rescue_target = f"{CATALOG}.{BRONZE_SCHEMA}.orders_rescued"
rescue_checkpoint = f"/tmp/{CATALOG}/lab05/checkpoint_rescue"
rescue_schema = f"/tmp/{CATALOG}/lab05/schema_rescue"

spark.sql(f"DROP TABLE IF EXISTS {rescue_target}")
dbutils.fs.rm(rescue_checkpoint, True)
dbutils.fs.rm(rescue_schema, True)

# Define partial schema (intentionally incomplete)
partial_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
])

# TODO: Configure Auto Loader with rescue mode
df_rescue = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", rescue_schema)
    .option("cloudFiles.schemaEvolutionMode", ________)  # "rescue"
    .schema(partial_schema)
    .load(landing_path)
)

query_rescue = (
    df_rescue.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", rescue_checkpoint)
    .trigger(availableNow=True)
    .toTable(rescue_target)
)
query_rescue.awaitTermination()

# Check the rescued data column
display(spark.table(rescue_target).limit(5))
spark.table(rescue_target).printSchema()

In [None]:
# -- Validation --
rescue_cols = spark.table(rescue_target).columns
assert "_rescued_data" in rescue_cols, "Missing '_rescued_data' column — did you set schemaEvolutionMode to 'rescue'?"
rescue_count = spark.table(rescue_target).filter("_rescued_data IS NOT NULL").count()
assert rescue_count > 0, "Expected rescued data for columns not in partial schema"
print(f"Task 7 OK: {rescue_count} rows with rescued data (extra columns captured in _rescued_data)")

---
## Task 8: Stream-Static Join

Join the streaming data (orders) with a **static table** (customers) to enrich the stream with customer information.

A **stream-static join** combines a streaming DataFrame with a batch DataFrame; the static side is re-read in every micro-batch.

**TODO:** Fill in the `readStream` call, the join columns, and the `writeStream` call.
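
The per-micro-batch lookup can be illustrated with a plain-Python sketch. It models the idea only and is not Spark code: the `left_join_batch` helper, the `customer_name` column, and the sample rows are hypothetical.

```python
def left_join_batch(orders: list[dict], customers: dict[str, dict]) -> list[dict]:
    """Left-join one micro-batch of orders against the current customer lookup."""
    enriched = []
    for order in orders:
        # Unmatched orders are kept; their customer fields are None (left join).
        match = customers.get(order["customer_id"], {"customer_name": None})
        enriched.append({**order, **match})
    return enriched


# Hypothetical static table; it is looked up again for every micro-batch.
customer_table = {"C-1": {"customer_name": "Ada"}}

batch_1 = left_join_batch(
    [{"order_id": "O-1", "customer_id": "C-1"}, {"order_id": "O-2", "customer_id": "C-2"}],
    customer_table,
)

# The static side changes between batches; the next batch sees the new row.
customer_table["C-2"] = {"customer_name": "Grace"}
batch_2 = left_join_batch([{"order_id": "O-3", "customer_id": "C-2"}], customer_table)

print(batch_1)
print(batch_2)
```

Order `O-2` stays unmatched because it was processed before customer `C-2` existed. Rows that have already been written are not revisited when the static side changes later.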

In [None]:
# -- Stream-Static Join: Enrich orders with customer info --

join_target = f"{catalog}.{schema}.orders_enriched"
join_checkpoint = f"{base_path}/checkpoints/orders_enriched"

# Static side: read customers table
customers_df = spark.table(f"{catalog}.{schema}.customers")

# Stream side: read orders as a stream
# TODO: Fill in the format and table name
orders_stream = (
    spark
    .readStream
    .format("________")        # hint: "delta"
    .table(________)           # hint: target_table (the table from Task 1)
)

# TODO: Join stream with static DataFrame on customer_id
enriched_stream = orders_stream.join(
    ________,                  # hint: customers_df
    on="________",             # hint: "customer_id"
    how="left"
)

# TODO: Write the joined stream to join_target
(
    enriched_stream
    .writeStream
    .format("delta")
    .outputMode("________")          # hint: "append"
    .option("checkpointLocation", join_checkpoint)
    .trigger(availableNow=________)  # hint: True
    .toTable(________)               # hint: join_target
    .awaitTermination()
)

print(f"Stream-static join written to: {join_target}")
spark.table(join_target).display()

In [None]:
# -- Validation --
enriched_count = spark.table(join_target).count()
enriched_cols = spark.table(join_target).columns
assert enriched_count > 0, "No rows in enriched table"
assert "customer_id" in enriched_cols, "Missing customer_id"
# Check that join added customer columns
assert any(c for c in enriched_cols if c not in spark.table(target_table).columns), \
    "Join didn't add any new columns — check join expression"
print(f"Task 8 OK: {enriched_count} enriched rows. Columns: {enriched_cols}")

---
## Cleanup

In [None]:
# Stop any active streams
for s in spark.streams.active:
    s.stop()

# Drop the tables and streaming state created in the tasks,
# reusing the variables each task defined
spark.sql(f"DROP TABLE IF EXISTS {target_table}")
spark.sql(f"DROP TABLE IF EXISTS {al_target}")
spark.sql(f"DROP TABLE IF EXISTS {metadata_target}")
spark.sql(f"DROP TABLE IF EXISTS {rescue_target}")
spark.sql(f"DROP TABLE IF EXISTS {join_target}")
dbutils.fs.rm(checkpoint_path, True)
dbutils.fs.rm(schema_path, True)
dbutils.fs.rm(metadata_checkpoint, True)
dbutils.fs.rm(rescue_checkpoint, True)
dbutils.fs.rm(rescue_schema, True)
dbutils.fs.rm(join_checkpoint, True)
print("Lab cleanup complete")

---
## Lab Complete!

You have:
- Used COPY INTO for idempotent batch loading
- Configured Auto Loader (cloudFiles) for streaming ingestion
- Used trigger(availableNow=True) for incremental processing
- Verified checkpoint-based exactly-once guarantees
- Added metadata columns (`_processing_time`, `_source_file`) to streaming writes
- Used rescued data column for schema evolution handling
- Performed a stream-static join to enrich streaming data

> **Exam Tip:** Auto Loader uses `cloudFiles` format. COPY INTO is simpler but Auto Loader scales better (file notification mode for millions of files). Both are idempotent.

| Feature | COPY INTO | Auto Loader |
|---------|-----------|-------------|
| Interface | SQL command | `readStream` / `writeStream` |
| Scalability | Thousands of files | Millions of files |
| Schema evolution | Opt-in (`mergeSchema`) | Built in (`schemaEvolutionMode`, `_rescued_data`) |
| File tracking | Load history kept with the target table | Checkpoint directory |

> **Next:** LAB 06 - Advanced Transforms 