# Spark Structured Streaming - Complete Tutorial

## 📚 Learning Path: Basic to Advanced

This comprehensive tutorial covers Spark Structured Streaming from fundamental concepts to advanced production patterns:

### 🎯 **Tutorial Outline**

#### **📖 Part I: Fundamentals (Basic)**
1. **Environment Setup** - Spark configuration and initialization
2. **Data Sources & Schema** - Input sources and data modeling
3. **ReadStream Operations** - Reading from various sources
4. **Basic Transformations** - Filtering, selecting, and simple operations

#### **⚙️ Part II: Core Concepts (Intermediate)**
5. **Triggers & Processing** - Controlling micro-batch execution
6. **Output Modes** - Append, Complete, Update strategies
7. **WriteStream Operations** - Various output sinks
8. **Basic Aggregations** - Count, sum, average operations

#### **🔥 Part III: Advanced Concepts (Advanced)**
9. **State Management** - Stateful vs Stateless operations
10. **Window Operations** - Tumbling, Sliding, Session windows
11. **Watermarks** - Late data handling and state cleanup
12. **Streaming Joins** - Stream-to-stream and stream-to-static joins
13. **Advanced Aggregations** - Approximate functions and complex analytics
14. **RocksDB State Store** - Production state management

#### **🏭 Part IV: Production Patterns (Expert)**
15. **Kafka Integration** - Real-world streaming with Kafka
16. **Monitoring & Debugging** - Query monitoring and troubleshooting
17. **Performance Optimization** - Best practices and tuning
18. **Complete Pipeline** - End-to-end production example

---

## 🎓 Prerequisites
- Basic knowledge of Apache Spark and PySpark
- Understanding of SQL and DataFrames
- Familiarity with streaming concepts

## 🛠️ What You'll Build
By the end of this tutorial, you'll have built:
- Real-time analytics dashboard
- Customer behavior tracking system
- Fraud detection pipeline
- Complete Kafka streaming application

Let's begin! 🚀

# 📖 Part I: Fundamentals

## 1. Environment Setup and Initialization

First, let's set up our Spark environment with the necessary configurations for streaming.

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time
import json
import os
import threading
import random
from datetime import datetime, timedelta

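# Create Spark Session with streaming configurations.
# The builder is left commented out for managed notebooks (for example Databricks) where `spark` already exists;
# uncomment it to run elsewhere. The RocksDB state store setting only takes effect when the builder is used.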
# spark = SparkSession.builder \
#     .appName("StructuredStreamingCompleteTutorial") \
#     .config("spark.sql.streaming.checkpointLocation", "/tmp/streaming_checkpoints") \
#     .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
#     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
#     .config("spark.sql.streaming.stateStore.providerClass", 
#             "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
#     .config("spark.sql.adaptive.enabled", "true") \
#     .getOrCreate()

# Set log level to reduce noise
#spark.sparkContext.setLogLevel("WARN")
data_dir = "/Volumes/workspace/default/stream/spark_streaming_workshop"

print("🚀 Spark Structured Streaming Environment Ready!")
print(f"Spark Version: {spark.version}")
#print(f"Session ID: {spark.sparkContext.applicationId}")
print("✅ Configured with RocksDB state store for production workloads")

🚀 Spark Structured Streaming Environment Ready!
Spark Version: 4.0.0
✅ Configured with RocksDB state store for production workloads


## 2. Data Sources and Schema Definition

Understanding data schemas and sources is fundamental to streaming applications.

In [0]:
# Schema definition and sample data generator for streaming
import random
from datetime import datetime, timedelta
import builtins  # `from pyspark.sql.functions import *` shadows Python's round(); use builtins.round instead
# Define comprehensive schema for our streaming data
streaming_schema = StructType([
    StructField("customer_id", LongType(), True),
    StructField("order_id", LongType(), True),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("order_timestamp", TimestampType(), True),
    StructField("category", StringType(), True),
    StructField("customer_location", StringType(), True),
    StructField("payment_method", StringType(), True)
])

print("📊 Streaming Schema Defined:")
for field in streaming_schema.fields:
    print(f"  • {field.name}: {field.dataType} (nullable: {field.nullable})")

# Sample data generator for realistic streaming simulation
def generate_sample_data(num_records=10):
    """Generate realistic e-commerce sample data"""
    
    products = [
        {"name": "iPhone 15", "category": "Electronics", "base_price": 999.99},
        {"name": "MacBook Pro", "category": "Electronics", "base_price": 1999.99},
        {"name": "Nike Air Max", "category": "Fashion", "base_price": 129.99},
        {"name": "Coffee Maker", "category": "Home", "base_price": 89.99},
        {"name": "Gaming Chair", "category": "Furniture", "base_price": 299.99},
        {"name": "Wireless Headphones", "category": "Electronics", "base_price": 199.99},
        {"name": "Yoga Mat", "category": "Sports", "base_price": 49.99},
        {"name": "Smart Watch", "category": "Electronics", "base_price": 399.99}
    ]
    
    locations = ["New York", "California", "Texas", "Florida", "Illinois"]
    payment_methods = ["credit_card", "debit_card", "paypal", "apple_pay"]
    
    sample_data = []
    base_time = datetime.now()
    
    for i in range(num_records):
        product = random.choice(products)
        customer_id = random.randint(1001, 9999)
        order_id = random.randint(100000, 999999)
        quantity = random.randint(1, 5)
        price_variation = random.uniform(0.8, 1.2)
        price = builtins.round(product["base_price"] * price_variation, 2)
        time_offset = random.randint(0, 3600)  # Last hour
        order_timestamp = base_time - timedelta(seconds=time_offset)
        
        record = {
            "customer_id": customer_id,
            "order_id": order_id,
            "product_name": product["name"],
            "quantity": quantity,
            "price": price,
            "order_timestamp": order_timestamp.isoformat(),
            "category": product["category"],
            "customer_location": random.choice(locations),
            "payment_method": random.choice(payment_methods)
        }
        sample_data.append(record)
    
    return sample_data

# Generate and display sample data
sample_records = generate_sample_data(5)
print("\n📋 Sample Data Generated:")
for i, record in enumerate(sample_records, 1):
    print(f"\n🔸 Record {i}:")
    print(json.dumps(record, indent=2, default=str))

print(f"\n✅ Generated {len(sample_records)} sample records")

📊 Streaming Schema Defined:
  • customer_id: LongType() (nullable: True)
  • order_id: LongType() (nullable: True)
  • product_name: StringType() (nullable: True)
  • quantity: IntegerType() (nullable: True)
  • price: DoubleType() (nullable: True)
  • order_timestamp: TimestampType() (nullable: True)
  • category: StringType() (nullable: True)
  • customer_location: StringType() (nullable: True)
  • payment_method: StringType() (nullable: True)

📋 Sample Data Generated:

🔸 Record 1:
{
  "customer_id": 8083,
  "order_id": 186772,
  "product_name": "MacBook Pro",
  "quantity": 3,
  "price": 2101.79,
  "order_timestamp": "2025-09-14T16:12:21.685199",
  "category": "Electronics",
  "customer_location": "California",
  "payment_method": "paypal"
}

🔸 Record 2:
{
  "customer_id": 2417,
  "order_id": 324723,
  "product_name": "Wireless Headphones",
  "quantity": 5,
  "price": 231.04,
  "order_timestamp": "2025-09-14T16:06:15.685199",
  "category": "Electronics",
  "customer_location": "Texas

## 3. ReadStream Operations - Data Sources

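Structured Streaming supports various input sources, including files, Kafka, and the built-in rate source for testing. This section prepares a directory of JSON files that the file source reads in Section 4.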

In [0]:
# Create sample files for file-based streaming
def create_sample_files(output_dir="/tmp/streaming_input", num_files=3):
    """Create sample JSON files for file-based streaming"""
    os.makedirs(output_dir, exist_ok=True)
    
    for file_num in range(1, num_files + 1):
        file_data = generate_sample_data(5)
        filename = f"orders_batch_{file_num:03d}.json"
        filepath = os.path.join(output_dir, filename)
        
        with open(filepath, 'w') as f:
            for record in file_data:
                f.write(json.dumps(record, default=str) + '\n')
    
    return output_dir

# Create sample files
output_dir = f"{data_dir}/json_input"
file_input_path = create_sample_files(output_dir)
print(f"📁 Sample files created in: {file_input_path}")


📁 Sample files created in: /Volumes/workspace/default/stream/spark_streaming_workshop/json_input


## 4. Basic Transformations

Learn fundamental streaming transformations that don't require state management.

In [0]:
# File stream configuration
file_stream = spark.readStream \
    .format("json") \
    .schema(streaming_schema) \
    .option("path", file_input_path) \
    .option("maxFilesPerTrigger", 1) \
    .load()

print("✅ File stream configured")

print("\n📊 Available Streaming Sources:")
print("  • file_stream - JSON file monitoring")

✅ File stream configured

📊 Available Streaming Sources:
  • file_stream - JSON file monitoring


In [0]:
file_stream.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- order_timestamp: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- customer_location: string (nullable = true)
 |-- payment_method: string (nullable = true)



In [0]:
# 4.1 Stateless Transformations (No state between batches)
print("🔄 Basic Streaming Transformations")
print("\n📝 Stateless Operations (Process each micro-batch independently):")

# Filtering and selection
filtered_stream = file_stream \
    .filter(col("price") > 10) \
    .select("customer_id", "product_name", "price", "category", "order_timestamp")

print("✅ Filtering: Orders > $10")
display(filtered_stream.limit(5), 
        checkpointLocation=f"{data_dir}/filtered_stream_checkpoint")

customer_id,product_name,price,category,order_timestamp
6349,Yoga Mat,46.31,Sports,2025-09-14T16:43:49.516Z
7198,Coffee Maker,76.4,Home,2025-09-14T16:17:52.516Z
8588,Wireless Headphones,208.24,Electronics,2025-09-14T16:58:33.516Z
2851,Coffee Maker,87.45,Home,2025-09-14T16:12:32.516Z
3385,Gaming Chair,288.07,Furniture,2025-09-14T16:48:16.516Z


In [0]:
# Adding computed columns
enriched_stream = file_stream \
    .withColumn("total_amount", col("price") * col("quantity")) \
    .withColumn("order_value_tier",
        when(col("price") > 500, "Premium")
        .when(col("price") > 200, "Standard")
        .otherwise("Basic")
    ) \
    .withColumn("is_high_quantity", col("quantity") > 2) \
    .withColumn("processing_time", current_timestamp())

print("✅ Column enrichment: Added total_amount, order_value_tier, processing_time")
display(enriched_stream.limit(5),checkpointLocation=f"{data_dir}/enriched_stream_checkpoint")

customer_id,order_id,product_name,quantity,price,order_timestamp,category,customer_location,payment_method,total_amount,order_value_tier,is_high_quantity,processing_time
6349,547863,Yoga Mat,4,46.31,2025-09-14T16:43:49.516Z,Sports,New York,apple_pay,185.24,Basic,True,2025-09-14T17:13:02.830Z
7198,370693,Coffee Maker,3,76.4,2025-09-14T16:17:52.516Z,Home,Illinois,credit_card,229.2,Basic,True,2025-09-14T17:13:02.830Z
8588,604014,Wireless Headphones,4,208.24,2025-09-14T16:58:33.516Z,Electronics,California,credit_card,832.96,Standard,True,2025-09-14T17:13:02.830Z
2851,991716,Coffee Maker,2,87.45,2025-09-14T16:12:32.516Z,Home,California,credit_card,174.9,Basic,False,2025-09-14T17:13:02.830Z
3385,927432,Gaming Chair,3,288.07,2025-09-14T16:48:16.516Z,Furniture,New York,apple_pay,864.21,Standard,True,2025-09-14T17:13:02.830Z


In [0]:
# String transformations
text_transformed_stream = file_stream \
    .withColumn("product_upper", upper(col("product_name"))) \
    .withColumn("category_lower", lower(col("category"))) \
    .withColumn("location_formatted", concat(lit("City: "), col("customer_location")))

print("✅ Text transformations: Upper/lower case, concatenation")
display(text_transformed_stream.limit(5),checkpointLocation=f"{data_dir}/text_transformed_checkpoint")

customer_id,order_id,product_name,quantity,price,order_timestamp,category,customer_location,payment_method,product_upper,category_lower,location_formatted
6349,547863,Yoga Mat,4,46.31,2025-09-14T16:43:49.516Z,Sports,New York,apple_pay,YOGA MAT,sports,City: New York
7198,370693,Coffee Maker,3,76.4,2025-09-14T16:17:52.516Z,Home,Illinois,credit_card,COFFEE MAKER,home,City: Illinois
8588,604014,Wireless Headphones,4,208.24,2025-09-14T16:58:33.516Z,Electronics,California,credit_card,WIRELESS HEADPHONES,electronics,City: California
2851,991716,Coffee Maker,2,87.45,2025-09-14T16:12:32.516Z,Home,California,credit_card,COFFEE MAKER,home,City: California
3385,927432,Gaming Chair,3,288.07,2025-09-14T16:48:16.516Z,Furniture,New York,apple_pay,GAMING CHAIR,furniture,City: New York


In [0]:
# Date/time transformations
time_transformed_stream = file_stream \
    .withColumn("hour_of_day", hour(col("order_timestamp"))) \
    .withColumn("day_of_week", dayofweek(col("order_timestamp"))) \
    .withColumn("is_weekend", dayofweek(col("order_timestamp")).isin([1, 7])) \
    .withColumn("order_date", to_date(col("order_timestamp")))

print("✅ Time transformations: Hour, day of week, weekend detection")
display(time_transformed_stream.limit(5), checkpointLocation=f"{data_dir}/time_transformed_checkpoint" )


customer_id,order_id,product_name,quantity,price,order_timestamp,category,customer_location,payment_method,hour_of_day,day_of_week,is_weekend,order_date
6349,547863,Yoga Mat,4,46.31,2025-09-14T16:43:49.516Z,Sports,New York,apple_pay,16,1,True,2025-09-14
7198,370693,Coffee Maker,3,76.4,2025-09-14T16:17:52.516Z,Home,Illinois,credit_card,16,1,True,2025-09-14
8588,604014,Wireless Headphones,4,208.24,2025-09-14T16:58:33.516Z,Electronics,California,credit_card,16,1,True,2025-09-14
2851,991716,Coffee Maker,2,87.45,2025-09-14T16:12:32.516Z,Home,California,credit_card,16,1,True,2025-09-14
3385,927432,Gaming Chair,3,288.07,2025-09-14T16:48:16.516Z,Furniture,New York,apple_pay,16,1,True,2025-09-14
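
Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), which is why the weekend check uses `isin([1, 7])`. Python's `datetime` follows a different convention, so a plain-Python mirror can help when unit-testing this rule outside Spark. The sketch below is only illustrative; `spark_dayofweek` and `is_weekend` are hypothetical helper names, not part of PySpark.

```python
from datetime import datetime


def spark_dayofweek(ts: datetime) -> int:
    """Mirror Spark's dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so rotate by one day.
    return ts.isoweekday() % 7 + 1


def is_weekend(ts: datetime) -> bool:
    """Same rule as dayofweek(...).isin([1, 7]) in the streaming query."""
    return spark_dayofweek(ts) in (1, 7)


# 2025-09-14 is the date of the sample orders shown above (a Sunday).
sample = datetime(2025, 9, 14, 16, 43, 49)
print(spark_dayofweek(sample), is_weekend(sample))
```

For the sample date this agrees with the `day_of_week` and `is_weekend` columns in the output above.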


In [0]:
# Complex conditional logic
business_logic_stream = file_stream \
    .withColumn("customer_segment",
        when((col("price") > 500) & (col("payment_method") == "credit_card"), "Premium")
        .when((col("quantity") > 3) | (col("category") == "Electronics"), "High Value")
        .when(col("customer_location").isin(["New York", "California"]), "Metro")
        .otherwise("Standard")
    ) \
    .withColumn("discount_eligible",
        (col("quantity") >= 3) & (col("price") > 100)
    )

print("✅ Business logic: Customer segmentation, discount eligibility")
display(business_logic_stream.limit(5),checkpointLocation=f"{data_dir}/business_logic_checkpoint")


customer_id,order_id,product_name,quantity,price,order_timestamp,category,customer_location,payment_method,customer_segment,discount_eligible
6349,547863,Yoga Mat,4,46.31,2025-09-14T16:43:49.516Z,Sports,New York,apple_pay,High Value,False
7198,370693,Coffee Maker,3,76.4,2025-09-14T16:17:52.516Z,Home,Illinois,credit_card,Standard,False
8588,604014,Wireless Headphones,4,208.24,2025-09-14T16:58:33.516Z,Electronics,California,credit_card,High Value,True
2851,991716,Coffee Maker,2,87.45,2025-09-14T16:12:32.516Z,Home,California,credit_card,Metro,False
3385,927432,Gaming Chair,3,288.07,2025-09-14T16:48:16.516Z,Furniture,New York,apple_pay,Metro,True
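
Chained `when(...)` expressions are evaluated top to bottom and the first matching branch wins, so rule order matters: an Electronics order from California is labelled "High Value", not "Metro". The plain-Python sketch below mirrors the two rules so they can be checked without a Spark session. The Python functions are hypothetical helpers written for illustration, and the sample rows are copied from the output above.

```python
def customer_segment(price, quantity, category, location, payment_method):
    """First matching rule wins, like when(...).when(...).otherwise(...)."""
    if price > 500 and payment_method == "credit_card":
        return "Premium"
    if quantity > 3 or category == "Electronics":
        return "High Value"
    if location in ("New York", "California"):
        return "Metro"
    return "Standard"


def discount_eligible(price, quantity):
    """Mirror of (quantity >= 3) & (price > 100)."""
    return quantity >= 3 and price > 100


# (price, quantity, category, location, payment_method) from the displayed rows
rows = [
    (46.31, 4, "Sports", "New York", "apple_pay"),
    (76.4, 3, "Home", "Illinois", "credit_card"),
    (208.24, 4, "Electronics", "California", "credit_card"),
    (87.45, 2, "Home", "California", "credit_card"),
    (288.07, 3, "Furniture", "New York", "apple_pay"),
]

results = [
    (customer_segment(p, q, c, loc, pay), discount_eligible(p, q))
    for p, q, c, loc, pay in rows
]
for segment, eligible in results:
    print(segment, eligible)
```

This mirror ignores NULL handling; in Spark, a comparison against NULL evaluates to NULL, not False, so rows with missing values can fall through to `otherwise`.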


# ⚙️ Part II: Core Concepts

## 5. Triggers - Controlling Processing Timing

Triggers determine when micro-batches are processed in streaming applications.

### 5.1 Understanding Different Trigger Types

**⏰ Trigger Types in Structured Streaming**

**1️⃣ Default Trigger:**  
   • Starts the next micro-batch as soon as the previous one completes  
   • Highest throughput but variable timing  
   • Usage: omit `.trigger(...)` entirely (in PySpark, calling `.trigger()` with no arguments raises an error)

**2️⃣ Fixed Interval Trigger:**  
   • Processes micro-batches at regular intervals  
   • Predictable timing, good for regular reporting  
   • Usage: `.trigger(processingTime='10 seconds')`

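**3️⃣ One-time Trigger:**  
   • Processes all available data once and stops  
   • Good for batch-like processing of streaming data  
   • Usage: `.trigger(once=True)` (deprecated since Spark 3.4); prefer `.trigger(availableNow=True)`, which works through the backlog in multiple batches and honors rate limits such as `maxFilesPerTrigger`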

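**4️⃣ Continuous Trigger (Experimental):**  
   • Ultra-low latency processing (~1 ms end-to-end)  
   • Limited operation support (map-like operations such as select and filter; no aggregations)  
   • Usage: `.trigger(continuous='1 second')`, where the interval is the checkpoint interval, not the latency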
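
The trigger options above map onto keyword arguments of `DataStreamWriter.trigger()`. As a minimal sketch, the helper below turns a trigger name into those keyword arguments so the choice can live in configuration. `trigger_kwargs` and `apply_trigger` are hypothetical names introduced here for illustration; only the keyword names (`processingTime`, `once`, `availableNow`, `continuous`) come from PySpark.

```python
def trigger_kwargs(kind, interval=None):
    """Map a trigger name to keyword arguments for DataStreamWriter.trigger().

    An empty dict stands for the default trigger, meaning: do not call
    .trigger() at all.
    """
    if kind == "default":
        return {}
    if kind == "fixed":
        if not interval:
            raise ValueError("a fixed-interval trigger needs an interval, e.g. '10 seconds'")
        return {"processingTime": interval}
    if kind == "once":
        return {"once": True}
    if kind == "available_now":
        return {"availableNow": True}
    if kind == "continuous":
        if not interval:
            raise ValueError("a continuous trigger needs a checkpoint interval, e.g. '1 second'")
        return {"continuous": interval}
    raise ValueError(f"unknown trigger kind: {kind!r}")


def apply_trigger(writer, kind, interval=None):
    """Apply the trigger to a DataStreamWriter-like object and return it."""
    kwargs = trigger_kwargs(kind, interval)
    return writer.trigger(**kwargs) if kwargs else writer


print(trigger_kwargs("fixed", "10 seconds"))
print(trigger_kwargs("default"))
```

With a real stream, this could be used as `apply_trigger(df.writeStream.format("console"), "fixed", "10 seconds").start()`.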

### 5.2 Practical Trigger Examples

🛠️ Practical Trigger Implementation:

In [0]:
# Stop any existing queries
for q in spark.streams.active:
    if q.name and "trigger_demo" in q.name:
        q.stop()
        print(f"🛑 Stopped existing query: {q.name}")

# AvailableNow trigger example: tag each row with the time its micro-batch was processed
trigger_demo_stream = file_stream \
    .withColumn("processing_batch_time", current_timestamp()) \
    .select(
        "customer_id",
        "order_id",
        "product_name",
        "quantity",
        "price",
        "order_timestamp",
        "category",
        "customer_location",
        "payment_method",
        "processing_batch_time"
    )

# Start query with AvailableNow trigger
trigger_query = trigger_demo_stream \
    .writeStream \
    .format("memory") \
    .queryName("trigger_demo_table") \
    .outputMode("append") \
    .option("checkpointLocation", f"{data_dir}/trigger_demo_checkpoint") \
    .trigger(availableNow=True) \
    .start()

print("✅ Started AvailableNow trigger demo")
print("   • Query Name: trigger_demo_table")
print("   • Processing: All available data and then stops")
print("   • Monitor: Check processing_batch_time to see batching")

# An AvailableNow query stops on its own; wait (up to 60 seconds) for it to finish
trigger_query.awaitTermination(60)

✅ Started AvailableNow trigger demo
   • Query Name: trigger_demo_table
   • Processing: All available data and then stops
   • Monitor: Check processing_batch_time to see batching


In [0]:
# Analyze trigger behavior
print("\n📊 Trigger Behavior Analysis:")
batch_analysis = spark.sql("""
SELECT processing_batch_time, 
       count(*) as records_in_batch,
       min(order_timestamp) as batch_start_time,
       max(order_timestamp) as batch_end_time
FROM trigger_demo_table 
GROUP BY processing_batch_time 
ORDER BY processing_batch_time DESC
""")

batch_analysis.show(truncate=False)

print("\n💡 Trigger Selection Guidelines:")
guidelines = {
    "Real-time dashboards": "Fixed interval (5-30 seconds)",
    "Financial reporting": "Fixed interval (1-5 minutes)", 
    "ETL pipelines": "Default trigger for throughput",
    "Batch migration": "One-time trigger",
    "Ultra-low latency": "Continuous trigger (experimental)"
}

for use_case, recommendation in guidelines.items():
    print(f"  • {use_case}: {recommendation}")

# Stop demo query
trigger_query.stop()
print("\n🛑 Stopped trigger demo query")


📊 Trigger Behavior Analysis:
+-----------------------+----------------+--------------------------+--------------------------+
|processing_batch_time  |records_in_batch|batch_start_time          |batch_end_time            |
+-----------------------+----------------+--------------------------+--------------------------+
|2025-09-14 17:13:50.052|5               |2025-09-14 16:04:26.822069|2025-09-14 16:47:23.822069|
|2025-09-14 17:13:49.318|5               |2025-09-14 16:13:42.619924|2025-09-14 16:51:57.619924|
|2025-09-14 17:13:48.621|5               |2025-09-14 16:12:32.516373|2025-09-14 16:58:33.516373|
+-----------------------+----------------+--------------------------+--------------------------+


💡 Trigger Selection Guidelines:
  • Real-time dashboards: Fixed interval (5-30 seconds)
  • Financial reporting: Fixed interval (1-5 minutes)
  • ETL pipelines: Default trigger for throughput
  • Batch migration: One-time trigger
  • Ultra-low latency: Continuous trigger (experimental)

🛑 Stopped trigger demo query

## 6. Output Modes - Data Output Strategies

Output modes determine what data gets written to the output sink in each micro-batch.

### 6.1 Understanding Output Modes

📤 **Output Modes in Structured Streaming**

- **Append**
  - *Description*: Only new rows added to the result table since the last trigger
  - *Use Cases*: ETL, filtering, stateless transformations
  - *Aggregations*: Only with watermarks (a window is emitted once, after the watermark finalizes it)
  - *Memory Usage*: Low
  - *Default*: Yes. Append is the default output mode for every query

- **Complete**
  - *Description*: Entire result table in each micro-batch
  - *Use Cases*: Small aggregations, dashboards
  - *Aggregations*: Required (only aggregation queries are supported)
  - *Memory Usage*: High (state for every group is retained, even with a watermark)
  - *Default*: No. Must be set explicitly with `.outputMode("complete")`

- **Update**
  - *Description*: Only rows that were updated since last trigger
  - *Use Cases*: Large aggregations, ideally with watermarks to bound state
  - *Aggregations*: Supported with or without watermarks
  - *Memory Usage*: Medium (bounded when a watermark is set)
  - *Default*: No. Must be set explicitly with `.outputMode("update")`
  
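
To make the difference between Complete and Update concrete, the sketch below models a running count per category over three micro-batches in plain Python. It is a toy model of the semantics, not Spark code; `run_batches` and the sample batches are made up for illustration.

```python
from collections import Counter


def run_batches(batches):
    """Yield (complete_output, update_output) for a running count per key."""
    totals = Counter()
    for batch in batches:
        touched = Counter(batch)   # keys that received rows in this micro-batch
        totals.update(touched)     # fold the batch into the running state
        complete = dict(totals)    # Complete: the whole result table
        update = {key: totals[key] for key in touched}  # Update: changed rows only
        yield complete, update


batches = [["Electronics", "Home"], ["Electronics"], ["Sports"]]
outputs = list(run_batches(batches))
for batch_id, (complete, update) in enumerate(outputs):
    print(f"batch {batch_id}: complete={complete} update={update}")
```

In the second batch only the `Electronics` row appears in the Update output, while Complete re-emits every category. Append is omitted because, for an aggregation, it needs a watermark to decide when a row is final.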

In [0]:
# 6.2 Practical Output Mode Examples
print("\n\n🛠️ Practical Output Mode Examples:")

# Clean up existing demo queries
for q in spark.streams.active:
    if q.name and "output_mode_demo" in q.name:
        q.stop()

# Example 1: Append Mode (Stateless)
print("\n1️⃣ Append Mode Example:")
append_stream = file_stream \
    .filter(col("price") > 200) \
    .select("customer_id", "product_name", "price", "category", "order_timestamp")

append_query = append_stream \
    .writeStream \
    .format("memory") \
    .queryName("output_mode_demo_append") \
    .outputMode("append") \
    .option("checkpointLocation",f"{data_dir}/streaming_checkpoints/output_mode_demo_append") \
    .trigger(availableNow=True) \
    .start()

print("✅ Append mode: Only new filtered records each batch")

# Example 2: Complete Mode (Aggregations)
print("\n2️⃣ Complete Mode Example:")
complete_stream = file_stream \
    .groupBy("category") \
    .agg(
        count("*").alias("total_orders"),
        sum("price").alias("total_revenue"),
        avg("price").alias("avg_price")
    ) \
    .select(
        "category",
        "total_orders",
        round(col("total_revenue"), 2).alias("total_revenue"),
        round(col("avg_price"), 2).alias("avg_price")
    )

complete_query = complete_stream \
    .writeStream \
    .format("memory") \
    .queryName("output_mode_demo_complete") \
    .outputMode("complete") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/output_mode_demo_complete") \
    .trigger(availableNow=True) \
    .start()

print("✅ Complete mode: Entire aggregation result each batch")

# Example 3: Update Mode (Watermarked Aggregations)
print("\n3️⃣ Update Mode Example:")
update_stream = file_stream \
    .withWatermark("order_timestamp", "30 seconds") \
    .groupBy(
        window(col("order_timestamp"), "2 minutes"),
        col("category")
    ) \
    .agg(
        count("*").alias("window_orders"),
        sum("price").alias("window_revenue")
    ) \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "category",
        "window_orders",
        round(col("window_revenue"), 2).alias("window_revenue")
    )

update_query = update_stream \
    .writeStream \
    .format("memory") \
    .queryName("output_mode_demo_update") \
    .outputMode("update") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/output_mode_demo_update") \
    .trigger(availableNow=True) \
    .start()

print("✅ Update mode: Only changed windows each batch")

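# AvailableNow queries stop on their own; wait for each to finish (up to 60 seconds)
print("\n⏳ Waiting for the queries to process all available data...")
for query in [append_query, complete_query, update_query]:
    query.awaitTermination(60)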



🛠️ Practical Output Mode Examples:

1️⃣ Append Mode Example:


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-7269659990446062>, line 22
     10 print("\n1️⃣ Append Mode Example:")
     11 append_stream = file_stream \
     12     .filter(col("price") > 200) \
     13     .select("customer_id", "product_name", "price", "category", "order_timestamp")
     15 append_query = append_strea

In [0]:
# Show results from different output modes
print("\n📊 Output Mode Results:")

print("\n🔸 Append Mode Results (filtered records):")
spark.sql("SELECT * FROM output_mode_demo_append ORDER BY order_timestamp DESC LIMIT 10").show()

print("\n🔸 Complete Mode Results (running totals):")
spark.sql("SELECT * FROM output_mode_demo_complete ORDER BY total_revenue DESC").show()

print("\n🔸 Update Mode Results (windowed aggregations):")
spark.sql("SELECT * FROM output_mode_demo_update ORDER BY window_start DESC LIMIT 10").show()

# Clean up
for query in [append_query, complete_query, update_query]:
    if query.isActive:
        query.stop()

print("\n🛑 Stopped all output mode demo queries")

print("\n💡 Output Mode Selection Guide:")
print("  • Append: ETL pipelines, data ingestion, stateless processing")
print("  • Complete: Real-time dashboards with small result sets")
print("  • Update: Large-scale aggregations with time windows")


📊 Output Mode Results:

🔸 Append Mode Results (filtered records):
+-----------+-------------------+-------+-----------+--------------------+
|customer_id|       product_name|  price|   category|     order_timestamp|
+-----------+-------------------+-------+-----------+--------------------+
|       3459|        MacBook Pro|2058.21|Electronics|2025-09-14 14:43:...|
|       8983|          iPhone 15|1079.94|Electronics|2025-09-14 14:39:...|
|       4472|       Gaming Chair| 270.43|  Furniture|2025-09-14 14:38:...|
|       2795|          iPhone 15| 896.54|Electronics|2025-09-14 14:36:...|
|       2277|        Smart Watch| 437.23|Electronics|2025-09-14 14:35:...|
|       7074|       Gaming Chair| 302.02|  Furniture|2025-09-14 14:21:...|
|       1119|          iPhone 15| 801.74|Electronics|2025-09-14 14:20:...|
|       1307|        MacBook Pro|1933.09|Electronics|2025-09-14 14:14:...|
|       6173|Wireless Headphones| 221.48|Electronics|2025-09-14 14:11:...|
|       8534|       Gaming Chair|

## 7. WriteStream Operations and Sinks

Understanding different output sinks and their configurations for various use cases.
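
Sinks that track progress need a checkpoint location, and each query must use its own directory, so it helps to derive the path from the query name instead of typing it by hand. The helper below is a small sketch of that idea; `checkpoint_path` is a hypothetical function and the `checkpoints/<query_name>` layout is an assumption, not a Spark requirement.

```python
import posixpath
import re


def checkpoint_path(base_dir, query_name):
    """Build a per-query checkpoint directory under base_dir.

    Characters outside letters, digits, '_', '.', and '-' are replaced so the
    query name is safe to use as a directory name.
    """
    safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", query_name)
    root = base_dir.rstrip("/") or "/"   # avoid doubled or missing slashes
    return posixpath.join(root, "checkpoints", safe_name)


print(checkpoint_path("/Volumes/workspace/default/stream/spark_streaming_workshop/", "sink_demo_file"))
```

The result can be passed to `.option("checkpointLocation", ...)` when starting a query.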

In [0]:
# 7.1 Output Sink Types and Configurations
print("🎯 WriteStream Operations and Sinks")
print("=" * 40)

# Helper functions for different sinks
def start_console_sink(dataframe, query_name, output_mode="append", trigger_interval="10 seconds", num_rows=20):
    """Console sink for development and debugging"""
    return dataframe.writeStream \
        .outputMode(output_mode) \
        .format("console") \
        .option("truncate", False) \
        .option("numRows", num_rows) \
        .trigger(processingTime=trigger_interval) \
        .queryName(query_name) \
        .start()

def start_memory_sink(dataframe, table_name, output_mode="append", trigger_interval="10 seconds"):
    """Memory sink for testing and interactive analysis"""
    return dataframe.writeStream \
        .outputMode(output_mode) \
        .format("memory") \
        .queryName(table_name) \
        .trigger(processingTime=trigger_interval) \
        .start()

def start_file_sink(dataframe, output_path, file_format="parquet", output_mode="append", 
                   trigger_interval="30 seconds", query_name="file_sink"):
    """File sink for data lake storage"""
    return dataframe.writeStream \
        .outputMode(output_mode) \
        .format(file_format) \
        .option("path", output_path) \
        .option("checkpointLocation", f"/tmp/checkpoints/{query_name}") \
        .trigger(processingTime=trigger_interval) \
        .queryName(query_name) \
        .start()

print("🛠️ Sink helper functions created:")
print("  • start_console_sink() - For debugging and development")
print("  • start_memory_sink() - For testing and analysis")
print("  • start_file_sink() - For data lake storage")

# 7.2 Practical Sink Examples
print("\n📊 Practical Sink Implementations:")

# Clean up existing queries
for q in spark.streams.active:
    if q.name and "sink_demo" in q.name:
        q.stop()

# Prepare different data streams for sink demos (built on file_stream from Section 4)
raw_data_stream = file_stream.select(
    "customer_id", "product_name", "price", "category", "order_timestamp"
)

aggregated_stream = file_stream \
    .groupBy("category", "customer_location") \
    .agg(
        count("*").alias("order_count"),
        sum("price").alias("total_revenue"),
        avg("price").alias("avg_order_value")
    ) \
    .select(
        "category", "customer_location", "order_count",
        round(col("total_revenue"), 2).alias("total_revenue"),
        round(col("avg_order_value"), 2).alias("avg_order_value")
    )

# Example 1: Console Sink (Development)
print("\n1️⃣ Console Sink - Live data preview:")
console_query = start_console_sink(
    raw_data_stream.filter(col("price") > 300),
    "sink_demo_console",
    output_mode="append",
    trigger_interval="8 seconds",
    num_rows=5
)
print("✅ Console sink started - High-value orders preview")

# Example 2: Memory Sink (Interactive Analysis)
print("\n2️⃣ Memory Sink - For SQL queries:")
memory_query = start_memory_sink(
    aggregated_stream,
    "sink_demo_memory",
    output_mode="complete",
    trigger_interval="12 seconds"
)
print("✅ Memory sink started - Category analytics table")

# Example 3: File Sink (Data Storage)
print("\n3️⃣ File Sink - Persistent storage:")
output_path = "/tmp/streaming_output/orders"
os.makedirs(output_path, exist_ok=True)

file_query = start_file_sink(
    raw_data_stream,
    output_path,
    file_format="json",
    output_mode="append",
    trigger_interval="20 seconds",
    query_name="sink_demo_file"
)
print(f"✅ File sink started - Saving to {output_path}")

# Let sinks run and collect data
print("\n⏳ Running sinks for 40 seconds to collect data...")
time.sleep(40)

# Query memory sink results
print("\n📋 Memory Sink Results:")
memory_results = spark.sql("""
SELECT category, customer_location, order_count, total_revenue, avg_order_value
FROM sink_demo_memory 
ORDER BY total_revenue DESC
""")
memory_results.show()

# Check file sink output
print("\n📁 File Sink Output:")
if os.path.exists(output_path) and os.listdir(output_path):
    files = [f for f in os.listdir(output_path) if f.endswith('.json')]
    print(f"Created {len(files)} JSON files in {output_path}")
    if files:
        sample_file = os.path.join(output_path, files[0])
        with open(sample_file, 'r') as f:
            sample_content = f.readline()
        print(f"Sample content: {sample_content[:100]}...")
else:
    print("No files created yet (may need more time)")

# Query monitoring
print("\n📊 Active Query Status:")
for query in [console_query, memory_query, file_query]:
    if query.isActive:
        progress = query.lastProgress
        print(f"  • {query.name}: Active")
        if progress:
            print(f"    Batch: {progress.get('batchId', 'N/A')}")
            print(f"    Input Rate: {progress.get('inputRowsPerSecond', 0):.1f} rows/sec")
    else:
        print(f"  • {query.name}: Stopped")

# Clean up queries
for query in [console_query, memory_query, file_query]:
    if query.isActive:
        query.stop()

print("\n🛑 Stopped all sink demo queries")

print("\n💡 Sink Selection Guidelines:")
sink_guidelines = {
    "Development/Debug": "Console sink",
    "Interactive Analysis": "Memory sink", 
    "Data Lake Storage": "File sink (Parquet/Delta)",
    "Real-time Apps": "Kafka sink",
    "Databases": "JDBC sink (via foreachBatch)",
    "Custom Logic": "Custom sink (via foreach/foreachBatch)"
}

for use_case, sink_type in sink_guidelines.items():
    print(f"  • {use_case}: {sink_type}")

## 8. Basic Aggregations and Windowing

Introduction to stateful operations and time-based aggregations.
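
As a rough mental model (plain Python, not Spark code), a streaming `groupBy().agg()` keeps one running state entry per key and, in update mode, emits only the keys touched by the current micro-batch. The helper name `apply_micro_batch` and the sample rows below are made up for illustration.

```python
from collections import defaultdict


def apply_micro_batch(state, batch):
    """Fold one micro-batch of (key, price) rows into the running state.

    Returns the rows an update-mode sink would receive: only the keys
    whose aggregates changed in this batch.
    """
    touched = set()
    for key, price in batch:
        entry = state[key]
        entry["order_count"] += 1
        entry["total_revenue"] += price
        touched.add(key)
    return {key: dict(state[key]) for key in touched}


# Running state survives across micro-batches (hypothetical sample data).
state = defaultdict(lambda: {"order_count": 0, "total_revenue": 0.0})

batch_1 = [("MacBook Pro", 2000.0), ("Smart Watch", 400.0)]
batch_2 = [("MacBook Pro", 2100.0)]

out_1 = apply_micro_batch(state, batch_1)
out_2 = apply_micro_batch(state, batch_2)

print(out_1)
print(out_2)
```

The second batch emits only `MacBook Pro`, now with the cumulative count and revenue. A memory sink in update mode appends each emitted row, which is why the same key can show up more than once in the result tables in this section.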

In [0]:
business_logic_stream.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- order_timestamp: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- customer_location: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- customer_segment: string (nullable = false)
 |-- discount_eligible: boolean (nullable = true)



In [0]:
# Create aggregation queries
print("📊 Setting up aggregations...")

# Product performance aggregation
product_agg = business_logic_stream \
    .groupBy("product_name", "category") \
    .agg(
        count("*").alias("order_count"),
        sum("price").alias("total_revenue"),
        avg("price").alias("avg_price"),
        approx_count_distinct("customer_id").alias("unique_customers")
    ) \
    .withColumn("avg_price", round(col("avg_price"), 2)) \
    .withColumn("total_revenue", round(col("total_revenue"), 2))

# Location-based aggregation
location_agg = business_logic_stream \
    .groupBy("customer_location") \
    .agg(
        count("*").alias("total_orders"),
        sum(col("price") * col("quantity")).alias("total_sales"),
        approx_count_distinct("customer_id").alias("active_customers")
    ) \
    .withColumn("avg_order_value", 
        round(col("total_sales") / col("total_orders"), 2)
    ) \
    .withColumnRenamed("customer_location", "location")

print("✅ Aggregation queries defined")

# Start aggregation streams
print("\n🔄 Starting aggregation demos...")

📊 Setting up aggregations...
✅ Aggregation queries defined

🔄 Starting aggregation demos...


In [0]:
product_query = product_agg \
    .writeStream \
    .format("memory") \
    .queryName("product_performance") \
    .outputMode("update") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/product_performance") \
    .trigger(availableNow=True) \
    .start()

location_query = location_agg \
    .writeStream \
    .format("memory") \
    .queryName("location_performance") \
    .outputMode("update") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/location_performance") \
    .trigger(availableNow=True) \
    .start()

# Let them run
time.sleep(25)

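# Show results. Note: in update mode the memory sink appends every updated row,
# so the same key can appear more than once (one row per micro-batch that changed it).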
print("\n📈 Product Performance:")
spark.sql("""
    SELECT product_name, category, order_count, total_revenue, avg_price, unique_customers
    FROM product_performance 
    ORDER BY total_revenue DESC
""").show()

print("\n🗺️ Location Performance:")
spark.sql("""
    SELECT location, total_orders, total_sales, active_customers, avg_order_value
    FROM location_performance 
    ORDER BY total_sales DESC
""").show()

# Clean up
product_query.stop()
location_query.stop()
print("\n✅ Aggregation demos complete")


📈 Product Performance:
+-------------------+-----------+-----------+-------------+---------+----------------+
|       product_name|   category|order_count|total_revenue|avg_price|unique_customers|
+-------------------+-----------+-----------+-------------+---------+----------------+
|        MacBook Pro|Electronics|          2|       3991.3|  1995.65|               2|
|        MacBook Pro|Electronics|          1|      2058.21|  2058.21|               1|
|        Smart Watch|Electronics|          2|        862.6|    431.3|               2|
|          iPhone 15|Electronics|          1|       801.74|   801.74|               1|
|       Gaming Chair|  Furniture|          2|       572.45|   286.23|               2|
|        Smart Watch|Electronics|          1|       425.37|   425.37|               1|
|       Gaming Chair|  Furniture|          1|       270.43|   270.43|               1|
|Wireless Headphones|Electronics|          1|       221.48|   221.48|               1|
|       Nike Air Ma

# 🔥 Part III: Advanced Features

## 9. Window Operations
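
Before the Spark code, here is a small plain-Python sketch of how an event timestamp maps to tumbling and sliding windows. It assumes windows are aligned to the Unix epoch and ignores time zones; the function names `tumbling_window` and `sliding_windows` are illustrative, not Spark APIs.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window(ts, size):
    """Return the single [start, end) window of length `size` that contains ts."""
    start = ts - ((ts - EPOCH) % size)
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Return every [start, end) window of length `size` that contains ts.

    Window starts fall on multiples of `slide`, so one event can belong
    to several overlapping windows.
    """
    start = ts - ((ts - EPOCH) % slide)  # latest window start at or before ts
    windows = []
    while start + size > ts:             # window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


event_time = datetime(2025, 9, 14, 14, 42, 30)

print(tumbling_window(event_time, timedelta(minutes=2)))
for window in sliding_windows(event_time, timedelta(minutes=5), timedelta(minutes=2)):
    print(window)
```

With a 2-minute tumbling window the event lands in exactly one window, while a 5-minute window sliding every 2 minutes places the same event in three overlapping windows. That is why sliding aggregations hold more state per event.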

In [0]:
print("🪟 Window Operations Overview:")
print("  • Tumbling Windows: Fixed, non-overlapping intervals")
print("  • Sliding Windows: Fixed size, overlapping intervals")
print("  • Session Windows: Dynamic, activity-based intervals")

# Tumbling window: 2-minute windows
tumbling_sales = business_logic_stream \
    .withWatermark("order_timestamp", "2 minutes") \
    .groupBy(
        window(col("order_timestamp"), "2 minutes"),
        col("category")
    ) \
    .agg(
        count("*").alias("order_count"),
        sum("price").alias("revenue"),
        approx_count_distinct("customer_id").alias("unique_customers")
    ) \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "category",
        "order_count",
        round(col("revenue"), 2).alias("revenue"),
        "unique_customers"
    )

print("\n✅ Tumbling window configured (2-minute intervals)")

# Sliding window: 5-minute windows every 2 minutes
sliding_trends = business_logic_stream \
    .withWatermark("order_timestamp", "3 minutes") \
    .groupBy(
        window(col("order_timestamp"), "5 minutes", "2 minutes"),
        col("customer_location")
    ) \
    .agg(
        count("*").alias("order_count"),
        avg("price").alias("avg_price")
    ) \
    .select(
        col("window.start").alias("window_start"),
        col("customer_location").alias("location"),
        "order_count",
        round(col("avg_price"), 2).alias("avg_price")
    )

print("✅ Sliding window configured (5-minute windows, 2-minute slides)")

# Start window operations
print("\n🔄 Starting window operation demos...")
tumbling_query = tumbling_sales \
    .writeStream \
    .format("memory") \
    .queryName("tumbling_sales") \
    .outputMode("append") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/tumbling_sales") \
    .trigger(availableNow=True) \
    .start()

sliding_query = sliding_trends \
    .writeStream \
    .format("memory") \
    .queryName("sliding_trends") \
    .outputMode("append") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/sliding_trends") \
    .trigger(availableNow=True) \
    .start()

# Let them run
time.sleep(35)

# Show results
print("\n📊 Tumbling Window Results (2-minute intervals):")
spark.sql("""
    SELECT window_start, window_end, category, order_count, revenue, unique_customers
    FROM tumbling_sales 
    ORDER BY window_start DESC, revenue DESC
    LIMIT 10
""").show(truncate=False)

print("\n📈 Sliding Window Results (5-min windows, 2-min slides):")
spark.sql("""
    SELECT window_start, location, order_count, avg_price
    FROM sliding_trends 
    ORDER BY window_start DESC, avg_price DESC
    LIMIT 10
""").show(truncate=False)

# Clean up
tumbling_query.stop()
sliding_query.stop()
print("\n✅ Window operation demos complete")

🪟 Window Operations Overview:
  • Tumbling Windows: Fixed, non-overlapping intervals
  • Sliding Windows: Fixed size, overlapping intervals
  • Session Windows: Dynamic, activity-based intervals

✅ Tumbling window configured (2-minute intervals)
✅ Sliding window configured (5-minute windows, 2-minute slides)

🔄 Starting window operation demos...

📊 Tumbling Window Results (2-minute intervals):
+-------------------+-------------------+-----------+-----------+-------+----------------+
|window_start       |window_end         |category   |order_count|revenue|unique_customers|
+-------------------+-------------------+-----------+-----------+-------+----------------+
|2025-09-14 14:42:00|2025-09-14 14:44:00|Electronics|1          |2058.21|1               |
|2025-09-14 14:38:00|2025-09-14 14:40:00|Furniture  |1          |270.43 |1               |
|2025-09-14 14:34:00|2025-09-14 14:36:00|Electronics|1          |437.23 |1               |
|2025-09-14 14:22:00|2025-09-14 14:24:00|Fashion    |1   

## 10. Watermarks and Late Data

**Understanding Watermarks:**
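  • Define how late data can arrive, measured against the latest event time seen so far, before it is dropped  
  • Enable automatic cleanup of state for windows that can no longer receive data  
  • Balance data completeness (longer delay) against memory usage and output latency (shorter delay)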
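
The trade-off above can be sketched in plain Python. This is a simplified model, not Spark's implementation: it treats the watermark as the maximum event time seen in earlier batches minus the allowed delay, and drops individual events older than that, whereas Spark applies the rule to window state. The class name `WatermarkTracker` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Track watermark = max event time seen so far - allowed delay."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def process_batch(self, event_times):
        """Split a batch into (accepted, dropped) using the current watermark,
        then advance the watermark from the newest event in the batch."""
        watermark = self.watermark  # fixed for the whole batch
        accepted = [t for t in event_times if watermark is None or t >= watermark]
        dropped = [t for t in event_times if watermark is not None and t < watermark]
        if event_times:
            newest = max(event_times)
            if self.max_event_time is None or newest > self.max_event_time:
                self.max_event_time = newest
        return accepted, dropped


def at(minute, second=0):
    return datetime(2025, 9, 14, 14, minute, second)


conservative = WatermarkTracker(timedelta(minutes=5))
aggressive = WatermarkTracker(timedelta(seconds=30))

batch_1 = [at(40), at(42)]
batch_2 = [at(36), at(38), at(43)]  # contains two late events

for tracker in (conservative, aggressive):
    tracker.process_batch(batch_1)

conservative_result = conservative.process_batch(batch_2)
aggressive_result = aggressive.process_batch(batch_2)
print("conservative dropped:", conservative_result[1])
print("aggressive dropped:  ", aggressive_result[1])
```

In this sketch the 5-minute tracker still accepts the 14:38 event, while the 30-second tracker drops both late events. The shorter delay frees state sooner at the cost of completeness.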

In [0]:
# Conservative watermark (5 minutes)
conservative_agg = business_logic_stream \
    .withWatermark("order_timestamp", "5 minutes") \
    .groupBy(
        window(col("order_timestamp"), "3 minutes"),
        col("category")
    ) \
    .agg(
        count("*").alias("order_count"),
        sum("price").alias("revenue")
    ) \
    .select(
        col("window.start").alias("window_start"),
        "category",
        "order_count",
        round(col("revenue"), 2).alias("revenue")
    )

print("\n✅ Conservative watermark: 5 minutes (high completeness)")

# Aggressive watermark (30 seconds)
aggressive_agg = business_logic_stream \
    .withWatermark("order_timestamp", "30 seconds") \
    .groupBy(
        window(col("order_timestamp"), "1 minute"),
        col("customer_location")
    ) \
    .agg(
        count("*").alias("order_count"),
        avg("price").alias("avg_price")
    ) \
    .select(
        col("window.start").alias("window_start"),
        col("customer_location").alias("location"),
        "order_count",
        round(col("avg_price"), 2).alias("avg_price")
    )

print("✅ Aggressive watermark: 30 seconds (low latency)")

# Start watermark comparison
print("\n🔄 Starting watermark comparison...")

conservative_query = conservative_agg \
    .writeStream \
    .format("memory") \
    .queryName("conservative_watermark") \
    .outputMode("append") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/conservative_watermark") \
    .trigger(availableNow=True) \
    .start()

aggressive_query = aggressive_agg \
    .writeStream \
    .format("memory") \
    .queryName("aggressive_watermark") \
    .outputMode("append") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/aggressive_watermark") \
    .trigger(availableNow=True) \
    .start()

# Let them run
time.sleep(30)

# Show results
print("\n📊 Conservative Watermark Results (5-minute tolerance):")
spark.sql("""
    SELECT window_start, category, order_count, revenue
    FROM conservative_watermark 
    ORDER BY window_start DESC, revenue DESC
    LIMIT 8
""").show(truncate=False)

print("\n⚡ Aggressive Watermark Results (30-second tolerance):")
spark.sql("""
    SELECT window_start, location, order_count, avg_price
    FROM aggressive_watermark 
    ORDER BY window_start DESC, avg_price DESC
    LIMIT 8
""").show(truncate=False)

# Clean up
conservative_query.stop()
aggressive_query.stop()
print("\n✅ Watermark comparison complete")


✅ Conservative watermark: 5 minutes (high completeness)
✅ Aggressive watermark: 30 seconds (low latency)

🔄 Starting watermark comparison...

📊 Conservative Watermark Results (5-minute tolerance):
+-------------------+-----------+-----------+-------+
|window_start       |category   |order_count|revenue|
+-------------------+-----------+-----------+-------+
|2025-09-14 14:42:00|Electronics|1          |2058.21|
|2025-09-14 14:39:00|Electronics|1          |1079.94|
|2025-09-14 14:36:00|Electronics|1          |896.54 |
|2025-09-14 14:36:00|Furniture  |1          |270.43 |
|2025-09-14 14:33:00|Electronics|1          |437.23 |
|2025-09-14 14:21:00|Furniture  |1          |302.02 |
|2025-09-14 14:21:00|Fashion    |1          |104.64 |
|2025-09-14 14:18:00|Electronics|1          |801.74 |
+-------------------+-----------+-----------+-------+


⚡ Aggressive Watermark Results (30-second tolerance):
+-------------------+----------+-----------+---------+
|window_start       |location  |order_count

## 11. Streaming Joins

Streaming joins allow you to combine streaming data with static data or other streams for enrichment and complex analytics.

🔗 **Streaming Joins Overview:**
  • Stream-to-Static: Join streaming data with static reference tables  
  • Stream-to-Stream: Join two streaming datasets  
  • Inner/Outer Joins: Different join types for various use cases
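
For the stream-to-stream case, the join is usually bounded in event time so that old rows can eventually be dropped from state. The following plain-Python sketch shows a condition of that shape: an inner join on `customer_id` where the order falls between 5 minutes before and 2 minutes after the behavior event. The function name `correlate` and the sample records are assumptions for illustration only.

```python
from datetime import datetime, timedelta


def correlate(orders, events,
              before=timedelta(minutes=5), after=timedelta(minutes=2)):
    """Inner-join orders to events on customer_id within a time range.

    A pair matches when:
      event_ts - before <= order_ts <= event_ts + after
    """
    matches = []
    for order in orders:
        for event in events:
            same_customer = order["customer_id"] == event["customer_id"]
            in_range = (event["event_ts"] - before
                        <= order["order_ts"]
                        <= event["event_ts"] + after)
            if same_customer and in_range:
                matches.append((order["order_id"], event["event_type"]))
    return matches


def at(minute):
    return datetime(2025, 9, 14, 14, minute)


orders = [{"order_id": 1, "customer_id": 1001, "order_ts": at(42)}]
events = [
    {"customer_id": 1001, "event_type": "add_to_cart", "event_ts": at(41)},
    {"customer_id": 1001, "event_type": "page_view", "event_ts": at(30)},
    {"customer_id": 1002, "event_type": "page_view", "event_ts": at(42)},
    {"customer_id": 1001, "event_type": "wishlist_add", "event_ts": at(46)},
]

matched = correlate(orders, events)
print(matched)
```

Only events close enough in time to the order survive the join. Because the bounds are finite, an engine knows that an event older than the bound can never match a future order and can discard it.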


In [0]:
print("🔗 Streaming Joins Overview:")
print("  • Stream-to-Static: Join streaming data with static reference tables")
print("  • Stream-to-Stream: Join two streaming datasets")
print("  • Inner/Outer Joins: Different join types for various use cases")

# Create static reference data (product catalog)
product_catalog_data = [
    ("iPhone 15", "Electronics", "Apple", 999.99, "Premium"),
    ("MacBook Pro", "Electronics", "Apple", 1999.99, "Premium"),
    ("Nike Shoes", "Fashion", "Nike", 129.99, "Standard"),
    ("Coffee Maker", "Home", "Breville", 89.99, "Standard"),
    ("Headphones", "Electronics", "Sony", 199.99, "Standard"),
    ("Gaming Chair", "Furniture", "Herman Miller", 299.99, "Premium")
]

product_catalog_schema = StructType([
    StructField("product_name", StringType(), True),
    StructField("catalog_category", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("list_price", DoubleType(), True),
    StructField("tier", StringType(), True)
])

product_catalog = spark.createDataFrame(product_catalog_data, product_catalog_schema)
product_catalog.createOrReplaceTempView("product_catalog")

print("\n📋 Product Catalog (Static Reference Data):")
display(product_catalog)

# Stream-to-Static Join: Enrich orders with product details
print("\n1️⃣ Stream-to-Static Join Example:")

enriched_orders = business_logic_stream \
    .withColumnRenamed("category", "order_category") \
    .join(product_catalog, "product_name", "left") \
    .withColumn("discount_amount", col("list_price") - col("price")) \
    .withColumn("discount_percentage", 
        round(((col("list_price") - col("price")) / col("list_price")) * 100, 2)
    ) \
    .withColumn("total_value", col("price") * col("quantity")) \
    .select(
        "customer_id", "order_id", "product_name", "brand", "tier",
        "price", "list_price", "discount_amount", "discount_percentage",
        "quantity", "total_value", "order_category", "catalog_category", "customer_location", "order_timestamp"
    )

print("✅ Stream-to-static join: Orders enriched with product catalog data")

# Create a second stream for customer behavior tracking
customer_behavior_stream = file_stream \
    .withColumn("customer_id", (col("value") % 100) + 1001) \
    .withColumn("event_type", 
        when(col("value") % 4 == 0, "page_view")
        .when(col("value") % 4 == 1, "add_to_cart")
        .when(col("value") % 4 == 2, "remove_from_cart")
        .otherwise("wishlist_add")
    ) \
    .withColumn("page_category", 
        when(col("value") % 5 == 0, "Electronics")
        .when(col("value") % 5 == 1, "Fashion")
        .when(col("value") % 5 == 2, "Home")
        .when(col("value") % 5 == 3, "Furniture")
        .otherwise("Sports")
    ) \
    .withColumn("session_id", (col("value") / 10).cast("int")) \
    .select("customer_id", "event_type", "page_category", "session_id", "timestamp") \
    .withColumnRenamed("timestamp", "event_timestamp")

print("\n2️⃣ Stream-to-Stream Join Example:")

# Stream-to-Stream Join: Correlate orders with customer behavior.
# The time-range bounds are part of the join condition so that Spark can use
# them, together with the watermarks, to limit how much join state it keeps.
customer_order_correlation = business_logic_stream \
    .withWatermark("order_timestamp", "10 minutes") \
    .alias("orders") \
    .join(
        customer_behavior_stream
            .withWatermark("event_timestamp", "10 minutes")
            .alias("behavior"),
        (col("orders.customer_id") == col("behavior.customer_id"))
        & (col("orders.order_timestamp") >= col("behavior.event_timestamp") - expr("INTERVAL 5 MINUTES"))
        & (col("orders.order_timestamp") <= col("behavior.event_timestamp") + expr("INTERVAL 2 MINUTES")),
        "inner"
    ) \
    .select(
        col("orders.customer_id"),
        col("orders.product_name"),
        col("orders.price"),
        # business_logic_stream exposes `category`; alias it to the name the queries below expect
        col("orders.category").alias("order_category"),
        col("behavior.event_type"),
        col("behavior.page_category"),
        col("orders.order_timestamp").alias("order_time"),
        col("behavior.event_timestamp")
    )

print("✅ Stream-to-stream join: Orders correlated with customer behavior events")

# Start join demonstrations
print("\n🔄 Starting streaming joins demos...")

# Stream-to-static join query
enriched_query = enriched_orders \
    .writeStream \
    .format("memory") \
    .queryName("enriched_orders") \
    .outputMode("append") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/enriched_query") \
    .trigger(availableNow=True) \
    .start()

# Stream-to-stream join query
correlation_query = customer_order_correlation \
    .writeStream \
    .format("memory") \
    .queryName("customer_correlation") \
    .outputMode("append") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/correlation_query") \
    .trigger(availableNow=True) \
    .start()

# Let them run
time.sleep(30)

# Show stream-to-static join results
print("\n📊 Stream-to-Static Join Results (Enriched Orders):")
display(spark.sql("""
    SELECT customer_id, product_name, brand, tier,
           price, list_price, discount_percentage, total_value, order_category, catalog_category
    FROM enriched_orders 
    WHERE discount_percentage > 0
    ORDER BY discount_percentage DESC
    LIMIT 10
"""))

# Show stream-to-stream join results
print("\n🔗 Stream-to-Stream Join Results (Customer Behavior Correlation):")
display(spark.sql("""
    SELECT customer_id, product_name, order_category, event_type, page_category,
           order_time, event_timestamp
    FROM customer_correlation 
    ORDER BY order_time DESC
    LIMIT 10
"""))

# Advanced join analytics
print("\n📈 Advanced Join Analytics:")

# Customer purchase patterns
display(spark.sql("""
    SELECT 
        event_type,
        order_category,
        COUNT(*) as correlation_count,
        ROUND(AVG(price), 2) as avg_order_value
    FROM customer_correlation 
    GROUP BY event_type, order_category
    ORDER BY correlation_count DESC
"""))

# Discount effectiveness analysis
display(spark.sql("""
    SELECT 
        tier,
        COUNT(*) as order_count,
        ROUND(AVG(discount_percentage), 2) as avg_discount,
        ROUND(AVG(total_value), 2) as avg_order_value
    FROM enriched_orders 
    WHERE discount_percentage > 0
    GROUP BY tier
    ORDER BY avg_order_value DESC
"""))

# Clean up
enriched_query.stop()
correlation_query.stop()

print("\n✅ Streaming joins demos complete")

print("\n💡 Streaming Joins Best Practices:")
print("  • Use watermarks for time-based joins to manage state")
print("  • Consider join cardinality and potential data skew")
print("  • Use appropriate join types (inner vs outer)")
print("  • Monitor memory usage for stateful stream-to-stream joins")
print("  • Cache static data for stream-to-static joins")
print("  • Set time bounds for stream-to-stream joins to control state size")

🔗 Streaming Joins Overview:
  • Stream-to-Static: Join streaming data with static reference tables
  • Stream-to-Stream: Join two streaming datasets
  • Inner/Outer Joins: Different join types for various use cases

📋 Product Catalog (Static Reference Data):


product_name,catalog_category,brand,list_price,tier
iPhone 15,Electronics,Apple,999.99,Premium
MacBook Pro,Electronics,Apple,1999.99,Premium
Nike Shoes,Fashion,Nike,129.99,Standard
Coffee Maker,Home,Breville,89.99,Standard
Headphones,Electronics,Sony,199.99,Standard
Gaming Chair,Furniture,Herman Miller,299.99,Premium



1️⃣ Stream-to-Static Join Example:
✅ Stream-to-static join: Orders enriched with product catalog data

2️⃣ Stream-to-Stream Join Example:
✅ Stream-to-stream join: Orders correlated with customer behavior events

🔄 Starting streaming joins demos...


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-7269659990446090>, line 112
    102 print("\n🔄 Starting streaming joins demos...")
    104 # Stream-to-static join query
    105 enriched_query = enriched_orders \
    106     .writeStream \
    107     .format("memory") \
    108     .queryName("enriched_orders") \
    109     .outputMode("append") \
    110     .option("checkpointLocati

# 🏭 Part IV: Production Patterns

## 12. Monitoring and Query Management

In [0]:
# Create a monitoring query
monitoring_stream = business_logic_stream \
    .groupBy("category") \
    .agg(
        count("*").alias("total_orders"),
        sum("price").alias("total_revenue")
    )

print("📊 Starting monitoring example...")

monitor_query = monitoring_stream \
    .writeStream \
    .format("memory") \
    .queryName("business_monitor") \
    .outputMode("update") \
    .option("checkpointLocation", f"{data_dir}/streaming_checkpoints/monitor_query") \
    .trigger(availableNow=True) \
    .start()

# Let it run
time.sleep(20)

# Monitor query health
print("\n🔍 Query Health Check:")
print(f"Query Name: {monitor_query.name}")
print(f"Query ID: {monitor_query.id}")
print(f"Is Active: {monitor_query.isActive}")

# Get latest progress
progress = monitor_query.lastProgress
if progress:
    print("\n📈 Latest Progress Metrics:")
    print(f"  Batch ID: {progress.get('batchId', 'N/A')}")
    print(f"  Input Rate: {progress.get('inputRowsPerSecond', 'N/A')} rows/sec")
    print(f"  Process Rate: {progress.get('processedRowsPerSecond', 'N/A')} rows/sec")
    print(f"  Batch Duration: {progress.get('batchDuration', 'N/A')} ms")
    
    if 'stateOperators' in progress:
        print(f"  State Operations: {len(progress['stateOperators'])}")

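# Show current results. In update mode the memory sink appends each updated row,
# so a category can be listed once per micro-batch that changed its totals.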
print("\n📊 Current Business Metrics:")
spark.sql("""
    SELECT category, total_orders, ROUND(total_revenue, 2) as total_revenue
    FROM business_monitor 
    ORDER BY total_revenue DESC
""").show()

# List all active streams
print("\n🔄 All Active Streaming Queries:")
for i, query in enumerate(spark.streams.active, 1):
    print(f"  {i}. {query.name} (ID: {query.id})")

monitor_query.stop()
print("\n✅ Monitoring example complete")

📊 Starting monitoring example...

🔍 Query Health Check:
Query Name: business_monitor
Query ID: 891c0222-31a5-4a53-bee0-acc8ae3e00ed
Is Active: True

📈 Latest Progress Metrics:
  Batch ID: 1
  Input Rate: 0.4010909674314135 rows/sec
  Process Rate: 0.7616146230007615 rows/sec
  Batch Duration: 6565 ms
  State Operations: 1

📊 Current Business Metrics:
+-----------+------------+-------------+
|   category|total_orders|total_revenue|
+-----------+------------+-------------+
|Electronics|           4|      4354.12|
|  Furniture|           2|       640.78|
|  Furniture|           1|       288.07|
|Electronics|           1|       208.24|
|       Home|           2|       163.85|
|     Sports|           2|        90.67|
|     Sports|           1|        46.31|
+-----------+------------+-------------+


🔄 All Active Streaming Queries:
  1. business_monitor (ID: 891c0222-31a5-4a53-bee0-acc8ae3e00ed)

✅ Monitoring example complete
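
The progress metrics above can be turned into a simple health summary. The sketch below assumes a dict-shaped progress report like the one returned by `lastProgress` in this tutorial; the helper name `summarize_progress` and the `numRowsTotal` value in the sample are illustrative, and the rates are copied from the output above.

```python
def summarize_progress(progress):
    """Summarize a dict shaped like StreamingQuery.lastProgress."""
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    process_rate = progress.get("processedRowsPerSecond") or 0.0
    # Total rows held across all stateful operators (aggregations, joins, ...).
    state_rows = sum(
        op.get("numRowsTotal", 0) for op in progress.get("stateOperators", [])
    )
    return {
        "batch_id": progress.get("batchId"),
        # If processing is slower than arrival, the query is falling behind.
        "keeping_up": process_rate >= input_rate,
        "state_rows": state_rows,
    }


# Sample progress dict; the state row count is a made-up value.
sample_progress = {
    "batchId": 1,
    "inputRowsPerSecond": 0.4010909674314135,
    "processedRowsPerSecond": 0.7616146230007615,
    "stateOperators": [{"numRowsTotal": 7}],
}

summary = summarize_progress(sample_progress)
print(summary)
```

A check like this could run on a schedule and raise an alert when `keeping_up` stays false for several batches or `state_rows` grows without bound.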


🛡️ **Production Best Practices**
===================================

- **Checkpointing**: Always set checkpoint locations for fault tolerance
- **Watermarks**: Use appropriate watermarks for windowed operations
- **Resource Planning**: Plan cluster resources for peak loads
- **Monitoring**: Monitor query progress and performance metrics
- **Error Handling**: Implement proper exception handling
- **Schema Evolution**: Plan for schema changes in data sources
- **State Management**: Monitor state size and cleanup policies
- **Testing**: Test with realistic data volumes and patterns
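
One way to combine the checkpointing and error-handling points is a small restart loop around the code that starts a query. This is a generic sketch, not a Spark API: `run_with_restarts` and `flaky_start` are hypothetical names. In a real pipeline the callable would start the query with a fixed checkpoint location and block until it terminates, so that each restart resumes from the checkpoint.

```python
import time


def run_with_restarts(start_query, max_restarts=3, backoff_seconds=1.0,
                      sleep=time.sleep):
    """Call start_query(); on failure, retry with exponential backoff.

    Re-raises the last exception once max_restarts is exceeded.
    """
    failures = 0
    while True:
        try:
            return start_query()
        except Exception as exc:
            failures += 1
            if failures > max_restarts:
                raise
            delay = backoff_seconds * 2 ** (failures - 1)
            print(f"Query failed ({exc}); restart {failures}/{max_restarts} in {delay:.1f}s")
            sleep(delay)


# Stand-in for a streaming job that fails twice before succeeding.
attempts = {"count": 0}


def flaky_start():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("source temporarily unavailable")
    return "terminated normally"


recorded_delays = []
result = run_with_restarts(flaky_start, sleep=recorded_delays.append)
print(result, recorded_delays)
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting. Production code would also distinguish retryable errors from bugs that a restart cannot fix.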

In [0]:

# Example of robust query with error handling
print("\n🔧 Robust Query Example:")

try:
    robust_query = business_logic_stream \
        .filter(col("price") > 0) \
        .groupBy(col("customer_location").alias("location")) \
        .agg(
            count("*").alias("order_count"),
            avg("price").alias("avg_price")
        ) \
        .writeStream \
        .format("memory") \
        .queryName("robust_example") \
        .outputMode("update") \
        .option("checkpointLocation", "/tmp/robust_checkpoint") \
        .trigger(processingTime="10 seconds") \
        .start()
    
    print("✅ Robust query started with checkpointing")
    time.sleep(15)
    
    # Check results
    print("\n📊 Robust Query Results:")
    spark.sql("""
        SELECT location, order_count, ROUND(avg_price, 2) as avg_price
        FROM robust_example 
        ORDER BY avg_price DESC
    """).show()
    
    robust_query.stop()
    print("✅ Robust query stopped gracefully")
    
except Exception as e:
    print(f"❌ Error occurred: {str(e)}")
    print("💡 This is why error handling is important in production!")

print("\n🎯 Key Production Recommendations:")
recommendations = [
    "Set appropriate trigger intervals (not too frequent)",
    "Use UPDATE mode for aggregations, APPEND for raw data",
    "Monitor memory usage and state growth",
    "Plan for data source failures and recovery",
    "Test thoroughly with production-like data volumes",
    "Document your streaming pipeline architecture"
]

for i, rec in enumerate(recommendations, 1):
    print(f"  {i}. {rec}")

print("\n✅ Production best practices covered")

## 13. Kafka Integration Example

In [0]:
print("🔗 Kafka Integration Pattern")
print("=" * 30)

# Sample Kafka configuration
kafka_config = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "orders_topic",
    "startingOffsets": "earliest",
    "kafka.session.timeout.ms": "30000",
    "kafka.request.timeout.ms": "40000"
}

print("⚙️ Kafka Configuration:")
for key, value in kafka_config.items():
    print(f"  {key}: {value}")

# Sample order schema for Kafka messages
kafka_order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("order_time", TimestampType(), True),
    StructField("location", StringType(), True)
])

print("\n📋 Kafka Message Schema:")
for field in kafka_order_schema.fields:
    print(f"  • {field.name}: {field.dataType}")

# Sample Kafka streaming code (requires actual Kafka setup).
# Raw string so the trailing backslashes are printed instead of joining the lines.
kafka_code_template = r'''
# Reading from Kafka
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders_topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse JSON messages
parsed_orders = kafka_stream \
    .select(
        from_json(col("value").cast("string"), kafka_order_schema).alias("data"),
        col("timestamp").alias("kafka_timestamp")
    ) \
    .select("data.*", "kafka_timestamp")

# Process and aggregate
kafka_analytics = parsed_orders \
    .withWatermark("order_time", "5 minutes") \
    .groupBy(
        window(col("order_time"), "10 minutes"),
        col("location")
    ) \
    .agg(
        count("*").alias("order_count"),
        sum("price").alias("total_revenue")
    )

# Write results back to Kafka or other sink
kafka_output = kafka_analytics \
    .selectExpr("CAST(location AS STRING) AS key", 
                "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "analytics_results") \
    .option("checkpointLocation", "/tmp/kafka_checkpoint") \
    .start()
'''

print("\n💻 Kafka Integration Code Template:")
print(kafka_code_template)

print("\n🔗 Alternative Data Sources for Testing:")
alternatives = [
    "File streaming: Monitor directories for new files",
    "Socket streaming: TCP socket for real-time data",
    "Rate source: Built-in data generator for testing",
    "Memory source: In-memory data for development"
]

for i, alt in enumerate(alternatives, 1):
    print(f"  {i}. {alt}")

print("\n✅ Kafka integration pattern explained")
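
To test the pattern without a broker, it can help to see what one message body for `kafka_order_schema` might look like. The sketch below uses only the standard library. The sample values, the ISO-8601 timestamp format, and the helper `missing_or_mistyped` are assumptions for illustration, not part of the Kafka or Spark APIs.

```python
import json

# Python types expected for each field of kafka_order_schema.
# order_time travels as a string in JSON (assumed ISO-8601 here).
EXPECTED_TYPES = {
    "order_id": str,
    "customer_id": int,
    "product_name": str,
    "price": float,
    "quantity": int,
    "order_time": str,
    "location": str,
}


def missing_or_mistyped(record):
    """Return the field names that are absent or have an unexpected type."""
    return [
        name for name, expected in EXPECTED_TYPES.items()
        if not isinstance(record.get(name), expected)
    ]


# Hypothetical order, serialized the way a producer would send it.
sample_order = {
    "order_id": "ORD-1001",
    "customer_id": 1001,
    "product_name": "MacBook Pro",
    "price": 1999.99,
    "quantity": 1,
    "order_time": "2025-09-14T14:42:00",
    "location": "New York",
}
payload = json.dumps(sample_order).encode("utf-8")  # bytes for the Kafka value

# What a consumer sees after decoding the value column as a string.
decoded = json.loads(payload.decode("utf-8"))
problems = missing_or_mistyped(decoded)
print(problems)
```

Checking a few sample payloads this way before wiring up the stream makes schema mismatches easier to spot than debugging null columns after parsing.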

🎉 **Congratulations! Tutorial Complete!**
========================================

**Skills Acquired:**
- ✅ Environment setup and Spark configuration
- ✅ Data sources and schema definition
- ✅ ReadStream and WriteStream operations
- ✅ Basic transformations and filtering
- ✅ Output modes (Append, Update, Complete)
- ✅ Trigger configurations and timing
- ✅ Aggregations and grouping operations
- ✅ Window operations (Tumbling, Sliding)
- ✅ Watermarks and late data handling
- ✅ Query monitoring and management
- ✅ Production best practices
- ✅ Kafka integration patterns

**Next Steps:**
1. Set up a real Kafka cluster for production testing
2. Implement comprehensive monitoring dashboards
3. Explore Delta Lake for ACID streaming transactions
4. Build end-to-end streaming applications
5. Optimize performance for your specific use cases
6. Learn advanced topics like streaming ML pipelines

**Key Takeaways:**
• Start with simple transformations, add complexity gradually  
• Choose appropriate output modes for your use case  
• Set realistic watermarks based on business requirements  
• Always plan for failure scenarios and recovery  
• Monitor performance metrics continuously  
• Test with production-like data volumes  

🧹 **Cleaning up active streams...**

🌟 Thank you for completing the Spark Structured Streaming Tutorial! 🌟

Happy Streaming! 🚀