# Stream Processing and Analysis with Apache Spark

Learn the essentials of stream processing and analysis with Apache Spark in this course. Gain a solid understanding of stream
processing fundamentals and develop applications using the Spark Structured Streaming API. 

Explore advanced techniques such as
stream aggregation and window analysis to process real-time data efficiently. This course equips you with the skills to create scalable
and fault-tolerant streaming applications for dynamic data environments.

---

### Prerequisites
You should meet the following prerequisites before starting this course:

- Basic programming knowledge
- Familiarity with Python
- Basic understanding of SQL queries (`SELECT`, `JOIN`, `GROUP BY`)
- Familiarity with data processing concepts
- Experience developing applications with Spark, or prior Databricks experience, is required

---



In [0]:
current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
current_schema = spark.sql("SELECT current_schema()").collect()[0][0]
current_username = spark.sql("SELECT current_user()").collect()[0][0]

## Default values

Structured Streaming queries need a checkpoint location to record their progress. The volume path defined below is used as a common prefix for those checkpoints.

In [0]:
checkpoint_location_prefix= '/Volumes/workspace/default/checkpoint'
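
Each streaming query should get its own checkpoint directory so that queries do not overwrite each other's progress. The helper below is a minimal sketch of one way to build those paths from the common prefix; `checkpoint_path` is a hypothetical name introduced here for illustration and is not part of Spark or Databricks.

```python
import posixpath


def checkpoint_path(prefix: str, query_name: str) -> str:
    """Return a per-query checkpoint directory under a common prefix.

    Hypothetical helper for illustration; not a Spark or Databricks API.
    """
    if not query_name or "/" in query_name:
        raise ValueError("query_name must be a non-empty name without '/'")
    return posixpath.join(prefix.rstrip("/"), query_name)


# The prefix mirrors the value used in this notebook.
prefix = "/Volumes/workspace/default/checkpoint"
print(checkpoint_path(prefix, "stream_df"))
```

A call such as `checkpoint_path(checkpoint_location_prefix, "memory_query")` could then replace the hand-written path strings used in later cells.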

In [0]:
%sql
DROP VOLUME IF EXISTS checkpoint;
CREATE VOLUME IF NOT EXISTS checkpoint;

# Introduction to Spark Structured Streaming

This notebook demonstrates key concepts of Structured Streaming using practical examples with simulated retail order data.

### Objectives
- Understand stream processing fundamentals
- Work with different streaming sources and sinks
- Implement streaming transformations
- Use watermarking and windowing
- Monitor streaming queries

## Stream Processing Setup

First, let's set up our streaming infrastructure and define our schema.

In [0]:
# Import necessary libraries if not already imported
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Define the schema for the stream
schema = StructType([
    StructField("customer_id", LongType(), True),
    StructField("notifications", StringType(), True),
    StructField("order_id", LongType(), True),
    StructField("order_timestamp", LongType(), True)
])

# Now use this schema for your streaming DataFrame
stream_df = (spark.readStream
    .format("json")
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .option("path", "/Volumes/databricks_simulated_retail_customer_data/v01/retail-pipeline/orders/stream_json")
    .load()
)

In [0]:
# Confirm we have set up a streaming dataframe
print(f"isStreaming: {stream_df.isStreaming}")

## Running Basic Streaming Queries

Now let's kick off some streaming queries.

Each query needs a location for its checkpoint files, which the streaming API uses to track progress. We store them in the `checkpoint` volume created earlier.

In [0]:
# Display the stream for testing, display will start an implicit query (this is analogous to the console sink)
display(stream_df, checkpointLocation = f'{checkpoint_location_prefix}/stream_df')

## Basic Transformations on the Stream
Stateless streaming transformations are analogous to narrow transformations we would perform on normal DataFrames (`select`, `filter`, `withColumn`, etc).

In [0]:
# Simple transformations using standard DataFrame operations
transformed_stream = stream_df \
    .withColumn("notification_status", col("notifications").isNotNull()) \
    .withColumn("order_details", concat(lit("Order #"), col("order_id").cast("string")))

# Display the transformed stream
display(transformed_stream, checkpointLocation = '/Volumes/workspace/default/checkpoint/transformed_stream')

In [0]:
# Filter for only orders with notifications enabled
notifications_stream = stream_df \
    .filter(col("notifications") == "Y")

# Display filtered stream
display(notifications_stream, checkpointLocation='/Volumes/workspace/default/checkpoint/notifications_stream', outputMode='append')

In [0]:
# Stop any existing queries with the same name
for q in spark.streams.active:
    if q.name == "orders_streaming_table":
        q.stop()

# Write to memory sink for interactive querying
memory_query = stream_df.writeStream \
    .format("memory") \
    .option("checkpointLocation", "/Volumes/workspace/default/checkpoint/memory_query") \
    .queryName("orders_streaming_table") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .start()

In [0]:
%sql
-- Now you can query the in-memory table using SQL
SELECT notifications, count(*) as num_notifications 
FROM orders_streaming_table GROUP BY notifications

-- Try running the query multiple times to see the count increase

## Combining Multiple Streams using Union

Different stream sources can be combined using relational operators like `union`. Let's have a look.

> **NOTE:** Stream-to-static DataFrame joins are fully supported and easy to implement (for example joining a stream with reference or static lookup data for enrichment). Stream-to-stream joins are supported as well, but require special handling for state management.

In [0]:
# Create two streams, each containing a subset of the data
filtered_stream1 = stream_df.filter(col("notifications") == "Y")
filtered_stream2 = stream_df.filter(col("notifications") == "N")

# Union the streams
combined_stream = filtered_stream1.union(filtered_stream2)

# Process the combined stream
display(combined_stream, checkpointLocation = '/Volumes/workspace/default/checkpoint/combined_stream')

## Using Triggers to Control Processing

Triggers control when Spark processes the next micro-batch of data. Let's look at an example.


In [0]:
# Stop any existing queries with the same name
for q in spark.streams.active:
    if q.name == "triggered_query_table":
        q.stop()

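# Process all currently available data in micro-batches, then stop (availableNow trigger).
# For a fixed interval instead, use .trigger(processingTime="10 seconds").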
triggered_query = stream_df \
    .withColumn("processing_ts", current_timestamp()) \
    .writeStream \
    .format("memory") \
    .queryName("triggered_query_table") \
    .option("checkpointLocation", "/Volumes/workspace/default/checkpoint/triggered_query") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .start()

In [0]:
%sql
SELECT processing_ts, count(*) as count 
FROM triggered_query_table 
GROUP BY processing_ts
ORDER BY processing_ts

-- run the query multiple times
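
With `availableNow=True`, the query works through the data that is available when it starts, in one or more micro-batches that respect rate limits such as `maxFilesPerTrigger`, and then stops. The sketch below mimics only the batching step in plain Python; `plan_micro_batches` and the file names are hypothetical, and the real file source decides file order itself.

```python
from typing import List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Split pending files into micro-batches of at most max_files_per_trigger files.

    Simplified illustration: files are taken in the order given.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical file names standing in for the JSON files in the source path.
pending = [f"orders_{i}.json" for i in range(5)]
for batch_id, batch in enumerate(plan_micro_batches(pending, 1)):
    print(batch_id, batch)
```

Because `current_timestamp()` is evaluated once per micro-batch, each `processing_ts` group in the SQL query above should correspond to one such batch.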

## Key Takeaways

1. **Stream Processing Fundamentals**:
   - Continuous data processing
   - Schema definition
   - Basic transformations

2. **Sources and Sinks**:
   - File source (JSON) for ingestion
   - `display` for debugging (analogous to the console sink)
   - Memory sink for interactive querying

3. **Monitoring and Management**:
   - Query monitoring
   - Progress tracking
   - Resource management
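
As a small illustration of query monitoring, the sketch below summarizes a collection of streaming queries using the `name`, `isActive`, and `lastProgress` attributes that PySpark's `StreamingQuery` exposes. `summarize_queries` is a hypothetical helper, and the stand-in objects exist only so the example runs without a Spark session. It assumes `lastProgress` is dict-like, or `None` before the first batch completes.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def summarize_queries(queries: Iterable[Any]) -> List[Dict[str, Any]]:
    """Collect a few monitoring fields from StreamingQuery-like objects."""
    summary = []
    for q in queries:
        # lastProgress is None until the query has completed a micro-batch.
        progress = q.lastProgress or {}
        summary.append({
            "name": q.name,
            "active": q.isActive,
            "batch_id": progress.get("batchId"),
            "input_rows": progress.get("numInputRows"),
        })
    return summary


# Stand-in objects with made-up values; in the notebook, pass spark.streams.active.
fake_queries = [
    SimpleNamespace(name="orders_streaming_table", isActive=True,
                    lastProgress={"batchId": 3, "numInputRows": 120}),
    SimpleNamespace(name="triggered_query_table", isActive=False,
                    lastProgress=None),
]
for row in summarize_queries(fake_queries):
    print(row)
```

In a notebook with running queries, `summarize_queries(spark.streams.active)` would give a quick progress overview.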


Run the cell below to stop the active streaming queries.

In [0]:
for query in spark.streams.active:
    query.stop()

# Lab - Introduction to Spark Structured Streaming

In this lab, you'll work with a streaming dataset containing order status updates. You'll learn how to create streaming DataFrames, perform basic transformations, and work with different streaming sinks.

### Objectives
- Understand stream processing fundamentals
- Implement basic streaming operations
- Work with different streaming sources and sinks
- Apply streaming transformations and watermarking
- Handle late data and monitor streaming queries

## Stream Processing Setup

First, let's set up our streaming infrastructure and define our data schema.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# --# 1. Create a schema for the status updates with these fields:
# --#   - order_id (LongType)
# --#   - order_status (StringType)
# --#   - status_timestamp (LongType)
# --# 2. Create a streaming DataFrame that reads JSON files from the path:
# --#   /Volumes/databricks_simulated_retail_customer_data/v01/retail-pipeline/status/stream_json
# --# 3. Set maxFilesPerTrigger to 1
# --# 4. Verify that you have created a streaming DataFrame

## Streaming Queries

Now we will create a basic streaming query, using the memory sink which we will subsequently query using SQL.

In [0]:
# Write the results of your status_stream into a memory sink with a query name of "order_status_streaming_table", appending records to the output sink

# Stop any existing queries with the same name
for q in spark.streams.active:
    if q.name == "order_status_streaming_table":
        q.stop()

In [0]:
# Find the number of records for each order_status value in the stream

## Basic Transformations

Now, you'll perform some basic transformations on the streaming data.

In [0]:
# --# 1. Create a new streaming DataFrame that:
# --#   - Converts the status_timestamp to a timestamp type column named "event_time"
# --#   - Creates a new column "status_description" that adds a descriptive prefix to the status value
# --#   - Creates a new column "is_completed" that is TRUE when order_status is "delivered" or "canceled", and FALSE otherwise
# --# 2. Display the transformed stream

## Controlling Processing with Triggers

Finally, you'll use triggers to control how the stream processes data.

In [0]:
# --# 1. Create a triggered streaming query that:
# --#   - Processes data from the status_stream every 15 seconds
# --#   - Adds a processing_time timestamp column
# --#   - Writes to a memory sink named "triggered_status_updates"
# --# 2. Run a SQL query to see the processing batches

# --# Stop any existing queries with the same name
for q in spark.streams.active:
    if q.name == "triggered_status_updates":
        q.stop()

In [0]:
# Check the processing batches in SQL

## Key Takeaways

1. **Stream Processing Fundamentals**
   - Structured Streaming provides a DataFrame-based streaming API
   - Supports both batch and streaming processing models
   - Handles data consistency and fault tolerance

2. **Sources and Sinks**
   - Multiple input sources available (Rate, File, Kafka, etc.)
   - Various output sinks for different use cases
   - Memory sink useful for testing and debugging

3. **Data Processing**
   - Supports standard DataFrame operations
   - Windowing and watermarking for time-based processing
   - Aggregations and streaming joins

4. **Monitoring and Management**
   - Built-in query monitoring capabilities
   - Progress tracking and metrics
   - Late data handling strategies


Run the cell below to stop the active streaming queries.

In [0]:
%python
for query in spark.streams.active:
    query.stop()

# Window Aggregation in Spark Structured Streaming

This notebook demonstrates advanced concepts of Structured Streaming including stateful operations, state management, streaming joins, and window operations.

### Objectives
- Understand stateful vs stateless operations
- Implement windowed operations
- Perform streaming joins
- Work with late arriving data using watermarks

## Setup and Data Sources

First, let's create two streaming DataFrames that we'll use throughout our demo:
1. An **orders stream** containing customer orders
2. A **status stream** containing order status updates


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Define schema for orders
orders_schema = StructType([
    StructField("customer_id", LongType(), True),
    StructField("notifications", StringType(), True),
    StructField("order_id", LongType(), True),
    StructField("order_timestamp", LongType(), True)
])

# Define schema for status updates
status_schema = StructType([
    StructField("order_id", LongType(), True),
    StructField("order_status", StringType(), True),
    StructField("status_timestamp", LongType(), True)
])

# Create orders streaming DataFrame
orders_stream = spark.readStream \
    .format("json") \
    .schema(orders_schema) \
    .option("maxFilesPerTrigger", 1) \
    .option("path", "/Volumes/databricks_simulated_retail_customer_data/v01/retail-pipeline/orders/stream_json") \
    .load()

# Create status streaming DataFrame
status_stream = spark.readStream \
    .format("json") \
    .schema(status_schema) \
    .option("maxFilesPerTrigger", 1) \
    .option("path", "/Volumes/databricks_simulated_retail_customer_data/v01/retail-pipeline/status/stream_json") \
    .load()

# Verify both are streaming DataFrames
print(f"orders_stream is streaming: {orders_stream.isStreaming}")
print(f"status_stream is streaming: {status_stream.isStreaming}")

## Stateless vs Stateful Operations

Let's look at the difference between stateless and stateful operations:

- **Stateless operations**: Process each record independently (e.g., `select`, `filter`)
- **Stateful operations**: Maintain information across batches (e.g., `groupBy`, `join`)
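
The difference can be sketched in plain Python, without Spark. The stateless function looks only at the records in the current batch, while the stateful counter carries its totals from one micro-batch to the next. The names and sample records below are hypothetical and only mimic the shape of the status stream.

```python
from collections import Counter
from typing import Dict, List

Record = Dict[str, object]


def stateless_filter(batch: List[Record]) -> List[Record]:
    """Keep delivered records; the result depends only on the current batch."""
    return [r for r in batch if r["order_status"] == "delivered"]


class RunningStatusCount:
    """Count records per status across batches; the Counter is the state."""

    def __init__(self) -> None:
        self.state: Counter = Counter()

    def update(self, batch: List[Record]) -> Dict[str, int]:
        self.state.update(r["order_status"] for r in batch)
        return dict(self.state)


# Made-up micro-batches for illustration.
batches = [
    [{"order_id": 1, "order_status": "placed"},
     {"order_id": 2, "order_status": "delivered"}],
    [{"order_id": 1, "order_status": "delivered"}],
]
counter = RunningStatusCount()
for batch in batches:
    print(stateless_filter(batch), counter.update(batch))
```

The running totals are what Spark has to keep in its state store (and checkpoint) for a streaming `groupBy(...).count()`.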

### Stateless Operation Example
Let's apply some simple stateless transformations to our streams:


In [0]:
display(orders_stream, checkpointLocation = f'{checkpoint_location_prefix}/orders_stream')

In [0]:
display(status_stream, checkpointLocation = f'{checkpoint_location_prefix}/status_stream')

In [0]:
# Convert timestamps to a more usable format (stateless operation)
orders_transformed = orders_stream \
    .withColumn("order_time", from_unixtime(col("order_timestamp")).cast("timestamp")) \
    .withColumn("notification_enabled", col("notifications") == "Y")

# Display the transformed stream
display(orders_transformed, checkpointLocation = f'{checkpoint_location_prefix}/orders_transformed')
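
`from_unixtime` interprets its input as seconds since the Unix epoch, so the conversion above assumes `order_timestamp` holds epoch seconds. The plain-Python sketch below shows the equivalent conversion in UTC and a rough check for values that look like milliseconds instead. The sample value and the `10**11` threshold are assumptions chosen for illustration.

```python
from datetime import datetime, timezone


def epoch_seconds_to_utc(value: int) -> datetime:
    """Convert epoch seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(value, tz=timezone.utc)


def looks_like_milliseconds(value: int) -> bool:
    """Heuristic: 10**11 seconds lies beyond the year 5000, so larger
    values are more plausibly epoch milliseconds than epoch seconds."""
    return abs(value) >= 10**11


sample = 1_700_000_000  # made-up value, not taken from the dataset
print(epoch_seconds_to_utc(sample))      # 2023-11-14 22:13:20+00:00
print(looks_like_milliseconds(sample))   # False
```

Note that Spark renders `from_unixtime` output in the session time zone, whereas this sketch always uses UTC.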

### Stateful Operation Example
Now let's perform some stateful operations that maintain state across batches:

In [0]:
# Count orders by status (stateful aggregation)
status_counts = status_stream \
    .groupBy("order_status") \
    .count() \
    .orderBy(col("count").desc())

display(status_counts, checkpointLocation = f'{checkpoint_location_prefix}/status_counts')

## Window Operations

Window operations allow us to perform aggregations over time windows. We'll demonstrate:
- Tumbling Windows (fixed, non-overlapping)
- Sliding Windows (overlapping windows)

### Tumbling Window Example

Let's count status events per 1-minute tumbling window:

In [0]:
# First, make sure to clean up previous streams with same name
for query in spark.streams.active:
    if query.name == "tumbling_window_counts":
        query.stop()

# Prepare data by ensuring we have a proper timestamp column
status_events = status_stream \
    .withColumn("event_time", from_unixtime(col("status_timestamp")).cast("timestamp"))

# Group by status and 1-minute tumbling windows
tumbling_windows = status_events \
    .groupBy(
        window(col("event_time"), "1 minute"),
        col("order_status")
    ) \
    .count()

# Write to memory for visualization
tumbling_window_query = (tumbling_windows.writeStream
    .format("memory")
    .outputMode("complete")
    .trigger(availableNow=True)
    .option("checkpointLocation", f'{checkpoint_location_prefix}/tumbling_window_query')
    .queryName("tumbling_window_counts")
    .start()
)

In [0]:
%sql
-- Query the tumbling window results
SELECT 
  window.start as window_start,
  window.end as window_end,
  order_status,
  count
FROM tumbling_window_counts
ORDER BY window_start, order_status
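
To see how an event lands in a tumbling window, the sketch below reproduces the bucket arithmetic in plain Python. It assumes windows are aligned to the Unix epoch and are half-open, `[start, end)`, which matches the default behaviour of `window` when no start offset is given. `tumbling_window` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(event_time: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return the [start, end) tumbling window that contains event_time."""
    if size <= timedelta(0):
        raise ValueError("size must be positive")
    # Distance from the start of the window the event falls into.
    offset = (event_time - EPOCH) % size
    start = event_time - offset
    return start, start + size


event = datetime(2024, 1, 1, 12, 0, 42, tzinfo=timezone.utc)  # made-up event time
start, end = tumbling_window(event, timedelta(minutes=1))
print(start.time(), end.time())  # 12:00:00 12:01:00
```

Every event belongs to exactly one tumbling window, which is why each status event is counted once in the query above.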

### Sliding Window Example
Now let's count status events per 2-minute window, sliding every 1 minute:


In [0]:
# Stop any existing queries with the same name
for query in spark.streams.active:
    if query.name == "sliding_window_counts":
        query.stop()

# Group by status and sliding window
sliding_windows = status_events \
    .groupBy(
        window(col("event_time"), "2 minutes", "1 minute"),
        col("order_status")
    ) \
    .count()

# Write to memory for visualization
sliding_window_query = (sliding_windows.writeStream
    .format("memory")
    .outputMode("complete")
    .trigger(availableNow=True)
    .option("checkpointLocation", f'{checkpoint_location_prefix}/sliding_window_query') 
    .queryName("sliding_window_counts")
    .start()
)

In [0]:
%sql
-- Query the sliding window results
SELECT 
  window.start as window_start,
  window.end as window_end,
  order_status,
  count
FROM sliding_window_counts
ORDER BY window_start, order_status
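
With sliding windows, a single event can fall into several overlapping windows. The plain-Python sketch below lists every window that contains an event, assuming epoch-aligned, half-open `[start, end)` windows. `sliding_windows` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time: datetime, size: timedelta,
                    slide: timedelta) -> List[Tuple[datetime, datetime]]:
    """Return all [start, end) windows of length size, starting every slide,
    that contain event_time."""
    if slide <= timedelta(0) or size < slide:
        raise ValueError("require 0 < slide <= size")
    # Start of the most recent window that begins at or before the event.
    start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


event = datetime(2024, 1, 1, 12, 0, 42, tzinfo=timezone.utc)  # made-up event time
for start, end in sliding_windows(event, timedelta(minutes=2), timedelta(minutes=1)):
    print(start.time(), end.time())
```

For a 2-minute window sliding every minute, each event is counted in two windows, so the counts in `sliding_window_counts` sum to roughly twice the number of events.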

## Streaming Joins

Let's demonstrate joining our streaming order data with status updates.

In [0]:
# Prepare our streaming DataFrames with proper timestamps
orders_with_time = orders_stream \
    .withColumn("order_time", from_unixtime(col("order_timestamp")).cast("timestamp"))

status_with_time = status_stream \
    .withColumn("status_time", from_unixtime(col("status_timestamp")).cast("timestamp"))

### Stream-Static Join
First, let's create a static DataFrame for lookup purposes.


In [0]:
# Create a static lookup table for order status descriptions
status_lookup = spark.createDataFrame([
    ("placed", "Order has been placed"),
    ("preparing", "Order is being prepared"),
    ("on the way", "Order is in transit"),
    ("delivered", "Order has been delivered"),
    ("cancelled", "Order has been cancelled")
], ["order_status", "status_description"])

In [0]:
# Join streaming status data with static status descriptions
enriched_status = status_with_time \
    .join(status_lookup, "order_status")

# Display the joined stream
display(enriched_status, checkpointLocation = f'{checkpoint_location_prefix}/enriched_status')

### Stream-Stream Join
Now let's join our two streaming DataFrames.

In [0]:
# Stop any existing queries
for query in spark.streams.active:
    if query.name == "order_status_join":
        query.stop()

# Join order stream with status stream on order_id
# Note: without watermarks and a time-range condition, Spark keeps join state
# indefinitely; bound the state for production use
order_status_join = orders_with_time \
    .join(
        status_with_time,
        "order_id"
    )

# Write to memory sink
order_status_join_query = (order_status_join.writeStream
    .format("memory")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", f'{checkpoint_location_prefix}/order_status_join')
    .queryName("order_status_join")
    .start()
)

In [0]:
%sql
-- Query the joined data
SELECT 
  order_id, 
  customer_id, 
  order_status,
  notifications,
  order_time,
  status_time
FROM order_status_join
LIMIT 20


## Handling Late Data with Watermarks
Watermarks help us handle late-arriving data by defining how long to wait for late events.


In [0]:
# Stop any existing queries
for query in spark.streams.active:
    if query.name == "windowed_with_watermark":
        query.stop()

# Add watermark to status events
status_with_watermark = status_events \
    .withWatermark("event_time", "10 minutes")

# Windows with watermark
watermarked_windows = status_with_watermark \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("order_status")
    ) \
    .count()

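# Write to memory
# Note: in complete output mode Spark keeps all aggregate state, so the watermark
# does not drop old windows here; use append or update mode to let it clean up state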
query5 = (watermarked_windows.writeStream
    .format("memory")
    .outputMode("complete")
    .trigger(availableNow=True)
    .option("checkpointLocation", f'{checkpoint_location_prefix}/watermarked_windows')
    .queryName("windowed_with_watermark")
    .start()
)

In [0]:
%sql
-- Query the windowed data with watermark
SELECT 
  window.start as window_start,
  window.end as window_end,
  order_status,
  count
FROM windowed_with_watermark
ORDER BY window_start, order_status
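
The effect of a watermark can be sketched in plain Python. This is a simplified model and not Spark's implementation: after each micro-batch the watermark is set to the largest event time seen so far minus the allowed delay, and in later batches a record is treated as too late when the end of its window is at or before the watermark. Times are integer seconds, and the sample batches are made up.

```python
from typing import List, Optional, Tuple


def apply_watermark(batches: List[List[int]], delay: int,
                    window: int) -> Tuple[List[int], List[int]]:
    """Split event times into accepted and dropped under a simplified watermark."""
    watermark: Optional[int] = None
    max_event_time: Optional[int] = None
    accepted: List[int] = []
    dropped: List[int] = []
    for batch in batches:
        for t in batch:
            window_end = (t // window) * window + window
            if watermark is not None and window_end <= watermark:
                dropped.append(t)   # its window is already considered final
            else:
                accepted.append(t)
        if batch:
            # The watermark only advances after the batch has been processed.
            batch_max = max(batch)
            max_event_time = batch_max if max_event_time is None else max(max_event_time, batch_max)
            watermark = max_event_time - delay
    return accepted, dropped


# 10-minute delay and 5-minute windows, as in the cell above.
accepted, dropped = apply_watermark([[1000, 1200], [2000], [1100, 1900]],
                                    delay=600, window=300)
print(accepted, dropped)  # [1000, 1200, 2000, 1900] [1100]
```

In Spark itself, late records are only discarded and old state only cleaned up in append or update output mode.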

Run the cell below to stop the active streaming queries.

In [0]:
for query in spark.streams.active:
    query.stop()

# Lab - Window Aggregation in Spark Structured Streaming

In this lab, you'll work with stateful operations, sliding windows, and watermarks in Spark Structured Streaming. You'll analyze streams of order and status data to derive meaningful insights.

### Objectives
- Implement stateful aggregations and window operations
- Handle late data and state management
- Build real-time monitoring systems

## Setup and Data Sources

First, let's set up our streaming environment with the necessary data sources.

In [0]:
%python
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Schemas are provided for you
orders_schema = StructType([
    StructField("customer_id", LongType(), True),
    StructField("notifications", StringType(), True),
    StructField("order_id", LongType(), True),
    StructField("order_timestamp", LongType(), True)
])

status_schema = StructType([
    StructField("order_id", LongType(), True),
    StructField("order_status", StringType(), True),
    StructField("status_timestamp", LongType(), True)
])

# Create status streaming DataFrame
status_stream = spark.readStream \
    .format("json") \
    .schema(status_schema) \
    .option("maxFilesPerTrigger", 1) \
    .option("path", "/Volumes/databricks_simulated_retail_customer_data/v01/retail-pipeline/status/stream_json") \
    .load()

# Create orders streaming DataFrame
orders_stream = spark.readStream \
    .format("json") \
    .schema(orders_schema) \
    .option("maxFilesPerTrigger", 1) \
    .option("path", "/Volumes/databricks_simulated_retail_customer_data/v01/retail-pipeline/orders/stream_json") \
    .load()

# Add event_time column to status stream
status_events = status_stream \
    .withColumn("event_time", from_unixtime(col("status_timestamp")).cast("timestamp"))

# Verify streams are set up correctly
print(f"orders_stream is streaming: {orders_stream.isStreaming}")
print(f"status_stream is streaming: {status_stream.isStreaming}")

## Stateful Operations

Let's explore stateful operations that maintain state across micro-batches.

In [0]:
--<FILL IN>
--# 1. Create a stateful aggregation that counts the number of orders by `order_status`
--# 2. Create another stateful aggregation that counts orders by `customer_id`
--# 3. Start streaming queries for both aggregations with `complete` output mode, writing to memory tables called "status_counts" and "customer_counts"

Now you can query these tables to see the results:

In [0]:
-- Query the in-memory table to see status counts
SELECT * FROM status_counts

In [0]:
-- Query the in-memory table to see customer counts
SELECT * FROM customer_counts

## Sliding Window Operations
In this section, you'll implement sliding window aggregations on the streaming data.


In [0]:
--<FILL IN>
--# 1. Create a sliding window aggregation on the status stream that:
--#   - Groups by `order_status`
--#   - Uses a window duration of 3 minutes
--#   - Uses a sliding interval of 1 minute
--#   - Counts the number of events in each window
--# 2. Start a streaming query with this aggregation, using the `complete` output mode, writing to a memory table called "sliding_windows"

You can query the sliding window results:

In [0]:
-- Query the sliding window results
SELECT 
  window.start as window_start,
  window.end as window_end,
  order_status,
  count
FROM sliding_windows
ORDER BY window_start, order_status

## Late Data Handling with Watermarks
Now, let's explore how to handle late-arriving data using watermarks.

In [0]:
--<FILL IN>
--# 1. Modify your sliding window implementation to include a watermark of 5 minutes
--# 2. Write the results to a memory table called "windowed_with_watermark"
--# 3. Create another query that demonstrates a streaming join between orders and status with watermarks

Query the results:

In [0]:
--# Query the windowed data with watermark
--<FILL IN>

In [0]:
--# Query the joined data with watermarks
--<FILL IN>

Run the cell below to stop the active streaming queries.

In [0]:
%python
for query in spark.streams.active:
    query.stop()