# 6.3 Streaming Tables and Incremental Processing

This notebook explores real-time data processing with Lakeflow's `@dp.streaming_table` decorator. We'll learn how to build robust streaming pipelines with checkpointing, exactly-once semantics, watermarking, and incremental processing patterns using functional programming principles.

## Learning Objectives

By the end of this notebook, you will understand how to:
- Define streaming tables with `@dp.streaming_table` for real-time processing
- Implement checkpointing for fault tolerance and exactly-once guarantees
- Handle late-arriving data with watermarking strategies
- Design incremental processing patterns for efficient data pipelines
- Perform stream-to-stream and stream-to-batch joins
- Optimize streaming workload performance
- Apply functional programming to streaming transformations
- Test streaming pipelines with synthetic data

## Prerequisites

- Completion of Notebooks 6.1 and 6.2
- Understanding of Spark Structured Streaming concepts
- Knowledge of Delta Lake and change data capture
- Familiarity with event-driven architectures

```python
# Platform setup detection
# In Databricks: keep commented
# In a local environment: uncomment this line
# %run 00_Environment_Setup.ipynb
```

```python
# Essential imports
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.streaming import StreamingQuery
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

# In a real Lakeflow pipeline:
# from pyspark import pipelines as dp

print("✅ Imports complete - Ready for streaming table demonstration!")
```

## 1. Streaming Tables vs Batch Tables

### Fundamental Differences

**Batch Table (`@dp.table`)**:
```python
@dp.table
def customers():
    """Batch processing: Complete snapshot each run"""
    return spark.read.table("raw.customers")
    # Processes entire dataset on each pipeline execution
```

**Streaming Table (`@dp.streaming_table`)**:
```python
@dp.streaming_table
def events():
    """Streaming: Continuous incremental processing"""
    return spark.readStream.table("raw.events")
    # Only processes new data since last checkpoint
```
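To make the difference concrete without a cluster, here is a toy, pure-Python model (not Lakeflow code): a list stands in for an append-only source, and an integer offset stands in for the checkpoint. The function names are hypothetical.

```python
from typing import List, Tuple


def batch_run(source: List[str]) -> List[str]:
    """Batch semantics: every run reads the entire source again."""
    return list(source)


def incremental_run(source: List[str], offset: int) -> Tuple[List[str], int]:
    """Streaming semantics: read only records after the committed offset,
    and return them together with the new offset to commit."""
    return source[offset:], len(source)


source = ["e1", "e2", "e3"]
offset = 0                                        # empty checkpoint on first run
first, offset = incremental_run(source, offset)   # processes e1, e2, e3

source = source + ["e4", "e5"]                    # new data lands in the source
second, offset = incremental_run(source, offset)  # processes only e4, e5

print(len(batch_run(source)), second)             # → 5 ['e4', 'e5']
```

A batch run over the same source would touch all five records again; the incremental run touches only the two new ones.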

### Decision Matrix

| Criteria | Batch Table | Streaming Table |
|----------|-------------|------------------|
| **Data Pattern** | Complete snapshots | Continuous stream |
| **Processing** | Full table each run | Only new records |
| **Latency** | Minutes to hours | Seconds to minutes |
| **State Management** | None | Checkpoints maintained |
| **Fault Tolerance** | Re-run from start | Resume from checkpoint |
| **Cost** | Higher for large tables | Lower (incremental) |
| **Use Case** | Dimensions, aggregates | Events, logs, CDC |
| **Update Frequency** | Scheduled (hourly/daily) | Continuous/micro-batch |

### When to Use Streaming Tables

**✅ Use streaming tables for:**
- Event logs (clickstreams, application logs, IoT sensors)
- Change data capture (CDC) from databases
- Real-time analytics and dashboards
- Message queue data (Kafka, Event Hubs, Kinesis)
- Monitoring and alerting systems
- Time-series data with continuous ingestion

**❌ Don't use streaming tables for:**
- Dimension tables with full snapshots
- Data that arrives in large batches
- One-time historical loads
- Tables requiring complex aggregations without time windows
- Sources that update or delete existing rows (streaming reads expect append-only input; use CDC patterns instead)

## 2. Defining Streaming Tables

### Basic Streaming Table Definition

```python
from pyspark import pipelines as dp

@dp.streaming_table(
    name="bronze_events",
    comment="Real-time event stream from application logs"
)
def bronze_events():
    """
    Pure function returning a streaming DataFrame.
    Lakeflow manages checkpointing and state automatically.
    """
    return (
        spark.readStream
        .format("delta")
        .table("raw.events")
    )
```

### Reading from Various Streaming Sources

```python
# Source 1: Delta table (incrementally reads newly committed data; this is not Auto Loader)
@dp.streaming_table
def events_from_delta():
    return spark.readStream.format("delta").table("raw.events")

# Source 2: Cloud Files (Auto Loader for S3, ADLS, GCS)
@dp.streaming_table
def events_from_cloud_files():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/mnt/schemas/events")  # Optional in Lakeflow, which manages it
        .load("/mnt/landing/events/")
    )

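# Source 3: Kafka topic
# Hypothetical payload schema as a DDL string (adjust to your messages)
event_schema = "event_id STRING, event_type STRING, user_id STRING, event_timestamp TIMESTAMP"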
@dp.streaming_table
def events_from_kafka():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "events-topic")
        .option("startingOffsets", "latest")
        .load()
        .select(
            F.col("key").cast("string"),
            F.from_json(F.col("value").cast("string"), event_schema).alias("data")
        )
        .select("key", "data.*")
    )

# Source 4: Event Hub (Azure)
@dp.streaming_table
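def events_from_eventhub():
    # Requires the azure-event-hubs-spark connector on the cluster;
    # recent connector versions expect an encrypted connection string
    connection_string = dbutils.secrets.get("eventhub", "connection-string")
    encrypted = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        connection_string
    )
    return (
        spark.readStream
        .format("eventhubs")
        .options(**{
            "eventhubs.connectionString": encrypted,
            "eventhubs.consumerGroup": "lakeflow"
        })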
        .load()
    )
```

### Streaming Table with Transformations

```python
# Pure transformation functions (testable)
def parse_event_timestamp(df: DataFrame) -> DataFrame:
    """Pure function: Parse timestamp from string"""
    return df.withColumn(
        "event_timestamp",
        F.to_timestamp("timestamp_str", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    )

def enrich_with_metadata(df: DataFrame) -> DataFrame:
    """Pure function: Add processing metadata"""
    return df.withColumn("processed_at", F.current_timestamp())

def extract_user_agent_info(df: DataFrame) -> DataFrame:
    """Pure function: Parse user agent string"""
    return df.withColumn(
        "device_type",
        F.when(F.col("user_agent").like("%Mobile%"), "mobile")
         .when(F.col("user_agent").like("%Tablet%"), "tablet")
         .otherwise("desktop")
    )

# Compose pure functions in streaming table
@dp.streaming_table(
    name="bronze_events_enriched",
    comment="Streaming events with parsed timestamps and metadata"
)
def bronze_events_enriched():
    """Streaming table with functional transformation composition"""
    return (
        spark.readStream.table("raw.events")
        .transform(parse_event_timestamp)
        .transform(enrich_with_metadata)
        .transform(extract_user_agent_info)
        .select(
            "event_id",
            "event_type",
            "event_timestamp",
            "user_id",
            "device_type",
            "processed_at"
        )
    )
```
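Because each step is a plain function, the same logic can be unit-tested without a streaming source. The sketch below mirrors `extract_user_agent_info` and the `.transform(...)` chain on Python dicts; `classify_device`, `add_device_type`, `tag_source`, and `compose` are hypothetical helpers for illustration, not Lakeflow APIs.

```python
from functools import reduce
from typing import Callable, Dict

Record = Dict[str, str]
Step = Callable[[Record], Record]


def classify_device(user_agent: str) -> str:
    """Same rule order as the F.when chain: the first matching rule wins."""
    if "Mobile" in user_agent:
        return "mobile"
    if "Tablet" in user_agent:
        return "tablet"
    return "desktop"


def add_device_type(record: Record) -> Record:
    # Build a new dict instead of mutating the input, keeping the step pure
    return {**record, "device_type": classify_device(record.get("user_agent", ""))}


def tag_source(record: Record) -> Record:
    return {**record, "source": "app_logs"}


def compose(*steps: Step) -> Step:
    """Apply steps left to right, like chained DataFrame.transform calls."""
    return lambda record: reduce(lambda acc, step: step(acc), steps, record)


pipeline = compose(add_device_type, tag_source)
raw = {"event_id": "1", "user_agent": "Mozilla/5.0 (Linux; Android) Tablet"}
print(pipeline(raw)["device_type"])  # → tablet
```

The input record is left unchanged, which is what makes each step safe to reuse and test in isolation.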

## 3. Checkpointing and Exactly-Once Semantics

### What is Checkpointing?

Checkpointing is Spark's mechanism for maintaining **streaming state** and enabling **exactly-once processing**.

**Checkpoint Information Stored:**
- Offsets read from source (which records have been processed)
- Processing metadata (batch IDs, timestamps)
- Aggregation state (for stateful operations)
- Configuration and schema information

### How Lakeflow Manages Checkpoints

```python
# In Lakeflow, checkpointing is AUTOMATIC
@dp.streaming_table
def my_stream():
    return spark.readStream.table("source")
    # Lakeflow automatically:
    # 1. Creates checkpoint directory
    # 2. Manages checkpoint lifecycle
    # 3. Ensures exactly-once processing
    # 4. Handles recovery from failures
```

**In traditional Spark Structured Streaming** (for comparison):
```python
# Manual checkpoint management (NOT needed in Lakeflow)
query = (
    spark.readStream.table("source")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/my_stream")  # Manual
    .toTable("target")  # DataStreamWriter.toTable() starts the query
)
```

### Exactly-Once Processing Guarantees

Lakeflow streaming tables provide **exactly-once semantics**:

```python
@dp.streaming_table
def transaction_stream():
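    """
    Each source record is reflected in the target exactly once:
    - A failure and retry never writes a record twice (not at-least-once)
    - A failure never loses a record (not at-most-once)
    Note: this does not remove duplicates already present in the source;
    use deduplication for that.
    """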
    return spark.readStream.table("raw.transactions")
```

**How Exactly-Once Works:**
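1. The source offsets for each micro-batch are recorded in the checkpoint before processing
2. The micro-batch is transformed; replaying the same offsets reads the same input
3. Output is written with transactional guarantees (a Delta Lake commit tied to the batch ID)
4. The batch is marked committed in the checkpoint only after the write succeeds
5. On failure, the stream replays from the last committed batch, and the sink skips a batch it already wrote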
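The five steps can be simulated in a few lines. This is a toy model, not Spark's implementation: `ToySink` stands in for a transactional sink that remembers committed batch IDs (Delta's streaming sink tracks committed batch IDs in its transaction log for a similar purpose), and a dict stands in for the checkpoint. A simulated crash between the write and the checkpoint update shows why the replayed batch is not written twice.

```python
from typing import Dict, List, Set


class ToySink:
    """Stands in for a transactional sink that remembers committed batch IDs."""

    def __init__(self) -> None:
        self.rows: List[int] = []
        self.committed_batches: Set[int] = set()

    def write(self, batch_id: int, rows: List[int]) -> None:
        if batch_id in self.committed_batches:
            return  # Replayed batch that was already committed: skip it
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)


def run_batch(source: List[int], checkpoint: Dict[str, int], sink: ToySink,
              batch_size: int, crash_before_commit: bool = False) -> None:
    # Steps 1-2: the checkpoint fixes the batch ID and offset range,
    # so a replay reads exactly the same records
    batch_id = checkpoint["batch_id"]
    start = checkpoint["offset"]
    end = min(start + batch_size, len(source))
    sink.write(batch_id, source[start:end])  # Step 3: transactional write
    if crash_before_commit:
        raise RuntimeError("simulated failure after write, before checkpoint update")
    # Step 4: advance the checkpoint only after the write succeeded
    checkpoint["offset"] = end
    checkpoint["batch_id"] = batch_id + 1


source = [1, 2, 3, 4, 5]
checkpoint = {"batch_id": 0, "offset": 0}
sink = ToySink()

run_batch(source, checkpoint, sink, batch_size=2)      # batch 0 writes [1, 2]
try:
    run_batch(source, checkpoint, sink, batch_size=2, crash_before_commit=True)
except RuntimeError:
    pass                                               # batch 1 written, checkpoint not advanced
run_batch(source, checkpoint, sink, batch_size=2)      # Step 5: replay of batch 1 is skipped
run_batch(source, checkpoint, sink, batch_size=2)      # batch 2 writes [5]
print(sink.rows)  # → [1, 2, 3, 4, 5]
```

Without the batch-ID check in `write`, the replay would append `[3, 4]` a second time, which is at-least-once behavior.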

### Checkpoint Lifecycle and Management

```python
# Lakeflow manages checkpoint lifecycle automatically
@dp.streaming_table(
    name="events",
    comment="Checkpoint managed by Lakeflow"
)
def events():
    return spark.readStream.table("raw.events")

# Checkpoint location: chosen and managed by Lakeflow
# (internal to the pipeline's storage; you never set checkpointLocation)
# Lifecycle:
#   - Created on first run
#   - Maintained across pipeline runs
#   - Reset by a full refresh of the table
#   - Cleaned up when the table is dropped
```

### Schema Evolution with Checkpoints

```python
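# Schema changes in the source can stop a running stream
@dp.streaming_table
def events_with_schema():
    """
    If the source schema changes:
    1. The streaming read detects the change and the update fails
    2. Options:
       a) Full refresh: reset the checkpoint and reprocess all data
       b) Restart the update: a restarted Delta stream picks up
          additive changes such as new columns
       c) Project an explicit column list so the output schema stays stable
    Note: mergeSchema is a Delta write option; it does not enable
    schema evolution on a streaming read.
    """
    return (
        spark.readStream
        .table("raw.events")
        .select("event_id", "event_type", "user_id", "event_timestamp")
    )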
```

## 4. Watermarking and Late Data Handling

### What is Watermarking?

**Watermarking** defines how long Spark should wait for late-arriving data in event-time processing.

**Problem**: Events may arrive out of order or delayed
```
Event time:  10:00  10:01  10:02  10:03  10:04
Arrival:     10:00  10:02  10:01  10:05  10:03  <- Out of order!
```

**Solution**: Watermark defines acceptable delay
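In Spark, the watermark is the maximum event time seen so far minus the configured delay, recomputed at the end of each micro-batch and never moved backwards. The plain-Python sketch below models that rule; `next_watermark` and `is_too_late` are hypothetical names, and real Spark only drops late records inside stateful operators.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def next_watermark(batch_event_times: List[datetime], delay: timedelta,
                   current: Optional[datetime]) -> Optional[datetime]:
    """Watermark after a micro-batch: max event time seen minus the delay.
    It only moves forward; an empty or older batch leaves it unchanged."""
    if not batch_event_times:
        return current
    candidate = max(batch_event_times) - delay
    return candidate if current is None else max(current, candidate)


def is_too_late(event_time: datetime, watermark: Optional[datetime]) -> bool:
    """A stateful operator may drop records older than the watermark."""
    return watermark is not None and event_time < watermark


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute)


delay = timedelta(minutes=10)
wm = next_watermark([at(11, 58), at(12, 0)], delay, None)  # 11:50
wm = next_watermark([at(11, 30)], delay, wm)              # old event: still 11:50
print(wm.time(), is_too_late(at(11, 45), wm), is_too_late(at(11, 55), wm))
# → 11:50:00 True False
```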

### Watermarking in Streaming Tables

```python
@dp.streaming_table(
    name="events_with_watermark",
    comment="Events with 10-minute watermark for late data"
)
def events_with_watermark():
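    """
    Watermark: "expect events up to 10 minutes late"

    If the max event_timestamp seen so far is 12:00, the watermark is 11:50.
    Stateful operators downstream (aggregations, deduplication, joins)
    may then drop events with event_timestamp < 11:50 as too late.

    withWatermark alone filters nothing: in this stateless query,
    late events still pass through unchanged.
    """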
    return (
        spark.readStream.table("raw.events")
        .withWatermark("event_timestamp", "10 minutes")  # Watermark definition
    )
```

### Watermark Impact on Aggregations

```python
@dp.streaming_table
def hourly_event_counts():
    """
    Time-windowed aggregation with watermark.

    Without a watermark:
    - Append output mode rejects the aggregation
    - In other output modes, state for every window is kept forever

    With a watermark:
    - A window is finalized once the watermark passes its end,
      and its state is evicted (bounded state size)
    - Events older than the watermark are dropped, not counted
    """
    return (
        spark.readStream.table("raw.events")
        .withWatermark("event_timestamp", "1 hour")  # Wait 1 hour for late data
        .groupBy(
            F.window("event_timestamp", "1 hour"),  # Hourly windows
            "event_type"
        )
        .agg(
            F.count("*").alias("event_count"),
            # Exact countDistinct is not supported on streaming DataFrames
            F.approx_count_distinct("user_id").alias("unique_users")
        )
        .select(
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            "event_type",
            "event_count",
            "unique_users"
        )
    )
```
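The sketch below models the "with watermark" behavior in plain Python: counts accumulate per one-hour window, a window is emitted (append mode) only after the watermark passes its end, and events behind the watermark are dropped. It is a simplified, hypothetical model; for instance, Spark applies an advanced watermark in the following micro-batch (which may be a no-data batch) rather than in the same one.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

Window = Tuple[datetime, datetime]
EPOCH = datetime(1970, 1, 1)


def tumbling_window(ts: datetime, size: timedelta) -> Window:
    """Epoch-aligned window [start, start + size) containing ts."""
    start = EPOCH + ((ts - EPOCH) // size) * size
    return start, start + size


class WindowedCounter:
    """Toy append-mode windowed count with a watermark."""

    def __init__(self, size: timedelta, delay: timedelta) -> None:
        self.size = size
        self.delay = delay
        self.state: Dict[Window, int] = defaultdict(int)
        self.max_event_time: Optional[datetime] = None
        self.dropped = 0

    @property
    def watermark(self) -> Optional[datetime]:
        return None if self.max_event_time is None else self.max_event_time - self.delay

    def process_batch(self, event_times: List[datetime]) -> List[Tuple[Window, int]]:
        wm = self.watermark  # watermark from earlier batches
        for ts in event_times:
            if wm is not None and ts < wm:
                self.dropped += 1  # too late: not counted
                continue
            self.state[tumbling_window(ts, self.size)] += 1
        if event_times:
            latest = max(event_times)
            if self.max_event_time is None or latest > self.max_event_time:
                self.max_event_time = latest
        # Append mode: emit a window once the watermark passes its end, then evict it
        wm = self.watermark
        closed = sorted(w for w in self.state if wm is not None and w[1] <= wm)
        return [(w, self.state.pop(w)) for w in closed]


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute)


counter = WindowedCounter(size=timedelta(hours=1), delay=timedelta(hours=1))
b1 = counter.process_batch([at(10, 5), at(10, 40), at(11, 10)])  # watermark 10:10: nothing final
b2 = counter.process_batch([at(12, 30)])  # watermark 11:30: the 10:00-11:00 window is emitted
b3 = counter.process_batch([at(10, 50)])  # 10:50 < 11:30: dropped as too late
```

The evicted 10:00 window is gone from `counter.state`, which is what keeps state bounded; the price is that the 10:50 event in the last batch is never counted.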

### Choosing Watermark Duration

**Factors to Consider:**
- Expected latency of data source
- Business tolerance for data completeness
- State size and memory constraints
- Processing time vs event time skew

**Guidelines:**
```python
# `events` is any streaming DataFrame with a `timestamp` event-time column

# Short watermark (1-5 minutes)
# Use when: near real-time sources with little delay, low-latency results
short_wm = events.withWatermark("timestamp", "2 minutes")

# Medium watermark (10-60 minutes)
# Use when: typical streaming data with occasional delays
medium_wm = events.withWatermark("timestamp", "30 minutes")

# Long watermark (hours)
# Use when: high-latency sources where completeness matters more than latency
long_wm = events.withWatermark("timestamp", "4 hours")
```

### Monitoring Late Data

```python
@dp.streaming_table
def events_with_lateness_tracking():
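    """
    Track how late events arrive (processing time minus event time)
    to tune the watermark. current_timestamp() is fixed per micro-batch.
    The watermark has no filtering effect here: no stateful operator follows.
    """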
    return (
        spark.readStream.table("raw.events")
        .withColumn(
            "arrival_timestamp",
            F.current_timestamp()
        )
        .withColumn(
            "lateness_seconds",
            F.unix_timestamp("arrival_timestamp") - F.unix_timestamp("event_timestamp")
        )
        .withWatermark("event_timestamp", "15 minutes")
    )
# Analyze lateness_seconds to optimize watermark duration
```
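One way to analyze `lateness_seconds` is to pick a high percentile of the observed lateness and add headroom. The heuristic below is an assumption, not a Lakeflow or Spark feature; it works on a plain list of numbers, such as a collected sample of the `lateness_seconds` column, and the sample values are synthetic.

```python
import math
from typing import Sequence


def nearest_rank_percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile, for 0 < pct <= 100."""
    if not values or not 0 < pct <= 100:
        raise ValueError("need values and 0 < pct <= 100")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


def suggest_watermark(lateness_seconds: Sequence[float], pct: float = 99.0,
                      headroom_pct: int = 20) -> str:
    """Hypothetical heuristic: cover pct% of observed lateness plus a safety
    margin, formatted as an interval string that withWatermark accepts."""
    base = nearest_rank_percentile(lateness_seconds, pct)
    seconds = math.ceil(base * (100 + headroom_pct) / 100)
    return f"{seconds} seconds"


# Synthetic lateness sample in seconds
sample = [5, 8, 12, 30, 45, 60, 90, 120, 300, 900]
print(suggest_watermark(sample, pct=90))  # → 360 seconds
print(suggest_watermark(sample))          # → 1080 seconds
```

A single outlier (the 900-second event) triples the suggestion between p90 and p99, which is the completeness-versus-state trade-off from the guidelines above.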

## 5. Stateful Operations in Streaming

### Stateless vs Stateful Transformations

**Stateless** (no memory of previous records):
```python
@dp.streaming_table
def stateless_transform():
    """Each record processed independently"""
    return (
        spark.readStream.table("events")
        .filter(F.col("amount") > 100)      # Stateless
        .withColumn("double", F.col("amount") * 2)  # Stateless
    )
# No state maintained between batches
```

**Stateful** (maintains state across records):
```python
@dp.streaming_table
def stateful_aggregation():
    """Maintains running aggregates"""
    return (
        spark.readStream.table("events")
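        .withWatermark("timestamp", "1 hour")
        .groupBy("user_id")              # Stateful: one state row per user_id
        .agg(F.sum("amount"))            # Stateful: running sum
    )
# State grows with the number of unique user_ids. The watermark cannot evict it
# because the grouping key has no event-time column, and append output mode
# rejects this query for the same reason (see Managing State Size below).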
```

### Common Stateful Operations

**1. Windowed Aggregations**
```python
@dp.streaming_table
def window_aggregations():
    return (
        spark.readStream.table("events")
        .withWatermark("event_time", "10 minutes")
        .groupBy(
            F.window("event_time", "5 minutes", "1 minute")  # Sliding window
        )
        .agg(F.sum("revenue").alias("total_revenue"))
    )
```
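With a 5-minute window sliding every minute, each event belongs to five overlapping windows and therefore contributes to five aggregates. The plain-Python helper below (hypothetical, assuming windows aligned to the Unix epoch, as with `F.window`'s default start offset of zero) lists the windows that contain a timestamp.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

EPOCH = datetime(1970, 1, 1)


def sliding_windows(ts: datetime, size: timedelta,
                    slide: timedelta) -> List[Tuple[datetime, datetime]]:
    """All windows [start, start + size), starting every `slide`, that contain ts."""
    # Latest epoch-aligned window start at or before ts
    start = EPOCH + ((ts - EPOCH) // slide) * slide
    windows = []
    while start > ts - size:  # this window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


ts = datetime(2024, 1, 1, 10, 7, 30)
for start, end in sliding_windows(ts, timedelta(minutes=5), timedelta(minutes=1)):
    print(start.time(), "-", end.time())
```

This prints five windows, from 10:03-10:08 through 10:07-10:12. With `slide == size` the same helper returns a single tumbling window.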

**2. Deduplication**
```python
@dp.streaming_table
def deduplicated_events():
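    """
    Remove duplicate events based on event_id.
    dropDuplicatesWithinWatermark (Spark 3.5+) keeps each event_id in state
    only until the watermark passes, so state stays bounded.
    Plain dropDuplicates(["event_id"]) would keep every event_id forever,
    because the dedup key does not include the event-time column.
    """
    return (
        spark.readStream.table("raw.events")
        .withWatermark("event_time", "24 hours")
        .dropDuplicatesWithinWatermark(["event_id"])  # Stateful, bounded dedup
    )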
```
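The toy model below shows why the watermark matters for deduplication: without eviction, the set of seen IDs would grow forever. It is a plain-Python approximation in the spirit of Spark's `dropDuplicatesWithinWatermark` (state for a key expires once the watermark passes its first event time plus the delay); the class and method names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, datetime]  # (event_id, event_time)


class WatermarkDeduplicator:
    """Remember each event_id until the watermark passes its first event time
    plus the delay, then forget it."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.expires_at: Dict[str, datetime] = {}  # the dedup state
        self.max_event_time: Optional[datetime] = None

    def _watermark(self) -> Optional[datetime]:
        return None if self.max_event_time is None else self.max_event_time - self.delay

    def process_batch(self, events: List[Event]) -> List[Event]:
        wm = self._watermark()
        out = []
        for event_id, ts in events:
            if wm is not None and ts < wm:
                continue  # older than the watermark: dropped
            if event_id in self.expires_at:
                continue  # duplicate of an event still held in state
            self.expires_at[event_id] = ts + self.delay
            out.append((event_id, ts))
        if events:
            latest = max(ts for _, ts in events)
            if self.max_event_time is None or latest > self.max_event_time:
                self.max_event_time = latest
        # Evict state the watermark has passed: this keeps state bounded
        wm = self._watermark()
        if wm is not None:
            self.expires_at = {k: v for k, v in self.expires_at.items() if v >= wm}
        return out


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute)


dedup = WatermarkDeduplicator(timedelta(minutes=10))
out1 = dedup.process_batch([("a", at(12, 0)), ("a", at(12, 3)), ("b", at(12, 5))])
out2 = dedup.process_batch([("a", at(12, 20))])  # still a duplicate: "a" is in state
out3 = dedup.process_batch([("c", at(12, 40))])  # watermark 12:30 evicts "a" and "b"
state_after_b3 = sorted(dedup.expires_at)        # only "c" remains
out4 = dedup.process_batch([("a", at(12, 35))])  # "a" was forgotten, so it is emitted again
```

The last call shows the trade-off: once an ID's state is evicted, a much later duplicate is emitted again.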

**3. Stream-Stream Joins**
```python
@dp.streaming_table
def joined_streams():
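    """
    Join two streams with state management.
    Each side is buffered in state until the watermarks and the time-range
    condition rule out future matches. In a left outer join, unmatched
    clicks are emitted with null impression columns only after that point.
    """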
    clicks = (
        spark.readStream.table("clicks")
        .withWatermark("click_time", "10 minutes")
    )
    
    impressions = (
        spark.readStream.table("impressions")
        .withWatermark("impression_time", "10 minutes")
    )
    
    return (
        clicks.join(
            impressions,
            F.expr("""
                click_id = impression_id AND
                click_time >= impression_time AND
                click_time <= impression_time + interval 5 minutes
            """),
            "leftOuter"
        )
    )
```
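The time-range condition is what lets Spark discard buffered rows. The plain-Python sketch below (hypothetical helper names, static lists instead of streams) applies the same condition as the join above and shows when an impression can be evicted: once the click-side watermark is past `impression_time + 5 minutes`, no click that is still processed (treating clicks older than the watermark as dropped) can satisfy the condition.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

Click = Tuple[str, datetime]       # (click_id, click_time)
Impression = Tuple[str, datetime]  # (impression_id, impression_time)
Joined = Tuple[str, datetime, Optional[datetime]]


def bounded_left_join(clicks: List[Click], impressions: List[Impression],
                      max_gap: timedelta) -> List[Joined]:
    """Left outer join on id with impression_time <= click_time <= impression_time + max_gap."""
    by_id: Dict[str, List[datetime]] = defaultdict(list)
    for imp_id, imp_time in impressions:
        by_id[imp_id].append(imp_time)
    out: List[Joined] = []
    for click_id, click_time in clicks:
        matches = [t for t in by_id.get(click_id, []) if t <= click_time <= t + max_gap]
        if matches:
            out.extend((click_id, click_time, t) for t in matches)
        else:
            out.append((click_id, click_time, None))  # unmatched click kept with a null
    return out


def can_evict_impression(impression_time: datetime, click_watermark: datetime,
                         max_gap: timedelta) -> bool:
    """True once no click at or after the watermark can still match this impression."""
    return click_watermark > impression_time + max_gap


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute)


gap = timedelta(minutes=5)
rows = bounded_left_join(
    clicks=[("x", at(10, 3)), ("y", at(10, 20))],
    impressions=[("x", at(10, 0)), ("y", at(10, 0))],
    max_gap=gap,
)
# "x" matches (3 minutes after its impression); "y" is 20 minutes later, so it gets None
```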

### Managing State Size

**State Size Growth Patterns:**
```python
# ⚠️ UNBOUNDED: State grows forever
@dp.streaming_table
def unbounded_state():
    return (
        spark.readStream.table("events")
        .groupBy("user_id")  # No watermark!
        .agg(F.sum("amount"))  # Accumulates for all time
    )
# Problem: state grows without bound, and append output mode rejects the query

# ✅ BOUNDED: State cleaned up by watermark
@dp.streaming_table
def bounded_state():
    return (
        spark.readStream.table("events")
        .withWatermark("event_time", "1 day")  # Watermark set
        .groupBy(
            F.window("event_time", "1 hour"),  # Windowed
            "user_id"
        )
        .agg(F.sum("amount"))
    )
# State for old windows automatically evicted
```

## 6. Stream-to-Batch Joins

### Enriching Streams with Dimension Tables

```python
# Dimension table (batch)
@dp.table
def customers():
    """Slowly changing dimension - batch table"""
    return spark.table("raw.customers")

# Stream enriched with dimension
@dp.streaming_table
def enriched_transactions():
    """
    Join streaming transactions with batch customer table.
    
    Pattern: Stream-to-batch join
    - Stream: transactions (real-time)
    - Batch: customers (updated hourly/daily)
    - Result: Enriched stream with customer details
    """
    transactions_stream = spark.readStream.table("raw.transactions")
    customers_batch = spark.read.table("customers")  # Batch read of the customers table
    
    return (
        transactions_stream
        .join(
            customers_batch,
            "customer_id",
            "left"  # Left join to keep all transactions
        )
        .select(
            "transaction_id",
            "customer_id",
            customers_batch["customer_name"],
            customers_batch["customer_tier"],
            "amount",
            "transaction_time"
        )
    )
```

### Performance Considerations

```python
# ❌ SLOW: Large dimension, too big to broadcast, so Spark shuffles both sides
@dp.streaming_table
def slow_enrichment():
    stream = spark.readStream.table("events")
    large_dimension = spark.table("huge_dimension")  # ~10GB table

    return stream.join(large_dimension, "key")  # Shuffle join (slower)

# ✅ BEST PRACTICE: Broadcast small dimensions
# Spark broadcasts automatically below spark.sql.autoBroadcastJoinThreshold
# (10MB by default in open-source Spark); the hint helps when size estimates are off
@dp.streaming_table
def fast_enrichment():
    stream = spark.readStream.table("events")
    small_dimension = F.broadcast(spark.table("dim_table"))  # Must fit in executor memory

    return stream.join(small_dimension, "key")  # Broadcast hash join (stream not shuffled)
```
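A broadcast join is a hash join whose build side is small enough to copy to every executor, so the streaming side never has to be shuffled. The plain-Python sketch below shows only the build/probe idea for one micro-batch; it is not how Spark is implemented internally, and the function name and table contents are made up.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def broadcast_hash_join(stream_batch: List[Row], small_dim: List[Row], key: str) -> List[Row]:
    """Inner hash join: build a lookup table from the small side once,
    then probe it with each streaming row, preserving stream order."""
    lookup = {row[key]: row for row in small_dim}  # assumes unique keys in the dimension
    return [{**row, **lookup[row[key]]} for row in stream_batch if row[key] in lookup]


dim_table = [{"key": "k1", "label": "alpha"}, {"key": "k2", "label": "beta"}]
batch = [{"key": "k2", "value": 10}, {"key": "k3", "value": 7}, {"key": "k1", "value": 3}]
print(broadcast_hash_join(batch, dim_table, "key"))
# → [{'key': 'k2', 'value': 10, 'label': 'beta'}, {'key': 'k1', 'value': 3, 'label': 'alpha'}]
```

`k3` has no dimension row, so the inner join drops it; a left join would keep it with a missing label.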

### Dimension Table Updates

```python
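# Important: how stream-static joins see dimension updates
@dp.streaming_table
def stream_with_dimension():
    """
    With a Delta dimension table, each micro-batch joins against the latest
    committed version of the dimension, so updates are picked up without
    a restart.

    Caveats:
    1. Rows already written are not updated when the dimension changes
       (the join is stateless; earlier results keep the old values)
    2. Events that arrive before their dimension row exists get no match
       (inner join) or nulls (left join)

    To pin a specific dimension version instead, use Delta time travel,
    e.g. spark.read.option("versionAsOf", 42).table("customers")
    """
    stream = spark.readStream.table("events")
    dimension = spark.table("customers")  # Re-read for each micro-batch (Delta)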
    
    return stream.join(dimension, "customer_id")
```
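How a stateless stream-static join interacts with dimension updates can be modeled in plain Python. The sketch below contrasts two behaviors: joining each micro-batch against the dimension as committed when that batch runs, and joining against a snapshot frozen at query start. The function names and data are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, int]                  # (customer_id, amount)
Joined = Tuple[str, int, Optional[str]]  # (customer_id, amount, customer_tier)


def left_join(batch: List[Event], dimension: Dict[str, str]) -> List[Joined]:
    """Stateless left join of one micro-batch against a dimension snapshot."""
    return [(cid, amount, dimension.get(cid)) for cid, amount in batch]


def run_stream(batches: List[List[Event]], dimension_versions: List[Dict[str, str]],
               refresh_per_batch: bool) -> List[Joined]:
    """dimension_versions[i] is the dimension as committed when batch i runs.
    refresh_per_batch=False models a snapshot frozen at query start."""
    output: List[Joined] = []
    for i, batch in enumerate(batches):
        snapshot = dimension_versions[i] if refresh_per_batch else dimension_versions[0]
        output.extend(left_join(batch, snapshot))  # earlier output is never rewritten
    return output


batches = [[("c1", 10)], [("c1", 20), ("c2", 5)]]
versions = [{"c1": "silver"}, {"c1": "gold", "c2": "bronze"}]  # updated between batches
```

In both modes the first row keeps `silver`, because a stateless join never revisits rows it has already written. Databricks documents the per-batch behavior (`refresh_per_batch=True`) for stream-static joins against Delta tables.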

## 7. Incremental Processing Patterns

### Pattern 1: Simple Append

```python
@dp.streaming_table(
    name="bronze_logs",
    comment="Incremental append of log files"
)
def bronze_logs():
    """
    Simplest pattern: Just append new records.
    Use when:
    - Data is naturally append-only
    - No updates or deletes
    - Order doesn't matter
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/logs/")
    )
```

### Pattern 2: Deduplication on Append

```python
@dp.streaming_table(
    name="deduplicated_events",
    comment="Events deduplicated by event_id"
)
def deduplicated_events():
    """
    Remove duplicates before materializing.
    Use when:
    - Source may send duplicates
    - Have unique identifier
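    - Can afford watermark-bounded deduplication (duplicates that
      arrive far apart in event time may not be caught)
    """
    return (
        spark.readStream.table("raw.events")
        .withWatermark("event_timestamp", "24 hours")
        .dropDuplicatesWithinWatermark(["event_id"])  # Spark 3.5+; bounded state
    )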
```

### Pattern 3: Late-Arriving Data with Merge

```python
@dp.streaming_table(
    name="merged_events",
    comment="Events merged to handle late arrivals"
)
def merged_events():
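    """
    Handle updates and late-arriving data.
    Use when:
    - Records can be updated
    - Late data must update existing records
    - Have unique key for merging

    This definition alone only appends rows; a streaming table does not
    merge by key. Apply updates with an AUTO CDC flow (keys plus a
    sequencing column) into the target table instead; see 6.5.
    """
    return (
        spark.readStream.table("raw.events")
        .withWatermark("event_timestamp", "6 hours")
    )
# Note: streaming tables are append-only; keyed merges require AUTO CDC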
```
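The difference between appending late records and merging them by key shows up on a tiny example. The plain-Python sketch below is a conceptual model with hypothetical function names: `append_only` is what a plain streaming table does, while `upsert_latest` keeps, per key, the change with the highest sequence number, roughly what a keyed, sequenced merge such as an AUTO CDC flow storing SCD type 1 provides.

```python
from typing import Dict, List, Tuple

Change = Tuple[str, int, str]  # (key, sequence_number, value)


def append_only(target: List[Change], changes: List[Change]) -> List[Change]:
    """Streaming-table semantics: every incoming record becomes a new row."""
    return target + changes


def upsert_latest(target: Dict[str, Tuple[int, str]],
                  changes: List[Change]) -> Dict[str, Tuple[int, str]]:
    """Keyed merge: keep the value with the highest sequence number per key,
    so a late, older change cannot overwrite a newer one."""
    result = dict(target)
    for key, seq, value in changes:
        if key not in result or seq > result[key][0]:
            result[key] = (seq, value)
    return result


# "paid" (sequence 2) arrives after "shipped" (sequence 3)
changes = [("o1", 1, "placed"), ("o1", 3, "shipped"), ("o1", 2, "paid")]
print(len(append_only([], changes)))  # → 3
print(upsert_latest({}, changes))     # → {'o1': (3, 'shipped')}
```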

### Pattern 4: Windowed Aggregation

```python
@dp.streaming_table(
    name="hourly_metrics",
    comment="Hourly aggregated metrics"
)
def hourly_metrics():
    """
    Time-windowed aggregations.
    Use when:
    - Need time-based summaries
    - Can tolerate watermark delay
    - Want incremental computation
    """
    return (
        spark.readStream.table("events")
        .withWatermark("event_timestamp", "1 hour")
        .groupBy(
            F.window("event_timestamp", "1 hour"),
            "category"
        )
        .agg(
            F.sum("revenue").alias("total_revenue"),
            F.count("*").alias("event_count")
        )
        .select(
            F.col("window.start").alias("hour"),
            "category",
            "total_revenue",
            "event_count"
        )
    )
```

### Pattern 5: Sessionization

```python
@dp.streaming_table(
    name="user_sessions",
    comment="User sessions with 30-minute timeout"
)
def user_sessions():
    """
    Group events into sessions based on inactivity timeout.
    Use when:
    - Need to track user sessions
    - Have inactivity timeout definition
    - Can maintain session state
    """
    return (
        spark.readStream.table("events")
        .withWatermark("event_timestamp", "1 hour")
        .groupBy(
            "user_id",
            F.session_window("event_timestamp", "30 minutes")  # Session window
        )
        .agg(
            F.min("event_timestamp").alias("session_start"),
            F.max("event_timestamp").alias("session_end"),
            F.count("*").alias("events_in_session"),
            F.collect_list("page").alias("pages_visited")
        )
    )
```
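Spark's `session_window` gives each event a window `[event_time, event_time + gap)` and merges overlapping windows, so a session's window ends at its last event plus the gap. The plain-Python sketch below models that rule for one user's events; the `sessionize` name is hypothetical, and the streaming version above groups by `user_id` and relies on the watermark to decide when a session is closed.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Session = Tuple[datetime, datetime, int]  # (session_start, window_end, event_count)


def sessionize(event_times: List[datetime], gap: timedelta) -> List[Session]:
    """Merge events whose [t, t + gap) windows overlap into sessions."""
    sessions: List[Session] = []
    for ts in sorted(event_times):
        if sessions and ts < sessions[-1][1]:
            start, _, count = sessions[-1]
            sessions[-1] = (start, ts + gap, count + 1)  # extend the open session
        else:
            sessions.append((ts, ts + gap, 1))  # no overlap: start a new session
    return sessions


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute)


sessions = sessionize([at(10, 0), at(10, 10), at(10, 35), at(11, 20)], timedelta(minutes=30))
# 10:00, 10:10, and 10:35 form one session whose window ends at 11:05;
# 11:20 starts a second session
```

Note that the window end (last event plus gap) differs from the `session_end` column above, which is `max(event_timestamp)`, the last event itself.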

## 8. Performance Optimization for Streaming

### Trigger Intervals

```python
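# Lakeflow manages triggers automatically, but the underlying modes are:

# Micro-batch (default): process new data in small batches
# - Good for: most streaming workloads
# - Latency: seconds
# - Throughput: balanced
# - Lakeflow's "Triggered" and "Continuous" pipeline modes both run micro-batches;
#   they differ in whether the pipeline stops after processing available data

# Spark continuous processing trigger: minimal latency
# - Good for: ultra-low-latency, map-like queries (select, filter)
# - Latency: milliseconds
# - Experimental; no aggregations or other stateful operators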
```

### Optimize Shuffle Operations

```python
@dp.streaming_table
def optimized_aggregation():
    """
    Minimize shuffle cost by:
    1. Grouping on keys that spread data evenly (avoid skewed keys)
    2. Sizing shuffle partitions to the data volume
    """
    return (
        spark.readStream.table("events")
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            F.window("timestamp", "5 minutes"),
            "user_id"  # Good partition key (high cardinality)
        )
        .agg(F.sum("amount"))
    )
# Spark configs:
# spark.sql.shuffle.partitions=200  # Adjust based on data volume; for stateful
#                                   # queries it is fixed after the first run
#                                   # from a given checkpoint
# spark.sql.adaptive.enabled: AQE is disabled for streaming queries in
#                             open-source Spark, so it does not help here
```

### File Compaction

```python
@dp.streaming_table(
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",  # Right-size files on write
        "delta.autoOptimize.autoCompact": "true"      # Auto-compact small files
    }
)
def optimized_stream():
    """
    Streaming tables can generate many small files.
    Auto-optimization prevents small file problems.
    """
    return spark.readStream.table("raw.events")
```

### Monitoring Streaming Performance

```python
# Key metrics to monitor:
# 1. Input rate: records/second received
# 2. Processing rate: records/second processed (should keep up with input)
# 3. Batch duration: time to process each micro-batch
# 4. State size: rows and memory held by stateful operators
# 5. Watermark lag: how far the watermark trails the current time

# In Lakeflow: check the pipeline UI or query the pipeline event log
# (flow_progress events carry per-flow streaming metrics)
# In standalone Structured Streaming, use the StreamingQuery handle:
# query.lastProgress  # Last micro-batch statistics (dict)
# query.status        # Current query status
```
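The metrics above appear in the progress report that Structured Streaming produces per micro-batch (`StreamingQueryProgress`, which PySpark's `query.lastProgress` returns as a dict). The sketch below checks a few of its fields (`inputRowsPerSecond`, `processedRowsPerSecond`, `durationMs.triggerExecution`, `stateOperators[].numRowsTotal`); the function name, thresholds, and sample report are made up for illustration.

```python
from typing import Any, Dict, List


def progress_warnings(progress: Dict[str, Any], max_batch_ms: int = 60_000,
                      max_state_rows: int = 10_000_000) -> List[str]:
    """Flag common problems in one micro-batch progress report (hypothetical thresholds)."""
    warnings = []
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    if input_rate > 0 and processed_rate < input_rate:
        warnings.append(f"falling behind: {processed_rate:.0f} rows/s processed "
                        f"vs {input_rate:.0f} rows/s arriving")
    batch_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
    if batch_ms > max_batch_ms:
        warnings.append(f"slow micro-batch: {batch_ms} ms")
    state_rows = sum(op.get("numRowsTotal", 0) for op in progress.get("stateOperators", []))
    if state_rows > max_state_rows:
        warnings.append(f"large state: {state_rows} rows")
    return warnings


# Synthetic report with a subset of the real fields
sample_progress = {
    "batchId": 42,
    "inputRowsPerSecond": 1200.0,
    "processedRowsPerSecond": 800.0,
    "durationMs": {"triggerExecution": 75_000},
    "stateOperators": [{"numRowsTotal": 250_000}],
    "eventTime": {"watermark": "2024-01-01T11:50:00.000Z"},
}
for warning in progress_warnings(sample_progress):
    print(warning)
```

A processing rate that stays below the input rate across many batches means the backlog is growing, which usually matters more than any single slow batch.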

## Summary

In this notebook, we explored streaming tables and incremental processing in Lakeflow:

### Key Concepts Covered

1. **Streaming Tables (`@dp.streaming_table`)**
   - Real-time processing with continuous incremental execution
   - Automatic checkpoint management by Lakeflow
   - Exactly-once processing guarantees

2. **Checkpointing**
   - Automatic state management for fault tolerance
   - Recovery from failures without data loss
   - Exactly-once semantics with transactional writes

3. **Watermarking**
   - Handling late-arriving and out-of-order data
   - Bounded state management for aggregations
   - Configurable lateness tolerance

4. **Stateful Operations**
   - Windowed aggregations with time windows
   - Deduplication patterns
   - Stream-to-stream joins
   - Session windows and user activity tracking

5. **Stream-to-Batch Joins**
   - Enriching streams with dimension tables
   - Performance optimization with broadcast joins
   - Handling dimension table updates

6. **Incremental Processing Patterns**
   - Simple append for log data
   - Deduplication for idempotency
   - Merge patterns for updates
   - Windowed aggregations for metrics
   - Sessionization for user tracking

7. **Performance Optimization**
   - Trigger interval configuration
   - Shuffle optimization strategies
   - File compaction with Delta Lake
   - Monitoring streaming metrics

### Functional Programming Benefits

- **Pure Functions**: Transformation logic remains pure, side effects in actions
- **Composition**: Stream transformations compose functionally
- **Immutability**: Streaming DataFrames are immutable
- **Declarative**: Define what to compute, Spark handles how

### Best Practices

✅ Always use watermarks for stateful operations
✅ Choose appropriate watermark duration for data characteristics
✅ Use broadcast joins for small dimension tables
✅ Monitor state size and batch processing time
✅ Enable auto-optimization for streaming tables
✅ Extract transformation logic into testable pure functions

### Next Steps

- **6.4**: Data quality with expectations in Lakeflow
- **6.5**: Advanced flows and CDC patterns
- **6.6**: Best practices and anti-patterns


## Exercises

Practice streaming table patterns:

**Exercise 1: Basic Streaming Table**
- Define a streaming table reading from a Delta source
- Add simple transformations (filter, select)
- Observe checkpoint creation and management

**Exercise 2: Watermark Configuration**
- Create windowed aggregation with watermark
- Experiment with different watermark durations
- Monitor state size and late data handling

**Exercise 3: Stream Enrichment**
- Join streaming events with batch dimension table
- Implement broadcast join optimization
- Compare performance with and without broadcast

**Exercise 4: Deduplication Pattern**
- Implement deduplication on streaming data
- Test with synthetic duplicate records
- Measure deduplication effectiveness

**Exercise 5: Session Windows**
- Create user session tracking with session windows
- Define appropriate session timeout
- Calculate session metrics (duration, page views)

**Exercise 6: Performance Tuning**
- Profile a streaming workload
- Identify bottlenecks (shuffle, state, I/O)
- Apply optimization strategies
- Measure performance improvements
