# Lesson 18 - Streaming Transformations and Joins

---

## PySpark Structured Streaming: Transformations, Joins, and Sinks

**Introduction**

PySpark Structured Streaming provides a high-level API for building scalable, fault-tolerant, end-to-end streaming applications. It is built on the Spark SQL engine, so streaming computations are expressed with the same DataFrame operations used for batch computations on static data. The core abstraction is a streaming DataFrame, which represents an unbounded table to which new rows are continuously appended. (The typed Dataset API exists only in Scala and Java; PySpark works with DataFrames.) This model enables powerful transformations, aggregations, and joins on streaming data.

This document delves into key aspects of processing streaming data: performing aggregations, joining streams with static datasets, and writing results to various external systems (sinks).

---

### 1. Aggregations in Streaming DataFrames

**Theory**

Aggregating data over a stream is fundamentally different from batch aggregation because the dataset is unbounded. Structured Streaming handles this through **stateful processing**. When performing aggregations (e.g., counts, sums, averages), Spark maintains intermediate state across micro-batches.

*   **State Management:** Spark automatically manages the running state (e.g., the current count for each group) required for the aggregation. This state is stored reliably (using checkpointing) to ensure fault tolerance. However, without constraints, this state can grow indefinitely as new keys/groups arrive.
*   **Watermarking:** To bound this state, Structured Streaming uses **watermarking**. A watermark is defined on an event-time column together with a delay threshold; at any moment it equals the maximum event time seen so far minus that threshold. Data arriving later than the watermark may be dropped from stateful operations, and state for groups that can no longer change (for example, time windows that end before the watermark) is cleaned up automatically. Spark guarantees that data within the threshold is not dropped, but it does not guarantee that data beyond the threshold is always dropped. Watermarks are crucial for limiting memory and disk usage in long-running streaming aggregations.
*   **Output Modes:** How aggregation results are written to a sink depends on the output mode, which PySpark sets by passing a string to `writeStream.outputMode(...)` (`OutputMode.Complete()` and its siblings are the Scala/Java equivalents):
    *   `"complete"`: The entire updated result table (all aggregated groups) is written to the sink on every trigger. Because every group must be retained, a watermark cannot be used to drop state in this mode, so state grows with the number of groups. It suits aggregations with a bounded number of keys, and it is resource-intensive because the full table is written each time.
    *   `"append"`: Only *new* rows added to the result table since the last trigger are written. This applies only when a row, once written, will never change. For streaming aggregations, append mode requires a watermark: a window's result is emitted once the watermark passes the end of the window, at which point the result is final.
    *   `"update"`: Only rows that were *updated* in the result table since the last trigger are written. With a watermark, state for finalized groups is dropped and those rows are not emitted again. The default output mode is `"append"`, not `"update"`, so an aggregation without a watermark must set `"complete"` or `"update"` explicitly.
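The three modes are easiest to compare on a toy timeline. The sketch below is a simplified pure-Python model, not Spark's implementation, and its names (`simulate`, `window_start`) are hypothetical. It assumes integer-minute event times, 10-minute tumbling windows, a 60-minute watermark delay, a watermark that advances at the end of each trigger, and a window that becomes final once the watermark reaches its end.

```python
# Toy model of a 10-minute tumbling-window count with a 60-minute watermark.
# Event times are integer minutes; this is NOT Spark's implementation.
WINDOW = 10  # window length in minutes
DELAY = 60   # watermark delay in minutes ("1 hour")


def window_start(t):
    """Start of the tumbling window [start, start + WINDOW) containing minute t."""
    return t - t % WINDOW


def simulate(batches, mode):
    """Return, per trigger, the (window_start, count) rows emitted under `mode`."""
    if mode not in ("complete", "update", "append"):
        raise ValueError(f"unknown output mode: {mode}")
    use_watermark = mode != "complete"  # modeled: complete mode ignores the watermark
    state = {}        # window start -> running count
    max_event = None  # maximum event time seen so far
    watermark = None
    outputs = []
    for batch in batches:
        touched = set()
        for t in batch:
            max_event = t if max_event is None else max(max_event, t)
            w = window_start(t)
            # Too late: the event's window was already finalized and dropped.
            if use_watermark and watermark is not None and w + WINDOW <= watermark:
                continue
            state[w] = state.get(w, 0) + 1
            touched.add(w)
        # The watermark advances at the end of the trigger.
        watermark = max_event - DELAY if max_event is not None else None
        finalized = []
        if use_watermark and watermark is not None:
            finalized = sorted(w for w in state if w + WINDOW <= watermark)
        if mode == "complete":
            rows = sorted(state.items())                   # whole result table
        elif mode == "update":
            rows = sorted((w, state[w]) for w in touched)  # changed rows only
        else:
            rows = [(w, state[w]) for w in finalized]      # final rows only
        for w in finalized:
            del state[w]  # clean up state for finalized windows
        outputs.append(rows)
    return outputs


batches = [[1, 3, 12], [75], [5, 14], [90]]
for mode in ("complete", "update", "append"):
    print(mode, simulate(batches, mode))
```

In this model, update mode reports window 10 as soon as its count changes (third trigger), while append mode reports the same row only in the last trigger, once the watermark (90 - 60 = 30) has passed the window's end at minute 20. The late event at minute 5 is counted only in complete mode, because the model never applies the watermark there.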

**Code Example: Windowed Event Count with Watermarking**

Let's consider a stream of timestamped events. We want to count events per device in 10-minute tumbling windows, refresh the results every 5 seconds, and accept data arriving up to 1 hour late.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# 1. Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("StreamingAggregationExample")
    .config("spark.sql.shuffle.partitions", 4)  # Example config for parallelism
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")  # Reduce verbosity

# 2. Define schema for incoming data (e.g., from Kafka or a JSON file source)
# Assuming data like: {"timestamp": "2023-10-27T10:00:00Z", "device": "A"}
# Not used with the rate source below, which has a fixed schema (timestamp, value);
# pass it to .schema(...) or from_json(...) when reading a real source.
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("device", StringType(), True)
])

# 3. Create a Streaming DataFrame (using the rate source for simplicity)
# In a real scenario, replace 'rate' with 'kafka', a file source, etc.
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 5)
    .load()
    .selectExpr(
        "CAST(timestamp AS TIMESTAMP) AS event_time",                 # Rate source timestamp
        "CONCAT('device_', CAST(value % 3 AS STRING)) AS device_id",  # Simulate device IDs
    )
)

# Optional: shift event times back to simulate delayed data
# (requires: from pyspark.sql.functions import expr)
# streaming_df = streaming_df.withColumn("event_time", col("event_time") - expr("INTERVAL 2 MINUTES"))

# 4. Define Watermark and Perform Aggregation
# Watermark: Allow data to be 1 hour late based on 'event_time'
# Window: Tumbling windows of 10 minutes duration
# GroupBy: Group by window and device_id
# Aggregation: Count events in each group
windowed_counts = (
    streaming_df
    .withWatermark("event_time", "1 hour")
    .groupBy(
        window(col("event_time"), "10 minutes"),  # Tumbling window
        col("device_id"),
    )
    .count()
)
# No orderBy here: sorting a streaming result is only allowed after an aggregation
# in "complete" mode; with "update" mode the query fails to start (AnalysisException).

# 5. Define the Streaming Sink (Console for demonstration)
# Output Mode: Update - only write updated rows to the console
query = (
    windowed_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .trigger(processingTime="5 seconds")  # Process data every 5 seconds
    .start()
)

# 6. Keep the query running
query.awaitTermination()

# 7. Stop the SparkSession (not reached while awaitTermination() blocks)
# spark.stop()
```

**Line-by-Line Explanation:**

1.  `SparkSession.builder...getOrCreate()`: Standard way to create or get a SparkSession, the entry point for Spark functionality. `spark.sql.shuffle.partitions` is configured to control parallelism during shuffles (like in `groupBy`).
2.  `StructType([...])`: Defines the expected structure and data types of incoming records. File and Kafka sources need such a schema (passed to `.schema(...)` or `from_json(...)`) for schema enforcement; the `rate` source used here has a fixed schema, so `schema` is only illustrative.
3.  `spark.readStream...load()`: Creates the initial streaming DataFrame. The `rate` source generates `timestamp` and `value` columns at the configured rate, which makes it convenient for testing. `selectExpr` renames the timestamp to `event_time` and derives a simulated `device_id` from `value`.
4.  `.withWatermark("event_time", "1 hour")`: Specifies that the `event_time` column should be used for watermarking, and the system should tolerate data arriving up to 1 hour late relative to the maximum event time seen so far.
5.  `.groupBy(window(...), col(...))`: Groups the data. `window(col("event_time"), "10 minutes")` creates 10-minute tumbling windows based on the `event_time`. We also group by `device_id`.
6.  `.count()`: Performs the aggregation (counting rows) within each group (window and device).
7.  `windowed_counts.writeStream...start()`: Configures and starts the streaming query.
    *   `.outputMode("update")`: Writes only the rows whose counts changed in the latest trigger. Once the watermark passes the end of a window, that window's state is dropped and the window no longer appears in the output.
    *   `.format("console")`: Specifies the sink type: print output to the console.
    *   `.option("truncate", "false")`: Prevents truncation of long column values in the console output.
    *   `.trigger(processingTime="5 seconds")`: Defines how often the stream should be processed (trigger interval).
    *   `.start()`: Starts the query execution asynchronously.
8.  `query.awaitTermination()`: Blocks the main thread, keeping the application alive until the streaming query is stopped (manually or due to an error).

**Practical Use Cases:**

*   Real-time monitoring dashboards (e.g., requests per minute per endpoint).
*   Anomaly detection (e.g., sudden spikes in error counts within time windows).
*   Sessionization of user activity based on event time.
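The aggregation example in this section relies on `window(col("event_time"), "10 minutes")` to place each event in a bucket. The pure-Python sketch below mimics that assignment under an assumption about Spark's default tumbling windows: they are aligned to the Unix epoch (no `startTime` offset) and half-open, so an event exactly on a boundary belongs to the later window. The function name `tumbling_window` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts, size=timedelta(minutes=10)):
    """Return the (start, end) of the epoch-aligned window [start, end) containing ts."""
    offset = (ts - EPOCH) % size  # time elapsed since the window started
    start = ts - offset
    return start, start + size


utc = timezone.utc
print(tumbling_window(datetime(2023, 10, 27, 10, 7, 30, tzinfo=utc)))
print(tumbling_window(datetime(2023, 10, 27, 10, 10, 0, tzinfo=utc)))  # boundary case
```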

---

### 2. Joining Streaming DataFrames with Static DataFrames

**Theory**

A common pattern in streaming applications is enriching incoming event data with relatively static reference or dimensional data. For example, joining a stream of user clicks (containing user IDs) with a static user profile table (containing user demographics).

Structured Streaming supports joining a streaming DataFrame with a static DataFrame (or Dataset).

*   **Supported Join Types** (with the streaming DataFrame on the left):
    *   Inner join.
    *   Left outer join: `streamingDF.join(staticDF, condition, "left_outer")`. Every stream row is kept; the static columns are `null` when there is no match.
    *   Left semi join.
    *   *Right outer and full outer joins are NOT supported* with the stream on the left. Emitting an unmatched static row would require knowing that no future stream row will ever match it, which can never be known for an unbounded stream. (With the sides swapped, static-stream, the same reasoning makes a right outer join supported and a left outer join unsupported.)
*   **Execution:** These joins are stateless: each micro-batch is joined with the static DataFrame independently. If the static DataFrame's estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), it is broadcast to all executors, avoiding a shuffle. Otherwise a shuffle-based join runs for every micro-batch, shuffling both the current micro-batch and the static DataFrame. The static DataFrame is re-evaluated in every micro-batch unless it is cached.
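Conceptually, a stream-static join with a broadcast static side behaves like a hash join repeated for every micro-batch: the static rows form a lookup table, each micro-batch probes it, and no state is carried between batches. A minimal pure-Python model of that idea follows; it is not Spark's code, and `join_micro_batch` and `STATIC` are hypothetical names.

```python
# Hypothetical static dimension table, keyed by device id.
STATIC = {
    "device_0": ("Factory A", "Sensor Model X"),
    "device_1": ("Factory B", "Sensor Model Y"),
    "device_2": ("Factory A", "Sensor Model X"),
}


def join_micro_batch(batch, static, how="left_outer"):
    """Join one micro-batch of (event_time, device_id) rows with the static table.

    Each call is independent: nothing is remembered between micro-batches.
    """
    if how not in ("inner", "left_outer"):
        raise ValueError("only inner and left_outer are modeled")
    out = []
    for event_time, device_id in batch:
        match = static.get(device_id)  # probe the lookup table built from static rows
        if match is not None:
            out.append((event_time, device_id) + match)
        elif how == "left_outer":
            out.append((event_time, device_id, None, None))  # null static columns
        # inner join: unmatched stream rows are dropped
    return out


print(join_micro_batch([(1, "device_0"), (2, "device_3")], STATIC))
```

The model also shows why a right or full outer join is impossible with the stream on the left: emitting an unmatched static row would require a call that has already seen every future micro-batch.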

**Code Example: Enriching an Event Stream with Device Metadata**

Assume we have the same event stream as before (`streaming_df`) containing `device_id`, and a static DataFrame containing metadata for each device.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# 1. Initialize Spark Session (reuse from previous example or create new)
spark = (
    SparkSession.builder
    .appName("StreamingStaticJoinExample")
    .config("spark.sql.shuffle.partitions", 4)
    .config("spark.sql.autoBroadcastJoinThreshold", "100m")  # Increase threshold if static data is larger but fits in memory
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

# 2. Create Static DataFrame (Device Metadata)
# In a real scenario, load this from a file, database, etc.
static_data = [
    ("device_0", "Factory A", "Sensor Model X"),
    ("device_1", "Factory B", "Sensor Model Y"),
    ("device_2", "Factory A", "Sensor Model X")
]
static_schema = StructType([
    StructField("static_device_id", StringType(), False),
    StructField("location", StringType(), False),
    StructField("model", StringType(), False)
])
static_df = spark.createDataFrame(data=static_data, schema=static_schema)

# Optional: Cache the static DataFrame so repeated micro-batches don't recompute it
static_df.cache()
static_df.count()  # Action to trigger caching

# 3. Create Streaming DataFrame (reuse from previous example or define again)
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 2)
    .load()
    .selectExpr(
        "CAST(timestamp AS TIMESTAMP) AS event_time",
        "CONCAT('device_', CAST(value % 4 AS STRING)) AS device_id",  # device_3 has no metadata
    )
)

# 4. Perform the Join
# Join Condition: streaming_df.device_id == static_df.static_device_id
# Join Type: Left Outer (keep all stream events, add metadata if available)
enriched_stream_df = streaming_df.join(
    static_df,
    streaming_df.device_id == static_df.static_device_id,
    "left_outer"
)

# Select desired columns after join
final_stream_df = enriched_stream_df.select(
    "event_time",
    "device_id",
    "location",  # null if no match in static_df
    "model"      # null if no match in static_df
)

# 5. Define the Streaming Sink (Console)
query = (
    final_stream_df.writeStream
    .outputMode("append")  # Append suits stateless transformations and stream-static joins
    .format("console")
    .option("truncate", "false")
    .trigger(processingTime="10 seconds")
    .start()
)

# 6. Keep the query running
query.awaitTermination()

# 7. Stop SparkSession
# spark.stop()
```

**Line-by-Line Explanation:**

1.  `SparkSession.builder...getOrCreate()`: Initializes Spark. `spark.sql.autoBroadcastJoinThreshold` is set to potentially broadcast the static DataFrame if it's under 100MB.
2.  `static_data`, `static_schema`, `spark.createDataFrame()`: Creates a sample static DataFrame containing device metadata. In practice, this would be loaded from persistent storage (`spark.read.parquet(...)`, `spark.read.jdbc(...)`, etc.).
3.  `static_df.cache()`: Suggests to Spark that this DataFrame should be kept in memory after the first computation. `static_df.count()` is an action that forces the DataFrame to be computed and cached. This can improve performance if the static data is accessed repeatedly across micro-batches.
4.  `spark.readStream...load()`: Defines the streaming source DataFrame (similar to the aggregation example).
5.  `streaming_df.join(static_df, join_condition, "left_outer")`: Performs the join.
    *   `static_df`: The DataFrame to join with.
    *   `streaming_df.device_id == static_df.static_device_id`: The condition defining how rows are matched between the two DataFrames.
    *   `"left_outer"`: Specifies the join type. All rows from the `streaming_df` (left side) are kept. If a match is found in `static_df`, the corresponding columns (`location`, `model`) are added. If no match is found, these columns will have `null` values.
6.  `enriched_stream_df.select(...)`: Keeps only the columns needed downstream, dropping the redundant `static_device_id` join key.
7.  `final_stream_df.writeStream...start()`: Configures and starts the streaming query.
    *   `.outputMode("append")`: Since this is a simple transformation/join without aggregation or watermarking impacting existing rows, `append` mode is suitable. Each micro-batch's results are simply appended to the sink.
    *   `.format("console")`: Writing output to the console.
    *   `.trigger(processingTime='10 seconds')`: Processing interval.
8.  `query.awaitTermination()`: Keeps the application running.

**Practical Use Cases:**

*   Real-time ad targeting: Joining an impression stream (user ID, ad ID) with user segment data (static).
*   IoT data enrichment: Joining sensor readings (sensor ID, value) with sensor metadata (location, calibration info).
*   Fraud detection: Joining a transaction stream (account ID, amount) with account risk scores (static).

---

### 3. Streaming Sinks: Writing Streaming Data

**Theory**

A streaming sink is the destination where the results of a streaming query are written. Structured Streaming provides built-in support for various sinks, each with different characteristics and configuration options. A crucial requirement for most production sinks is **fault tolerance** and **exactly-once semantics** (or at-least-once). This is typically achieved using **checkpointing**.

*   **Checkpointing:** For every micro-batch, Structured Streaming records the query's progress (the source offsets being processed and the running state of stateful operations such as aggregations) in a checkpoint directory on a reliable distributed filesystem (such as HDFS, S3, or ADLS). If the query fails, it restarts from the last checkpoint, so no data is lost; whether output can be duplicated depends on the sink (idempotent or transactional sinks avoid duplicates). Set the location with `option("checkpointLocation", "/path/to/checkpoint/dir")`; the file and Kafka sinks will not start without one unless a default is configured through `spark.sql.streaming.checkpointLocation`.
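The recovery logic can be illustrated with a toy model; this is not Spark's code, and all class and function names are hypothetical. Before processing a micro-batch, the planned offset range is recorded in a write-ahead offset log; after the sink write succeeds, the batch id is recorded in a commit log. On restart, an uncommitted batch is re-run with the same id and the same offsets. Whether that replay creates duplicates depends on the sink: one that recognizes a repeated batch id (as the file sink does through its metadata log) stays exactly-once, while one that appends blindly (as the Kafka sink effectively does) is at-least-once.

```python
class IdempotentSink:
    """Keeps one copy of each batch id, like a sink that tracks committed batches."""

    def __init__(self):
        self.batches = {}

    def write(self, batch_id, rows):
        self.batches.setdefault(batch_id, list(rows))  # replayed batch id: ignored

    def rows(self):
        return [r for b in sorted(self.batches) for r in self.batches[b]]


class AppendOnlySink:
    """Appends every write blindly, so a replayed batch shows up twice."""

    def __init__(self):
        self.data = []

    def write(self, batch_id, rows):
        self.data.extend(rows)

    def rows(self):
        return list(self.data)


def run_query(source, sink, checkpoint, batch_size=2, crash_after_write=None):
    """Process `source` in micro-batches, tracking progress in `checkpoint`.

    checkpoint["offsets"]: batch_id -> (start, end), recorded BEFORE the sink write.
    checkpoint["commits"]: batch ids whose sink write completed.
    """
    offsets, commits = checkpoint["offsets"], checkpoint["commits"]
    if offsets and max(offsets) not in commits:
        batch_id = max(offsets)  # restart: re-run the uncommitted batch
    else:
        batch_id = max(offsets) + 1 if offsets else 0
    while True:
        if batch_id not in offsets:
            start = offsets[batch_id - 1][1] if batch_id > 0 else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return  # no new data
            offsets[batch_id] = (start, end)  # write-ahead offset log
        start, end = offsets[batch_id]
        sink.write(batch_id, source[start:end])
        if batch_id == crash_after_write:
            raise RuntimeError("simulated crash before commit")
        commits.add(batch_id)  # commit log
        batch_id += 1


source = ["a", "b", "c", "d", "e"]
results = {}
for sink in (IdempotentSink(), AppendOnlySink()):
    checkpoint = {"offsets": {}, "commits": set()}
    try:
        run_query(source, sink, checkpoint, crash_after_write=1)
    except RuntimeError:
        pass  # the query died after writing batch 1 but before committing it
    run_query(source, sink, checkpoint)  # restart from the same checkpoint
    results[type(sink).__name__] = sink.rows()
print(results)
```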

**Common Sinks**

**a) Console Sink**

*   **Purpose:** Primarily for debugging and development. Prints output to the driver's console.
*   **Fault Tolerance:** Not fault-tolerant. Does not support checkpointing directly for output (though internal state might be checkpointed if needed).
*   **Semantics:** At-most-once (if driver fails, output might be lost).
*   **Key Options:**
    *   `numRows`: Number of rows to print (default 20).
    *   `truncate`: Whether to truncate long string values (default true).

```python
# (Assuming 'result_stream_df' is the final streaming DataFrame)

console_query = (
    result_stream_df.writeStream
    .format("console")
    .outputMode("update")  # Or "append"/"complete", depending on the transformation
    .option("truncate", "false")
    .option("numRows", 50)
    .trigger(processingTime="10 seconds")
    .start()
)

# console_query.awaitTermination()
```

**b) File Sink**

*   **Purpose:** Writing results to files in a directory on a distributed filesystem (HDFS, S3, Azure Blob Storage, etc.). Suitable for archiving or feeding downstream batch processes.
*   **Fault Tolerance:** Fault-tolerant using checkpointing.
*   **Semantics:** Exactly-once (achieved through checkpointing and transactional writes to a metadata log within the output directory).
*   **Output Format:** Supports various file formats (`parquet`, `orc`, `json`, `csv`, etc.). Parquet is generally recommended for efficiency.
*   **Output Mode:** Only `append` is supported.
*   **Key Settings:**
    *   `path` (option): The output directory path.
    *   `checkpointLocation` (option): Path to the checkpoint directory (required for fault tolerance).
    *   `.format(...)`: File format (e.g., `"parquet"`, `"json"`).
    *   `.partitionBy(...)`: (Optional) Partition the output data into subdirectories based on column values (e.g., `.partitionBy("date", "hour")`). Improves read performance for consumers filtering by partition columns.

```python
# (Assuming 'result_stream_df' is the final streaming DataFrame)

# If result_stream_df is an aggregation, it needs a watermark to run in append mode
file_query = (
    result_stream_df.writeStream
    .format("parquet")  # Recommended format
    .outputMode("append")  # The file sink supports only append mode
    .option("path", "s3a://my-bucket/output/data")  # Output path
    .option("checkpointLocation", "s3a://my-bucket/output/checkpoints")  # Checkpoint path
    .partitionBy("event_date")  # Example partitioning column (must exist in result_stream_df)
    .trigger(processingTime="1 minute")  # File sinks typically use longer intervals
    .start()
)

# file_query.awaitTermination()
```

**Line-by-Line Explanation (File Sink):**

1.  `.format("parquet")`: Specifies the output file format.
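2.  `.outputMode("append")`: New rows generated in each micro-batch are added to the output directory. This is the only output mode the file sink supports.
3.  `.option("path", ...)`: Defines the target directory. Each micro-batch writes new part files into it (inside the partition subdirectories when `partitionBy` is used), and Spark records the files of each committed batch in a `_spark_metadata` log under this path.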
4.  `.option("checkpointLocation", ...)`: **Crucial**. Specifies the directory for storing checkpoint information. This enables recovery from failures. Must be a reliable, distributed filesystem path.
5.  `.partitionBy("event_date")`: Instructs Spark to create subdirectories based on the unique values in the `event_date` column (e.g., `/path/event_date=2023-10-27/...`). This significantly speeds up reads that filter on `event_date`.
6.  `.trigger(processingTime='1 minute')`: Sets the processing interval. File sinks often use longer intervals to avoid creating too many small files.
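A back-of-the-envelope calculation shows why the trigger interval matters for file counts. The sketch assumes that every trigger receives data and that each write task produces at most one file per partition directory per micro-batch; the number of write tasks is often `spark.sql.shuffle.partitions` when the query ends in a shuffle such as an aggregation. `max_files_per_day` is a hypothetical helper.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def max_files_per_day(trigger_seconds, write_tasks, partition_values_per_batch=1):
    """Rough upper bound on the number of files a file sink writes per day.

    Assumes data arrives in every trigger and that each write task produces
    at most one file per partition directory it writes to in a micro-batch.
    """
    batches_per_day = SECONDS_PER_DAY // trigger_seconds
    return batches_per_day * write_tasks * partition_values_per_batch


print(max_files_per_day(60, 4))  # 1-minute trigger, 4 write tasks
print(max_files_per_day(5, 4))   # 5-second trigger, 4 write tasks
```

Under these assumptions, 4 write tasks with a 1-minute trigger can produce up to 5,760 files per day, while a 5-second trigger raises the bound to 69,120.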

**c) Kafka Sink**

*   **Purpose:** Publishing results to Apache Kafka topics. Ideal for integrating with downstream real-time applications or microservices.
*   **Fault Tolerance:** Fault-tolerant using checkpointing.
*   **Semantics:** At-least-once. The Kafka sink does not use Kafka transactions, so if a micro-batch is re-executed after a failure, some records may be written to Kafka twice. If duplicates matter, include a unique identifier (for example, in the key) so downstream consumers can deduplicate.
*   **Serialization:** The DataFrame written to Kafka must have a `value` column of type string or binary, and may have a `key` column (string or binary) and a `topic` column (used when the `topic` option is not set). Other columns must be folded into these, typically by serializing them to a JSON string or a binary format such as Avro.
*   **Key Options:**
    *   `kafka.bootstrap.servers`: Comma-separated list of Kafka broker addresses.
    *   `topic`: The Kafka topic to write to (overrides any `topic` column).
    *   `checkpointLocation`: Path to the checkpoint directory (mandatory for fault tolerance).
    *   Kafka's own `key.serializer` and `value.serializer` settings are not supported; keys and values are always written as strings or bytes, so serialize explicitly with DataFrame operations.
    *   (Optional) Other Kafka producer configurations can be passed with the `kafka.` prefix, e.g., `option("kafka.acks", "all")`.
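Because the Kafka sink is at-least-once, duplicates have to be handled downstream. A minimal consumer-side sketch, assuming each message key carries a unique event id (an assumption about your data, not something Spark adds); `deduplicate` is a hypothetical helper, and in production the `seen` set would need bounded, persistent storage:

```python
def deduplicate(messages, seen=None):
    """Yield each (key, value) message once, skipping keys already processed.

    Assumes the producer put a unique event id in the message key.
    """
    seen = set() if seen is None else seen
    for key, value in messages:
        if key in seen:
            continue  # duplicate, e.g. from a re-executed micro-batch
        seen.add(key)
        yield key, value


received = [("e1", "a"), ("e2", "b"), ("e1", "a"), ("e3", "c")]
print(list(deduplicate(received)))
```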

```python
from pyspark.sql.functions import col, struct, to_json

# (Assuming 'result_stream_df' has columns like 'key_col', 'value_col', 'other_data')

# Prepare data for Kafka: select a key and serialize the value
# Example: Serialize entire row as JSON string in the 'value'
kafka_df = result_stream_df.select(
    col("key_col").cast("string").alias("key"),  # Kafka key (optional, string)
    to_json(struct("*")).alias("value"),  # Kafka value (JSON string of all columns)
)
# Alternative: select specific columns for value
# kafka_df = result_stream_df.select(col("id").cast("string").alias("key"), col("data_payload").cast("string").alias("value"))


kafka_query = (
    kafka_df.writeStream
    .format("kafka")
    .outputMode("append")  # Usually append for Kafka unless updates are meaningful downstream
    .option("kafka.bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")  # Kafka brokers
    .option("topic", "output-topic")  # Target Kafka topic
    .option("checkpointLocation", "/path/to/kafka/checkpoints")  # Checkpoint path
    .trigger(processingTime="10 seconds")
    .start()
)

# kafka_query.awaitTermination()
```

**Line-by-Line Explanation (Kafka Sink):**

1.  `.select(...)`: Prepares the DataFrame for Kafka. Kafka messages are key-value pairs.
    *   `col("key_col").cast("string").alias("key")`: Selects a column to be used as the Kafka message key and casts it to string. The `.alias("key")` is important for the Kafka sink to identify it. Can be omitted if no key is needed.
    *   `to_json(struct("*")).alias("value")`: Selects the Kafka message value. Here, `struct("*")` creates a struct containing all columns from `result_stream_df`, and `to_json` converts this struct into a JSON string. `.alias("value")` identifies this column as the value for the Kafka sink.
2.  `.format("kafka")`: Specifies the Kafka sink type.
3.  `.outputMode("append")`: Each processed row is sent as a new message to Kafka.
4.  `.option("kafka.bootstrap.servers", ...)`: Provides the connection string for the Kafka cluster.
5.  `.option("topic", ...)`: Specifies the Kafka topic name.
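6.  `.option("checkpointLocation", ...)`: **Crucial** for fault tolerance: on restart, the query resumes from the recorded offsets, so no output is missed. Because the sink is at-least-once, a re-executed micro-batch can still produce duplicate messages.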
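To see what consumers of `output-topic` receive, the sketch below approximates the `value` string produced by `to_json(struct("*"))` for one row, using Python's `json` module. It is an approximation under stated assumptions, not Spark's serializer: the row is flat with string and numeric columns only, and null fields are omitted, which is the Spark 3.x default (`spark.sql.jsonGenerator.ignoreNullFields`). `to_json_value` is a hypothetical helper.

```python
import json


def to_json_value(row):
    """Approximate the string to_json(struct("*")) yields for a flat row.

    Assumes string/number columns only; keeps column order, uses compact
    separators, and omits None fields (assumed Spark 3.x default for to_json).
    """
    fields = {name: value for name, value in row.items() if value is not None}
    return json.dumps(fields, separators=(",", ":"))


print(to_json_value({"key_col": "k1", "value_col": 42, "other_data": None}))
```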

---

### 4. Advanced Considerations and Performance Tuning

*   **State Store Management:** The default (HDFS-backed) state store keeps all state in executor JVM memory, so aggregations with very many groups can cause memory pressure and long garbage-collection pauses. The RocksDB-based state store keeps state in native memory and on local disk instead, which scales better:
    ```python
    spark.conf.set("spark.sql.streaming.stateStore.providerClass",
                   "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    ```
    This provider ships with Spark 3.2 and later; no extra dependency is needed.
*   **Shuffle Partitions:** Tune `spark.sql.shuffle.partitions` based on cluster size and data volume. Too few partitions limit parallelism; too many can cause overhead with small tasks. Monitor task durations in the Spark UI.
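*   **Trigger Interval:** Choose the processing-time interval (e.g., `.trigger(processingTime="10 seconds")`) based on latency requirements versus resource usage: shorter intervals mean lower latency but more per-batch overhead. `.trigger(once=True)` processes all available data in a single batch and then stops, which is useful for quasi-batch processing; since Spark 3.3, `.trigger(availableNow=True)` does the same in multiple batches while respecting source rate limits, and is the preferred option. `.trigger(continuous="1 second")` enables the experimental continuous processing mode, with end-to-end latency as low as about 1 ms, but it supports only map-like operations (no aggregations) and a limited set of sources and sinks.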
*   **File Sink Optimization:**
    *   **Partitioning:** Use `partitionBy` wisely based on common query filters for downstream consumers. Avoid partitioning on high-cardinality columns.
    *   **Small Files:** Very frequent triggers with file sinks can create many small files, which is inefficient for HDFS/S3. Consider longer trigger intervals or using Delta Lake as a sink, which handles small files better through compaction.
*   **Data Skew:** In aggregations or joins, if certain keys are much more frequent than others, they can create straggler tasks. Techniques like salting (adding a random prefix/suffix to skewed keys) can sometimes help, but are more complex to implement correctly in streaming.
*   **Early Filtering/Projection:** Filter data and select only necessary columns as early as possible in the streaming query plan to reduce data shuffled and processed.
*   **Static Join Optimization:** Ensure `spark.sql.autoBroadcastJoinThreshold` is set appropriately for stream-static joins, or explicitly broadcast the static DataFrame (`from pyspark.sql.functions import broadcast; streaming_df.join(broadcast(static_df), ...)`) if it fits comfortably in executor memory. Cache the static DataFrame if it's read repeatedly.
*   **Monitoring:** Use the Spark UI (especially the Structured Streaming tab) to monitor input/processing rates, batch durations, state store memory usage, and potential bottlenecks. Ganglia, Prometheus, and Grafana can provide cluster-level monitoring.
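The salting idea from the data-skew point can be illustrated in plain Python, outside Spark: attach a random salt to each key, aggregate per (key, salt) so that a hot key is spread over several groups (and therefore several tasks), then aggregate again without the salt. This sketches the general technique under simple assumptions (counting, a fixed number of salt buckets, the hypothetical name `salted_count`); in a streaming query the second stage is another stateful aggregation, which is part of what makes the technique harder to apply there.

```python
import random
from collections import Counter


def salted_count(keys, salt_buckets=4, seed=0):
    """Count occurrences per key in two stages, spreading hot keys with a salt."""
    rng = random.Random(seed)
    # Stage 1: partial counts per (key, salt); a hot key is split across
    # up to salt_buckets groups that different tasks could process.
    partial = Counter((k, rng.randrange(salt_buckets)) for k in keys)
    # Stage 2: drop the salt and add up the partial counts per original key.
    totals = Counter()
    for (k, _salt), n in partial.items():
        totals[k] += n
    return dict(totals), partial


keys = ["hot"] * 100 + ["cold"] * 3
totals, partial = salted_count(keys)
print(totals)
print(sorted(partial.items()))
```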

---

**Conclusion**

PySpark Structured Streaming offers a robust framework for complex stream processing. Understanding how to implement stateful aggregations using watermarking, perform efficient joins with static data, and correctly configure sinks with checkpointing is crucial for building reliable and scalable real-time applications. Performance tuning often involves balancing latency, throughput, and resource utilization by adjusting configurations related to parallelism, state management, trigger intervals, and sink-specific options. Careful design and monitoring are key to successful streaming deployments.

---