In [None]:
# Install PySpark (for Colab). In a real Spark environment you do not need this line.
!pip install -q pyspark

from pyspark.sql import SparkSession
import time

# Start (or reuse, if one already exists in this process) a SparkSession that
# runs locally with one worker thread per available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("StructuredStreamingDemo")
    .getOrCreate()
)

# Only surface Spark log messages at WARN or above so the per-batch output
# is not buried in INFO logging.
spark.sparkContext.setLogLevel("WARN")

# Built-in "rate" test source: emits 5 rows per second, each carrying a
# generation `timestamp` and an increasing `value` counter. The column
# projection is applied in the same chain, so `df` is bound only once.
df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 5)
    .load()
    .select("timestamp", "value")
)

def process_batch(batch_df, batch_id):
    """Print a header line followed by a short preview of one micro-batch.

    Registered below as the ``foreachBatch`` callback, so Spark invokes it
    once per trigger with that micro-batch's rows and its batch number.

    Args:
        batch_df: DataFrame holding the rows of this micro-batch.
        batch_id: Sequence number of the micro-batch, supplied by Spark.
    """
    banner = f"\n=== Batch {batch_id} ==="
    print(banner)
    # Preview at most 5 rows without truncating column values. No ordering is
    # applied, so these are not necessarily the 5 smallest `value`s.
    batch_df.show(5, truncate=False)

# Start the query: every 2 seconds (processing-time trigger), the rows that
# arrived since the previous trigger are handed to process_batch.
query = (
    df.writeStream
      .outputMode("append")
      .foreachBatch(process_batch)
      .trigger(processingTime="2 seconds")
      .start()
)

# Let it run long enough to collect a few batches. awaitTermination(timeout)
# waits at most that many seconds, like sleep, but re-raises the failure
# (StreamingQueryException) if the query dies instead of silently sleeping
# through it. The finally clause guarantees the background query is stopped
# even if the wait raises or the cell is interrupted.
try:
    query.awaitTermination(8)
finally:
    query.stop()



=== Batch 0 ===
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+


=== Batch 1 ===
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2025-12-06 15:06:52.753|0    |
|2025-12-06 15:06:53.153|2    |
|2025-12-06 15:06:53.553|4    |
|2025-12-06 15:06:52.953|1    |
|2025-12-06 15:06:53.353|3    |
+-----------------------+-----+


=== Batch 2 ===
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2025-12-06 15:06:53.753|5    |
|2025-12-06 15:06:54.153|7    |
|2025-12-06 15:06:54.553|9    |
|2025-12-06 15:06:54.953|11   |
|2025-12-06 15:06:55.353|13   |
+-----------------------+-----+
only showing top 5 rows


=== Batch 3 ===
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2025-12-06 15:06:56.753|20   |
|2025-12-06 15:06:57.153|22   |
|2025-12-06 15:06:57.553|24   |
|2025-12-06 15:06:56.953|21   |
|2025-12-06 15:06:57.353|23   |
+