<p><img src="https://upload.wikimedia.org/wikipedia/commons/thumb/1/1e/UNAL_Logosimbolo.svg/583px-UNAL_Logosimbolo.svg.png" alt="" width="1280" height="300" /></p>

# READ STREAMING DATA

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. 

Internally, Structured Streaming queries are processed by default with a micro-batch processing engine. It processes data streams as a series of small batch jobs, achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. Since Spark 2.3, there is also an experimental low-latency mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees.

## SPARK STREAMING ENGINES

### DISCRETIZED STREAMS (LEGACY)

DStreams are the original streaming model in Spark, introduced in Spark Streaming (before Spark 2.0).

It works by breaking the incoming stream into small batches (micro-batches). Each batch is processed as an RDD (Resilient Distributed Dataset).

DStreams use RDD-based APIs, which are lower-level and more manual.

It's called Discretized because it converts a continuous stream into discrete chunks (batches) every few seconds.

In simple words:
DStreams = Old Spark Streaming, works with batches of RDDs behind the scenes.
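As an illustration only, the following plain-Python sketch models the discretization step. It is not Spark code and not how DStreams are implemented. Events are grouped by arrival time into fixed-size batch intervals, and each inner list stands in for the RDD of one interval. The function `discretize` and the sample events are hypothetical.

```python
def discretize(events, batch_interval):
    """Split (arrival_time_seconds, payload) pairs into fixed-interval micro-batches.

    Batch k holds the payloads whose arrival time falls in
    [k * batch_interval, (k + 1) * batch_interval). Intervals with no events
    still produce an (empty) batch, just as a DStream emits one RDD per interval.
    """
    if not events:
        return []
    last_batch = int(max(t for t, _ in events) // batch_interval)
    batches = [[] for _ in range(last_batch + 1)]
    for arrival_time, payload in events:
        batches[int(arrival_time // batch_interval)].append(payload)
    return batches


stream = [(0.4, "a"), (1.2, "b"), (1.9, "c"), (2.1, "d"), (7.5, "e")]
print(discretize(stream, batch_interval=2))  # [['a', 'b', 'c'], ['d'], [], ['e']]
```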

### STRUCTURED STREAMING (CURRENT)

Structured Streaming is the modern and recommended streaming engine in Spark (introduced in Spark 2.0).

It is built on the DataFrame/Dataset and SQL APIs.

Even though it also uses micro-batches internally by default, it presents the stream as a continuous table that keeps getting updated as new data arrives.

It supports event-time processing, windowing, joins, and SQL operations on streaming data. Streaming code looks almost the same as batch DataFrame code, which makes it more powerful and easier to write than DStreams.

In simple words:
Structured Streaming = Streaming with DataFrames and SQL, modern, powerful, and simpler to use.


**KEY CONCEPT: UNBOUNDED TABLE**
![unbounded table](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png)


**BETTER UNDERSTANDING**
![programming model](https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png)
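The word-count model in the figure can be mimicked with a small plain-Python sketch. This illustrates the idea and is not Spark's implementation. Every trigger appends new lines to the input table and updates the result table. Like Spark, the sketch updates the counts incrementally instead of recomputing them from scratch. Spark does not actually store the whole input table; it keeps only the intermediate state, which here is the counts. The names `on_trigger`, `input_table`, and `result_table` are hypothetical.

```python
from collections import Counter

input_table = []          # the unbounded input table, kept here only to mirror the diagram
result_table = Counter()  # the result table: word -> running count (the "state")

def on_trigger(new_lines):
    """Append the newly arrived rows and update the running word counts."""
    input_table.extend(new_lines)
    for line in new_lines:
        result_table.update(line.split())
    # In "complete" output mode the whole result table is written out on every trigger
    return dict(sorted(result_table.items()))


print(on_trigger(["cat dog", "dog dog"]))  # {'cat': 1, 'dog': 3}
print(on_trigger(["owl cat"]))             # {'cat': 2, 'dog': 3, 'owl': 1}
```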




## SPARK.READ

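`spark.read` is the entry point in Apache Spark to load batch data (static data that doesn’t change while you're processing it). Its streaming counterpart, `spark.readStream`, uses the same builder pattern (`.format()`, `.option()`, `.load()`) but returns a streaming DataFrame.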

**Syntax**


```python
df = spark.read.format("format_name") \
    .option("key", "value") \
    .load("path or source")

# then chain transformations on the result, e.g. df = df.filter(...).select(...)
```


* `.format()` → defines the data source (e.g., csv, parquet, json, jdbc, etc.)
* `.option()` → sets specific options (like header, delimiter, etc.)
* `.load()` → loads the data from file path or source

### FORMATS

The `.format()` method tells Spark which data source to read from. It works the same way for batch reads (`spark.read`) and streaming reads (`spark.readStream`). In streaming mode it answers the question: "Where is the incoming data that I want to process coming from?"

```python
df = spark.readStream.format("kafka").load()
```

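Spark supports many other sources; these are the most common integrations. Only the file formats (such as `parquet`, `csv`, and `json`), `rate`, and `socket` are built into Apache Spark itself. `kafka` needs the `spark-sql-kafka-0-10` package, and `delta`, `pubsub`, `kinesis`, and `eventhubs` depend on external connectors or on a platform such as Databricks.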


| **Format**   | **Description**                                               |
|--------------|---------------------------------------------------------------|
| `kafka`    | Read streaming data from an Apache Kafka topic.               |
| `rate`     | Simulated data stream (generates rows per second).            |
| `socket`   | Read streaming text data from a TCP socket (useful for demos).|
| `delta`    | Read streaming data from a Delta Lake table.                  |
| `parquet`  | Read new Parquet files arriving in a folder.                  |
| `csv`      | Read new CSV files arriving in a folder in streaming mode.    |
| `json`     | Read new JSON files arriving in a folder in streaming mode.   |
| `pubsub`   | Read data from Google Cloud Pub/Sub.                          |
| `kinesis`  | Read data from Amazon Kinesis.                                |
| `eventhubs`| Read data from Azure Event Hubs.                              |



### READING SAMPLES

#### KAFKA

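```python
# Streaming read from Kafka (outside Databricks this needs the spark-sql-kafka-0-10 package)
df_kafka = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my_topic") \
    .load()
# `key` and `value` arrive as binary columns; cast them, e.g. col("value").cast("string")
```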

#### CSV


```python
# Batch (static) read of a single CSV file, using the first line as column names
df_csv = spark.read.format("csv") \
    .option("header", "true") \
    .load("/path/to/file.csv")
```

#### FOLDER

```python
# Batch read of a folder: every CSV file inside it is loaded into one DataFrame
df_folder = spark.read.format("csv") \
    .option("header", "true") \
    .load("/path/to/my_folder/")
```

#### SYNTHETIC DATA


The `rate` source (`RateStreamSource`) generates rows with two columns: `timestamp` and `value`, a consecutive counter that starts at 0. It is useful for testing and proofs of concept (PoCs).

For more details, see [RateStreamSource](https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-RateStreamSource.html).

| **Name**         | **Default Value**         | **Description**                              |
|------------------|---------------------------|----------------------------------------------|
| `numPartitions`  | (default parallelism)     | Number of partitions to use                  |
| `rampUpTime`     | 0 (seconds)               | Time to ramp up before reaching max speed    |
| `rowsPerSecond`  | 1                         | Number of rows to generate per second (must be > 0) |
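To picture the rows this source produces with the options above, here is a plain-Python sketch. The helper `rate_rows` is hypothetical and not part of Spark. In the sketch, `value` is a counter that increases by one per row, and each second yields `rowsPerSecond` rows. `rampUpTime` is ignored (treated as 0), and the spacing of timestamps within a second is an assumption of this model.

```python
from datetime import datetime, timedelta

def rate_rows(rows_per_second, seconds, start):
    """Yield (timestamp, value) tuples shaped like the `rate` source output.

    Simplifications: rampUpTime is ignored (treated as 0) and the rows of each
    second are spread evenly across that second; Spark's exact timestamps may differ.
    """
    value = 0
    for sec in range(seconds):
        for i in range(rows_per_second):
            yield start + timedelta(seconds=sec + i / rows_per_second), value
            value += 1


for ts, value in rate_rows(rows_per_second=2, seconds=2, start=datetime(2024, 1, 1)):
    print(ts, value)
```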




##### DEFAULT

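```python
# Rate stream with the default schema (`timestamp`, `value`), generating 5 rows per second
dfs_default = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

print(type(dfs_default))        # an ordinary DataFrame object...
print(dfs_default.isStreaming)  # ...but flagged as streaming: True

display(dfs_default)  # Databricks notebook helper: starts the stream and renders it live
```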

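```python
from pyspark.sql.functions import count, col, window

# Count how many rows fall into each 5-second tumbling window of the `timestamp` column.
# orderBy on a streaming aggregation is only allowed in "complete" output mode.
(
    dfs_default
    .groupBy(window(col("timestamp"), "5 seconds"))
    .agg(count("*").alias("rows_per_5_seconds"))
    .orderBy("window")
    .display()
)
```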
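`window(col("timestamp"), "5 seconds")` with a single duration defines tumbling (non-overlapping, back-to-back) windows. By default they are aligned to 1970-01-01 00:00:00 UTC. A row therefore belongs to the window that starts at its timestamp rounded down to a multiple of the window size. The plain-Python sketch below reproduces that assignment and the per-window count. The helper `tumbling_window` is hypothetical and is not Spark's code.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts, size):
    """Return the (start, end) of the tumbling window containing ts; end is exclusive."""
    start = ts - (ts - EPOCH) % size   # round down to a multiple of `size` since the epoch
    return start, start + size


size = timedelta(seconds=5)
base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
timestamps = [base + timedelta(seconds=s) for s in (0, 1.5, 4.9, 5, 7, 12)]

# Rough equivalent of groupBy(window(...)).agg(count("*")) on this small list
counts = Counter(tumbling_window(ts, size) for ts in timestamps)
for (start, end), n in sorted(counts.items()):
    print(start.time(), "-", end.time(), n)
```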

##### CUSTOM DATA

###### DATAFRAME

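```python
df_simulated = spark.readStream.format("rate").option("rowsPerSecond", 2).load()
```

```python
# Note: importing `round` from pyspark shadows Python's built-in round()
from pyspark.sql.functions import col, current_timestamp, rand, round

# Enrich each rate row with simulated sales fields:
# product_id in 0..9, quantity in 1..5, price_usd in [10, 110)
df_simulated = df_simulated.withColumn("date_generated", current_timestamp()) \
    .withColumn("product_id", (rand() * 10).cast("int")) \
    .withColumn("quantity", (rand() * 5 + 1).cast("int")) \
    .withColumn("price_usd", round(rand() * 100 + 10, 2)) \
    .withColumn("total_usd", col("quantity") * col("price_usd"))
```

```python
display(df_simulated)
```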

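```python
from pyspark.sql.functions import col, count, window

# Rows per 20-second tumbling window
(
    df_simulated
    .groupBy(window(col("timestamp"), "20 seconds"))
    .agg(count("*").alias("rows_per_20_seconds"))
    .orderBy("window")
    .display()
)
```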

###### TO JSON

```python
from pyspark.sql.functions import col, current_timestamp, rand, round, struct, to_json

dfs_json = spark.readStream.format("rate").option("rowsPerSecond", 2).load()
df_simulated = dfs_json.withColumn("date_generated", current_timestamp()) \
    .withColumn("product_id", (rand() * 10).cast("int")) \
    .withColumn("quantity", (rand() * 5 + 1).cast("int")) \
    .withColumn("price_usd", round(rand() * 100 + 10, 2)) \
    .withColumn("total_usd", col("quantity") * col("price_usd"))

# Pack the sales fields into a struct and serialize it as one JSON string per row
json_df = df_simulated.withColumn(
    "json_value",
    to_json(
        struct(
            col("date_generated"),
            col("product_id"),
            col("quantity"),
            col("price_usd"),
            col("total_usd")
        )
    )
).select("date_generated", "json_value")

display(json_df)
```
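Conceptually, `to_json(struct(...))` turns each row into one JSON object string. The plain-Python sketch below shows the same idea for a single hypothetical row using `json.dumps`. Compact separators make the output resemble Spark's, but Spark's exact formatting (especially of timestamps) differs.

```python
import json
from datetime import datetime

# One hypothetical row with the same fields as json_df above
row = {
    "date_generated": datetime(2024, 1, 1, 12, 0, 0).isoformat(),
    "product_id": 3,
    "quantity": 2,
    "price_usd": 45.5,
    "total_usd": 91.0,
}

json_value = json.dumps(row, separators=(",", ":"))
print(json_value)
# {"date_generated":"2024-01-01T12:00:00","product_id":3,"quantity":2,"price_usd":45.5,"total_usd":91.0}

# Parsing the string back recovers the original fields
print(json.loads(json_value) == row)  # True
```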

###### FROM FILES

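```python
# Databricks-specific: reset a working folder on DBFS for this lab
base_path: str = "/mnt/data/streaming_lab2"

dbutils.fs.rm(base_path, recurse=True)  # delete leftovers from previous runs
dbutils.fs.mkdirs(base_path)
print(f"folder: {base_path}")
```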

```python
import json
import random as rd
import time
import uuid
from datetime import datetime

def generate_events(rows):
    """Return a list of `rows` random purchase events as dicts."""
    events = []
    for _ in range(rows):
        event = {
            "event_time": datetime.now().isoformat(),
            "user_id": rd.randint(1, 5),
            "product_id": rd.randint(100, 105),
            "quantity": rd.randint(1, 3),
            "price": rd.uniform(10.0, 100.0)
        }
        events.append(event)
    return events


# Write 3 JSON Lines files (10 events each), 5 seconds apart, so a running stream sees them arrive
for _ in range(3):
    events = generate_events(10)
    file_path = f"{base_path}/events_{uuid.uuid4()}.json"

    # /dbfs/... is the local-file (FUSE) view of the DBFS path that Spark reads
    with open(f"/dbfs{file_path}", "w") as f:
        for event in events:
            f.write(json.dumps(event) + "\n")

    print(f"file -> {file_path} written.")
    time.sleep(5)
```

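```python
# File sources need an explicit schema when streaming
# (unless spark.sql.streaming.schemaInference is enabled)
stream_df = (
    spark.readStream
    .schema("event_time timestamp, user_id int, product_id int, quantity int, price double")
    .json(base_path)
)

display(stream_df)  # renders the stream in real time
```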
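A file streaming source processes each file only once. On every trigger it lists the folder and reads only the files it has not seen yet; Spark records the processed files in the query's checkpoint. The Spark docs also ask that files be placed in the folder atomically, for example by moving them in, so that a half-written file is not picked up. The plain-Python sketch below models only the "new files per trigger" behavior. The class `NewFileTracker` is hypothetical and is not Spark's implementation.

```python
import os
import tempfile

class NewFileTracker:
    """Toy model of a file streaming source: each trigger returns only unseen files."""

    def __init__(self, folder):
        self.folder = folder
        self.seen = set()

    def next_batch(self):
        current = set(os.listdir(self.folder))
        new_files = sorted(current - self.seen)
        self.seen |= current
        return new_files


with tempfile.TemporaryDirectory() as folder:
    tracker = NewFileTracker(folder)
    for name in ("events_1.json", "events_2.json"):
        open(os.path.join(folder, name), "w").close()
    print(tracker.next_batch())  # ['events_1.json', 'events_2.json']
    open(os.path.join(folder, "events_3.json"), "w").close()
    print(tracker.next_batch())  # ['events_3.json']
    print(tracker.next_batch())  # []
```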

```python
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType, TimestampType

# Same schema as the DDL string above, written as a StructType
schema = StructType([
    StructField("event_time", TimestampType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True)
])

# Read the stream with the explicit schema
stream_df = (
    spark.readStream.format("json")
    .schema(schema)
    .load(base_path)
)
# Add the path of the file each row came from
stream_df_with_file = stream_df.withColumn("source_file", input_file_name())

display(stream_df_with_file)  # renders the stream in real time
```

```python
# Imports used throughout this notebook; in a fresh session, run this cell before the cells above
from pyspark.sql.functions import (
    col, count, current_timestamp, input_file_name, rand, round, struct, to_json, window,
    sum as _sum,
)
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType, TimestampType
```


### WATERMARK

Generally speaking, when working with real-time streaming data there will be delays between event time and processing time due to how data is ingested and whether the overall application experiences issues like downtime. 

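In Spark Structured Streaming, a watermark handles late data: events that arrive out of order or with a delay. Spark tracks the maximum event time seen so far and sets the watermark to that value minus a delay threshold you choose. Data newer than the watermark is still added to its window. Data older than the watermark may be dropped, and the state kept for windows that end before the watermark can be discarded. As a result, the query neither waits nor keeps state indefinitely.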

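The delay threshold is a duration string such as `"30 seconds"` or `"10 minutes"`. Commonly used units (singular or plural):

* Milliseconds (`millisecond` / `milliseconds`)
* Seconds (`second` / `seconds`)
* Minutes (`minute` / `minutes`)
* Hours (`hour` / `hours`)
* Days (`day` / `days`)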

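In other words, the watermark sets how long Spark tolerates late messages. An event can still be counted in its window even if it arrives in a later micro-batch, as long as it is not older than the watermark.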
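The watermark rule can be sketched in plain Python. This is a simplified model with a hypothetical `run_with_watermark` helper, not Spark's implementation. After each micro-batch, the watermark becomes the maximum event time seen so far minus the delay. In the next micro-batch, rows older than the watermark are dropped. Spark only guarantees that data within the threshold is kept; data later than that *may* be dropped, whereas this model always drops it.

```python
from datetime import datetime, timedelta

def run_with_watermark(batches, delay):
    """Split event times into (accepted, dropped) using a per-micro-batch watermark.

    batches: list of micro-batches, each a list of event-time datetimes.
    delay:   the tolerated lateness, like withWatermark(..., "30 seconds").
    """
    max_event_time = None
    watermark = None          # no watermark until the first non-empty batch completes
    accepted, dropped = [], []
    for batch in batches:
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)   # older than the watermark: too late
            else:
                accepted.append(event_time)
        if batch:
            # The watermark advances after the batch and never moves backwards
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
    return accepted, dropped


t0 = datetime(2024, 1, 1, 12, 0, 0)
batches = [
    [t0 + timedelta(seconds=50)],                              # watermark becomes 12:00:20
    [t0 + timedelta(seconds=25), t0 + timedelta(seconds=15)],  # 12:00:15 is too late
]
accepted, dropped = run_with_watermark(batches, delay=timedelta(seconds=30))
print("accepted:", [t.time().isoformat() for t in accepted])  # ['12:00:50', '12:00:25']
print("dropped:", [t.time().isoformat() for t in dropped])    # ['12:00:15']
```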

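**TIP**

The watermark only has an effect when a stateful operation uses the event-time column, most commonly a time-window aggregation (`groupBy(window(...))`). On its own, for example followed only by `select` or `filter`, it changes nothing.

The watermark exists to handle late data, but Spark needs something that *closes* so it can decide when late data is discarded and when old state can be cleaned up.

In aggregations, the things that close are the time windows: once the watermark passes the end of a window, that window is final. Other stateful operations that can use a watermark include `dropDuplicates` and stream-stream joins. State is only cleaned up in `append` or `update` output mode; in `complete` mode Spark keeps every window.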
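The interplay between windows and the watermark can also be modeled in plain Python. This uses a hypothetical `windowed_counts_append` helper and is simplified compared with Spark. Late rows are dropped, and the counts per tumbling window are kept as state. In append mode, a window's final count is emitted and its state removed once the watermark reaches the window's end. Because the `rate` source generates timestamps in order, the rate-based cells in this notebook are unlikely to show dropped rows, so the sketch uses hand-picked out-of-order events instead.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def windowed_counts_append(batches, size, delay):
    """Return (emitted, open_windows) for a tumbling-window count in append mode.

    emitted:      list of (window_start, window_end, final_count), in emission order.
    open_windows: {window_start: running_count} still waiting for the watermark.
    """
    state = defaultdict(int)
    emitted = []
    max_event_time = None
    watermark = None
    for batch in batches:
        for ts in batch:
            if watermark is not None and ts < watermark:
                continue                       # late row: dropped
            start = ts - (ts - EPOCH) % size   # tumbling window aligned to the epoch
            state[start] += 1
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
        if watermark is not None:
            # A window is final once the watermark reaches its end: emit it and drop its state
            for start in sorted(s for s in state if s + size <= watermark):
                emitted.append((start, start + size, state.pop(start)))
    return emitted, dict(state)


t0 = datetime(2024, 1, 1, 12, 0, 0)

def at(seconds):
    return t0 + timedelta(seconds=seconds)

batches = [
    [at(2), at(7)],   # watermark -> 12:00:02
    [at(12), at(4)],  # 12:00:04 is still on time; watermark -> 12:00:07
    [at(18), at(5)],  # 12:00:05 is late and dropped; watermark -> 12:00:13
]
emitted, open_windows = windowed_counts_append(
    batches, size=timedelta(seconds=10), delay=timedelta(seconds=5)
)
for start, end, n in emitted:
    print(f"{start.time()}-{end.time()}: {n}")              # 12:00:00-12:00:10: 3
print({str(k.time()): v for k, v in open_windows.items()})  # {'12:00:10': 2}
```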







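```python
from pyspark.sql.functions import col, sum as _sum, window

stream_df = spark.readStream.format("rate").option("rowsPerSecond", 2).load()

water_df = (
  stream_df
  .withWatermark("timestamp", "30 seconds")  # tolerates data up to 30 seconds late
  .groupBy(
    window(col("timestamp"), "10 seconds"),
  )
  .agg(
      _sum(col("value")).alias("total_value"),
  )
  # orderBy forces "complete" output mode, where the watermark does not drop old windows;
  # remove it and write in "append" or "update" mode to see windows being finalized
  .orderBy("window")
)

display(water_df)
```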


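```python
# A watermark with no stateful operation after it has no visible effect: rows pass through unchanged
water_df2 = (
  stream_df
  .withWatermark("timestamp", "30 seconds")
)
display(water_df2)
```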