# Structured Streaming Basics

## Overview
This notebook introduces Structured Streaming, Spark's scalable, fault-tolerant stream processing engine built on Spark SQL.

## Learning Objectives
- Understand streaming concepts
- Read from streaming sources
- Apply transformations on streams
- Write to streaming sinks
- Understand triggers and output modes

---

## 1. Streaming Concepts

### What is Streaming?

**Batch Processing**:
```
Data → Process → Results
      (once)
```

**Stream Processing**:
```
Data Stream → Process → Results Stream
             (continuously)
```

### Structured Streaming Model

Think of streaming data as an **unbounded table** that continuously grows:

```
Input Stream → Unbounded Table → Query → Result Table → Output
```

**Key Concepts**:
- **Source**: Where data comes from (Kafka, files, sockets)
- **Transformation**: Operations on streaming data
- **Sink**: Where results go (console, files, Delta tables)
- **Trigger**: When to process new data
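- **Checkpoint**: Persisted progress (offsets and state) that lets a restarted query resume where it left off; with a replayable source and an idempotent sink, this gives exactly-once results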
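
To make the unbounded-table model concrete, here is a minimal plain-Python sketch (no Spark involved). The class `RunningCountQuery` and its methods are hypothetical names used only for illustration; the point is that each trigger appends new rows to the input and updates the result incrementally rather than recomputing it from scratch.

```python
from collections import Counter


class RunningCountQuery:
    """Toy model of a streaming query: an incremental count per category."""

    def __init__(self):
        self.input_rows = []           # the conceptual unbounded input table
        self.result_table = Counter()  # the result table, updated per trigger

    def process_batch(self, new_rows):
        # Each trigger appends the newly arrived rows to the input table and
        # updates the result incrementally instead of rescanning everything.
        self.input_rows.extend(new_rows)
        for row in new_rows:
            self.result_table[row["category"]] += 1
        return dict(self.result_table)


query = RunningCountQuery()
first = query.process_batch([{"category": "a"}, {"category": "b"}])
second = query.process_batch([{"category": "a"}])
print(first)
print(second)
```

Spark does not actually keep the whole input table in memory; it keeps only the state needed to update the result, which this toy version glosses over.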

## 2. Reading Streams

### From Files (Most Common in Databricks)

In [None]:
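# Note: these wildcard imports shadow Python built-ins such as sum, min, and max
# with their Spark column-function equivalents
from pyspark.sql.functions import *
from pyspark.sql.types import *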

# Define schema for streaming data
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("value", DoubleType(), False),
    StructField("category", StringType(), True)
])

# Read stream from CSV files
stream_df = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("/path/to/streaming/directory")

print("Stream DataFrame created (not executed yet)")
print(f"Is streaming: {stream_df.isStreaming}")

### From Delta Tables (Recommended)

In [None]:
# Read stream from Delta table
delta_stream = spark.readStream \
    .format("delta") \
    .load("/path/to/delta/table")

# Read with options
delta_stream = spark.readStream \
    .format("delta") \
    .option("maxFilesPerTrigger", 10) \
    .option("startingVersion", "latest") \
    .load("/path/to/delta/table")

print("Delta stream configured")

### From Kafka

In [None]:
# Read from Kafka topic
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka data comes as binary - need to cast
kafka_data = kafka_stream.select(
    col("key").cast("string"),
    col("value").cast("string"),
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp")
)

print("Kafka stream configured (requires Kafka cluster)")
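
Casting the binary `key` and `value` columns to strings is essentially a UTF-8 decode. The plain-Python sketch below shows the same step on raw bytes, followed by parsing the payload. It assumes the message value is JSON, which the notebook does not state; `decode_record` is a hypothetical helper. In Spark, the equivalent parsing step would typically use `from_json` with an explicit schema.

```python
import json


def decode_record(key, value):
    """Decode a Kafka-style key/value pair of raw bytes.

    Assumes UTF-8 text and a JSON-encoded value (an illustrative assumption).
    """
    return {
        "key": key.decode("utf-8") if key is not None else None,
        "value": json.loads(value.decode("utf-8")),
    }


record = decode_record(b"user-1", b'{"event": "click", "count": 3}')
print(record)
```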

## 3. Stream Transformations

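Most DataFrame operations (such as `select`, `filter`, and `withColumn`) work on streams unchanged. Actions that need the full dataset, such as `show()` and `count()` called directly on the DataFrame, are not available, and sorting is only supported after an aggregation in complete mode.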

In [None]:
# Assuming we have stream_df from earlier

# Filter
filtered_stream = stream_df.filter(col("value") > 100)

# Select and transform
transformed_stream = stream_df.select(
    col("id"),
    col("timestamp"),
    col("value"),
    (col("value") * 1.1).alias("value_with_tax"),
    upper(col("category")).alias("category_upper")
)

# Add derived columns
enriched_stream = stream_df \
    .withColumn("hour", hour(col("timestamp"))) \
    .withColumn("date", to_date(col("timestamp"))) \
    .withColumn("value_category",
                when(col("value") < 50, "Low")
                .when(col("value") < 100, "Medium")
                .otherwise("High"))

print("Stream transformations defined")

## 4. Aggregations on Streams

In [None]:
# Simple aggregation (running count)
count_stream = stream_df \
    .groupBy("category") \
    .count()

# Multiple aggregations
stats_stream = stream_df \
    .groupBy("category") \
    .agg(
        count("*").alias("count"),
        avg("value").alias("avg_value"),
        sum("value").alias("total_value"),
        min("value").alias("min_value"),
        max("value").alias("max_value")
    )

print("Aggregation streams defined")

### Time-based Aggregations (Windowing)

In [None]:
# Tumbling window (non-overlapping)
tumbling_window_stream = stream_df \
    .groupBy(
        window(col("timestamp"), "10 minutes"),
        col("category")
    ) \
    .agg(
        count("*").alias("count"),
        avg("value").alias("avg_value")
    )

# Sliding window (overlapping)
sliding_window_stream = stream_df \
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),  # 10min window, 5min slide
        col("category")
    ) \
    .count()

print("Window aggregations defined")
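
To see which windows a single event lands in, the following plain-Python sketch mimics window assignment for tumbling and sliding windows. It assumes windows are aligned to the Unix epoch with no start offset; `assign_windows` is a hypothetical helper, not a Spark API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def assign_windows(ts, window, slide=None):
    """Return every [start, end) window that contains ts.

    With slide omitted the windows tumble (no overlap); otherwise they slide.
    """
    slide = slide or window
    # Most recent window start at or before ts, aligned to the slide interval
    start = ts - ((ts - EPOCH) % slide)
    windows = []
    while start + window > ts:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


event_time = datetime(2024, 1, 1, 12, 7, tzinfo=timezone.utc)
tumbling = assign_windows(event_time, timedelta(minutes=10))
sliding = assign_windows(event_time, timedelta(minutes=10), timedelta(minutes=5))
print(len(tumbling), len(sliding))
```

An event at 12:07 falls into one tumbling window (12:00 to 12:10) but two sliding windows (12:00 to 12:10 and 12:05 to 12:15), which is why sliding windows produce more output rows per event.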

## 5. Writing Streams

### Output Modes

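- **Append** (default): Emits only rows added to the result table since the last trigger; aggregations can use it only with a watermark
- **Complete**: Emits the entire result table on every trigger (aggregations only)
- **Update**: Emits only rows that changed since the last trigger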
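
The difference between complete and update is easiest to see on a running count. This plain-Python sketch (no Spark; `run_batches` is a hypothetical helper) records what each mode would emit after every micro-batch.

```python
from collections import Counter


def run_batches(batches, mode):
    """Simulate what a running count emits per batch in a given output mode."""
    if mode not in ("complete", "update"):
        raise ValueError(f"unsupported mode in this sketch: {mode}")
    state = Counter()
    outputs = []
    for batch in batches:
        changed = set()
        for key in batch:
            state[key] += 1
            changed.add(key)
        if mode == "complete":
            # Complete: the whole result table, every time
            outputs.append(dict(state))
        else:
            # Update: only the keys whose counts changed in this batch
            outputs.append({key: state[key] for key in changed})
    return outputs


batches = [["a", "b", "a"], ["b"]]
complete_out = run_batches(batches, "complete")
update_out = run_batches(batches, "update")
print(complete_out)
print(update_out)
```

Append is left out of the sketch because, for aggregations, it only emits a group once a watermark guarantees the group can no longer change.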

### Write to Console (for testing)

In [None]:
# Write to console
query = stream_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Block until the query terminates (a streaming query otherwise runs continuously)
# query.awaitTermination()

# Stop the query
# query.stop()

print("Console query started; call query.stop() to end it")

### Write to Delta Table (Recommended)

In [None]:
# Write stream to Delta table
delta_query = stream_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start("/path/to/output/delta/table")

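# With partitioning: use enriched_stream, which has the derived "date" column,
# and give this query its own checkpoint and output path
partitioned_query = enriched_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/partitioned/checkpoint") \
    .partitionBy("date") \
    .start("/path/to/output/partitioned/delta/table")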

print("Delta write queries configured")

### Write to Files

In [None]:
# Write to Parquet files (each query needs its own checkpoint directory)
file_query = stream_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/parquet/checkpoint") \
    .option("path", "/path/to/output") \
    .start()

print("File write query configured")

## 6. Triggers

Control when processing happens.

In [None]:
# Default trigger (process ASAP)
default_query = stream_df.writeStream \
    .format("console") \
    .start()

# Fixed interval trigger (micro-batch every X time)
timed_query = stream_df.writeStream \
    .format("console") \
    .trigger(processingTime="30 seconds") \
    .start()

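# One-time trigger (process everything in a single batch and stop);
# deprecated in recent Spark releases in favor of availableNow
once_query = stream_df.writeStream \
    .format("console") \
    .trigger(once=True) \
    .start()

# Available-now trigger (process all available data, possibly in several batches, then stop)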
available_query = stream_df.writeStream \
    .format("console") \
    .trigger(availableNow=True) \
    .start()

print("Various trigger types demonstrated")
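
The practical difference between the one-time and available-now triggers is how the backlog is split into batches. The sketch below models that in plain Python, assuming the source honors a rate limit such as `maxFilesPerTrigger` under available-now and ignores it under the one-time trigger; `plan_batches` is a hypothetical helper, not a Spark API.

```python
def plan_batches(pending_files, trigger, max_files_per_trigger=None):
    """Split a backlog of files into micro-batches for a stop-when-done trigger."""
    if trigger not in ("once", "availableNow"):
        raise ValueError(f"unsupported trigger in this sketch: {trigger}")
    if not pending_files:
        return []
    if trigger == "once" or max_files_per_trigger is None:
        # One batch containing the entire backlog
        return [list(pending_files)]
    # availableNow: respect the rate limit, then stop once the backlog is drained
    step = max_files_per_trigger
    return [pending_files[i:i + step] for i in range(0, len(pending_files), step)]


backlog = [f"file_{i}.csv" for i in range(5)]
once_plan = plan_batches(backlog, "once", max_files_per_trigger=2)
available_now_plan = plan_batches(backlog, "availableNow", max_files_per_trigger=2)
print(len(once_plan), len(available_now_plan))
```

Smaller batches keep memory use predictable when a scheduled job has to catch up on a large backlog.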

## 7. Managing Streaming Queries

In [None]:
# Assuming we have a query running
# query = stream_df.writeStream.format("console").start()

# Get query ID
# print(f"Query ID: {query.id}")

# Get query name
# print(f"Query name: {query.name}")

# Check if query is active
# print(f"Is active: {query.isActive}")

# Get recent progress
# print(query.recentProgress)

# Get last progress
# print(query.lastProgress)

# Stop query
# query.stop()

# List all active streams
# print(spark.streams.active)

# Await termination with timeout
# query.awaitTermination(timeout=60)  # Wait up to 60 seconds

print("Query management methods (commented out)")

## 8. Complete Example: Real-time Aggregation

In [None]:
# Example: Real-time sales monitoring

# 1. Define schema
sales_schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("product_id", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("region", StringType(), False)
])

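# 2. Read stream
# A Delta table carries its own schema, and the Delta source rejects a
# user-specified one; sales_schema above documents the expected columns
sales_stream = spark.readStream \
    .format("delta") \
    .load("/bronze/sales")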

# 3. Transform and aggregate
# The watermark lets Spark finalize each window so results can be appended;
# the 10-minute lateness allowance is an illustrative choice
sales_summary = sales_stream \
    .withColumn("revenue", col("amount") * col("quantity")) \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("region")
    ) \
    .agg(
        count("*").alias("transaction_count"),
        sum("revenue").alias("total_revenue"),
        avg("amount").alias("avg_transaction_amount")
    )

# 4. Write to Delta table
# The Delta sink supports append and complete output modes, not update
sales_query = sales_summary.writeStream \
    .queryName("sales_summary") \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/sales_summary") \
    .trigger(processingTime="1 minute") \
    .start("/gold/sales_summary")

print("Complete streaming pipeline configured")
print(f"Query name: {sales_query.name}")

# In production, you would:
# sales_query.awaitTermination()

## 9. Best Practices

### Checkpoint Location
Always specify a checkpoint location for fault tolerance, and give each query its own directory:
```python
.option("checkpointLocation", "/reliable/path")
```
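
Why the checkpoint matters becomes clearer with a toy version of offset tracking. The sketch below is plain Python and heavily simplified: it stores only a single offset in a JSON file, whereas a real checkpoint also holds operator state and commit logs. `run_once` and the file name `offsets.json` are hypothetical.

```python
import json
import os
import tempfile


def run_once(records, checkpoint_path, max_records):
    """Process up to max_records, resuming from the offset saved on disk."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = json.load(f)["offset"]
    batch = records[offset:offset + max_records]
    # A real query would write the batch to its sink here
    with open(checkpoint_path, "w") as f:
        json.dump({"offset": offset + len(batch)}, f)
    return batch


records = ["r0", "r1", "r2", "r3", "r4"]
with tempfile.TemporaryDirectory() as tmp:
    checkpoint = os.path.join(tmp, "offsets.json")
    first_run = run_once(records, checkpoint, max_records=3)
    # A second call stands in for a restart: it picks up after r2
    second_run = run_once(records, checkpoint, max_records=3)
print(first_run, second_run)
```

If two queries shared this file, each would overwrite the other's offset, which is why every query needs its own checkpoint directory.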

### Schema Definition
Define the schema explicitly for file-based sources (CSV, JSON, Parquet); Delta and Kafka sources supply their own:
```python
.schema(my_schema)
```

### Output Mode Selection
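- Use **append** for non-aggregated data, or for windowed aggregations with a watermark
- Use **update** for aggregations when the sink supports it (it emits only changed rows); the Delta sink accepts only append and complete, so upserts need `foreachBatch` with a `MERGE`
- Use **complete** only when necessary (it rewrites the entire result table on every trigger)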

### Monitoring
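- Inspect `query.lastProgress` and `query.recentProgress` for per-batch metrics
- Compare `inputRowsPerSecond` with `processedRowsPerSecond`; input that stays higher means the query is falling behind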
- Monitor checkpoint size
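
A simple health check can be built on those two rates. The sketch below assumes the progress report is available as a dictionary with the keys `inputRowsPerSecond` and `processedRowsPerSecond`; `is_falling_behind` is a hypothetical helper, and the numbers in `sample_progress` are made up for illustration, not real measurements.

```python
def is_falling_behind(progress, tolerance=1.0):
    """Return True when rows arrive faster than the query processes them."""
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    return input_rate > processed_rate * tolerance


# Illustrative values only
sample_progress = {
    "batchId": 7,
    "inputRowsPerSecond": 1200.0,
    "processedRowsPerSecond": 800.0,
}
print(is_falling_behind(sample_progress))
```

In a notebook you might call it as `is_falling_behind(query.lastProgress)` once at least one batch has completed; a single slow batch is not conclusive, so it is worth checking several entries of `query.recentProgress`.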

### Error Handling
- Spark retries failed tasks, but a query that fails stops until it is restarted; configure job-level retries so it resumes from its checkpoint
- Monitor for persistent failures
- Use dead letter queues for bad records
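
The dead-letter idea is to route records that fail parsing or validation to a side output instead of failing the whole query. Here is a minimal plain-Python sketch; `route_records` and the record layout (JSON lines with a numeric `value` field) are assumptions for illustration.

```python
import json


def route_records(raw_lines):
    """Split raw lines into parsed records and a dead-letter list."""
    good, dead_letter = [], []
    for line in raw_lines:
        try:
            record = json.loads(line)
            record["value"] = float(record["value"])
            good.append(record)
        except (ValueError, KeyError, TypeError) as exc:
            # Keep the raw input and the failure reason for later inspection
            dead_letter.append({"raw": line, "error": type(exc).__name__})
    return good, dead_letter


good, dead = route_records([
    '{"id": 1, "value": "2.5"}',
    "not json",
    '{"id": 2}',
])
print(good)
print(dead)
```

In a streaming pipeline the same split is usually expressed as two filters on a parsed column, with the bad rows written to their own table and checkpoint.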

## 10. Common Patterns

### Pattern 1: Bronze to Silver Streaming

In [None]:
# Read from bronze
bronze_stream = spark.readStream.format("delta").load("/bronze/raw_events")

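# Clean and enrich
# Note: dropDuplicates without a watermark keeps every event_id seen so far
# in state, so state grows without bound on a long-running stream
silver_stream = bronze_stream \
    .filter(col("event_id").isNotNull()) \
    .dropDuplicates(["event_id"]) \
    .withColumn("processed_timestamp", current_timestamp()) \
    .withColumn("date", to_date(col("timestamp")))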

# Write to silver
silver_query = silver_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/silver") \
    .partitionBy("date") \
    .start("/silver/clean_events")

print("Bronze to Silver pattern configured")

### Pattern 2: Stream-Stream Join

In [None]:
# Two streams
orders_stream = spark.readStream.format("delta").load("/streams/orders")
payments_stream = spark.readStream.format("delta").load("/streams/payments")

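# Join with watermarks plus a time-range condition so Spark can drop old state
# (the 1-hour payment window is an illustrative assumption)
orders = orders_stream.withWatermark("order_timestamp", "10 minutes").alias("o")
payments = payments_stream.withWatermark("payment_timestamp", "10 minutes").alias("p")

joined_stream = orders.join(
    payments,
    expr("""
        o.order_id = p.order_id AND
        p.payment_timestamp BETWEEN o.order_timestamp
                                AND o.order_timestamp + INTERVAL 1 HOUR
    """),
    "inner"
)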

print("Stream-stream join pattern")
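
The watermark used above can be thought of as "the latest event time seen so far, minus the allowed delay". The plain-Python sketch below models it at batch granularity: the watermark advances after each batch, and events older than it are treated as too late. `apply_watermark` is a hypothetical helper, and real Spark only guarantees that data within the delay is kept; later data may or may not be dropped.

```python
from datetime import datetime, timedelta


def apply_watermark(batches, delay):
    """Classify event times as accepted or dropped under a simple watermark."""
    watermark = None
    max_event_time = None
    accepted, dropped = [], []
    for batch in batches:
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)
            else:
                accepted.append(event_time)
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
        # The watermark moves only between batches, never backwards
        if max_event_time is not None:
            watermark = max_event_time - delay
    return accepted, dropped


def at(minute):
    return datetime(2024, 1, 1, 12, minute)


accepted, dropped = apply_watermark(
    [[at(0), at(20)], [at(5), at(15)]],
    delay=timedelta(minutes=10),
)
print(len(accepted), len(dropped))
```

After the first batch the watermark sits at 12:10, so the 12:05 event in the second batch is too late while the 12:15 event is still accepted.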

## Summary

In this notebook, you learned:

✅ Streaming concepts and architecture
✅ Reading from various streaming sources
✅ Applying transformations on streams
✅ Windowed aggregations
✅ Writing to different sinks
✅ Triggers and output modes
✅ Managing streaming queries
✅ Best practices and common patterns

## Next Steps

1. Practice with real streaming data
2. Learn about watermarking for late data
3. Explore Delta Live Tables for declarative pipelines
4. Study stateful operations

## Additional Resources

- [Structured Streaming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Streaming](https://docs.databricks.com/structured-streaming/)