## Structured Streaming

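Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. You write a streaming computation the same way as a batch query on a static table, and Spark runs it incrementally as new data arrives.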

**Key Features**
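- Exactly-once processing: with a replayable source and an idempotent sink, each record affects the output exactly once, even after failures.
- Fault tolerance: source offsets and operator state are tracked in a checkpoint directory using write-ahead logs (WAL), so a restarted query resumes where it stopped.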


### Configuring the Service Principal

In [0]:
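# Service principal credentials (placeholders). Avoid hard-coding the secret in a notebook;
# on Databricks it can be read from a secret scope with dbutils.secrets.get(scope=..., key=...).
client_id = "XXXX-XXXX-XXXX-XXXX-XXXX-XXXX"
tenant_id = "XXXX-XXXX-XXXX-XXXX-XXXX-XXXX"
client_secret = "XXXX-XXXX-XXXX-XXXX-XXXX-XXXX"
storage_account_name = "dldatalakestorage"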

In [0]:
spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net", 
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
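# Alternative to the OAuth settings above (use one or the other): storage account key auth.
# spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", "<storage-account-key>")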


# Container roots used as the stream source and the sink
source_file_path = f"abfss://streamingsource@{storage_account_name}.dfs.core.windows.net"
destination_file_path = f"abfss://streamingsink@{storage_account_name}.dfs.core.windows.net"
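The five OAuth `spark.conf.set` calls above follow one pattern. Each key is a setting name suffixed with the storage account's DFS host. The sketch below shows a hypothetical helper, `abfs_oauth_conf` (not part of Spark or Databricks), that builds those same key/value pairs from the service principal details. This makes it harder to mistype the account name in one key but not the others.

```python
def abfs_oauth_conf(account: str, client_id: str, client_secret: str, tenant_id: str) -> dict:
    """Build the ABFS OAuth (client-credentials) settings for one storage account.

    Hypothetical helper: it only assembles the same keys that the notebook sets one by one.
    """
    host = f"{account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }
```

In the notebook, the settings would then be applied with `for key, value in abfs_oauth_conf(storage_account_name, client_id, client_secret, tenant_id).items(): spark.conf.set(key, value)`.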

### Reading Stream

In [0]:
# File-based streaming sources need an explicit schema (schema inference is off by default)
schema = 'name string, city string'

df = spark.readStream.format("csv")\
     .schema(schema)\
     .load(source_file_path)

df.printSchema()

root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)



In [0]:
df.show()

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-5972144860636727>, line 1
----> 1 df.show()

File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res

File /databricks/spark/python/pyspark/sql/dataframe.py:10

This error is expected. `df` is a streaming DataFrame, and batch actions such as `show()`, `count()` and `collect()` cannot run on it. A streaming query has to be started with `df.writeStream ... .start()`. For a quick look at the data in Databricks, you can use `display(df)`. Alternatively, write to the `memory` sink with a query name (for example `.format("memory").queryName("preview").start()`) and read the in-memory table with `spark.sql("SELECT * FROM preview")`.

### Writing Stream

#### Output Modes
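- **append** (default): only rows added to the result table since the last trigger are written to the sink; rows already written are never changed. Aggregation queries need a watermark in this mode, so that a window is emitted only once it is final.
- **complete**: the entire result table is written to the sink on every trigger. Supported only for aggregation queries.
- **update**: only rows that changed in the result table since the last trigger are written. For queries without aggregations, this behaves like append.

To process all data available right now and then stop, use `.trigger(availableNow=True)` (Spark 3.3+). The older `.trigger(once=True)` does the same in a single micro-batch but is deprecated.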
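The plain-Python sketch below (no Spark involved) makes the difference between the modes concrete. It shows what a running count of events per city would send to the sink after each micro-batch; `simulate_sink_output` is a hypothetical helper. It is a simplified model: real append mode on an aggregation also needs a watermark, so the `append` branch models a non-aggregating query that simply passes new rows through.

```python
from collections import Counter


def simulate_sink_output(batches, mode):
    """Return what the sink receives after each micro-batch (simplified model)."""
    counts = Counter()  # result table: running count per city
    emitted = []
    for batch in batches:
        if mode == "append":
            # Non-aggregating query: every new input row is a new, final result row.
            emitted.append(list(batch))
            continue
        counts.update(batch)
        if mode == "complete":
            # The whole result table is written on every trigger.
            emitted.append(dict(sorted(counts.items())))
        elif mode == "update":
            # Only result rows whose value changed in this trigger are written.
            emitted.append({city: counts[city] for city in sorted(set(batch))})
        else:
            raise ValueError(f"unknown output mode: {mode}")
    return emitted


batches = [["London", "Tokyo"], ["London"]]
for mode in ("append", "complete", "update"):
    print(mode, simulate_sink_output(batches, mode))
```

In the second micro-batch, complete mode rewrites both cities, while update mode sends only the changed `London` row.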

In [0]:
df.writeStream\
    .format("delta")\
    .outputMode("append")\
    .trigger(processingTime="10 seconds")\
    .option("checkpointLocation", "/tmp/checkpoint")\
    .start(destination_file_path)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f2fe2908620>
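With `trigger(processingTime="10 seconds")`, the Spark documentation describes this behavior:

- If a micro-batch finishes early, the engine waits for the next interval boundary.
- If a micro-batch overruns, the next one starts as soon as the previous one completes.

The plain-Python sketch below models that rule. `batch_start_times` is hypothetical, and treating boundaries as multiples of the interval on the clock is an assumption of the sketch.

```python
def batch_start_times(first_start, durations, interval):
    """Start time (seconds) of each micro-batch under a fixed processing-time trigger."""
    starts = []
    now = first_start
    for duration in durations:
        starts.append(now)
        # Assumption: interval boundaries are multiples of the interval on the clock.
        next_boundary = (now // interval + 1) * interval
        finished = now + duration
        # Wait for the boundary if the batch was quick; otherwise start right away.
        now = max(finished, next_boundary)
    return starts


# Batches taking 3 s, 12 s (an overrun) and 4 s, with a 10 s trigger.
print(batch_start_times(0, [3, 12, 4], 10))  # [0, 10, 22]
```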

In [0]:
dbutils.fs.ls('/tmp/checkpoint')

[FileInfo(path='dbfs:/tmp/checkpoint/__tmp_path_dir/', name='__tmp_path_dir/', size=0, modificationTime=1740031766000),
 FileInfo(path='dbfs:/tmp/checkpoint/commits/', name='commits/', size=0, modificationTime=1740031766000),
 FileInfo(path='dbfs:/tmp/checkpoint/metadata', name='metadata', size=45, modificationTime=1740031766000),
 FileInfo(path='dbfs:/tmp/checkpoint/offsets/', name='offsets/', size=0, modificationTime=1740031766000),
 FileInfo(path='dbfs:/tmp/checkpoint/sources/', name='sources/', size=0, modificationTime=1740031766000)]
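The `offsets/` and `commits/` directories listed above are the write-ahead logs behind the fault-tolerance guarantee. A batch's offset range is recorded before it runs, and a commit entry is written after it finishes. The plain-Python sketch below is a simplified model of how a restart uses them; it is not Spark's code, and `plan_restart` is hypothetical.

If the latest planned batch has no commit, it is re-run with exactly the same offsets. Combined with an idempotent sink such as Delta, which tracks the batch ids it has already written, this is what gives exactly-once output.

```python
def plan_restart(offsets_log, commits_log):
    """Decide what to run after a restart (simplified model of checkpoint recovery).

    offsets_log: dict of batch id -> (start_offset, end_offset), written before the batch ran
    commits_log: set of batch ids whose execution finished
    """
    if not offsets_log:
        return ("start", 0, None)  # empty checkpoint: begin with batch 0
    latest = max(offsets_log)
    if latest not in commits_log:
        # Planned but never committed: replay the identical offset range.
        return ("rerun", latest, offsets_log[latest])
    # Everything committed: plan the next batch from new data.
    return ("next", latest + 1, None)


offsets = {0: (0, 100), 1: (100, 250)}
print(plan_restart(offsets, {0}))     # ('rerun', 1, (100, 250))
print(plan_restart(offsets, {0, 1}))  # ('next', 2, None)
```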

### Windowing and Watermarking

In [0]:
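from pyspark.sql.functions import to_timestamp, window

# Sample events; event_time is parsed into a proper timestamp below
data = [
    ("Alice", "New York", "2023-10-27 10:01:15"),
    ("Bob", "London", "2023-10-27 10:01:30"),
    ("Alice", "New York", "2023-10-27 10:02:45"),
    ("Charlie", "Tokyo", "2023-10-27 10:06:00"),  # Outside the first 5-minute window
    ("David", "London", "2023-10-27 10:07:15"),
    ("Eve", "New York", "2023-10-27 10:08:30"),
    ("Bob", "London", "2023-10-27 10:11:00"),     # Falls into the 10:10-10:15 window
    ("Alice", "New York", "2023-10-27 10:04:00")  # Out of order: 7 minutes behind 10:11:00
]

schema = "name STRING, city STRING, event_time STRING"

# Note: despite its name, this is a static (batch) DataFrame. Window functions behave as in
# a stream, but withWatermark() is ignored for batch queries, so no late rows are dropped.
streaming_df = spark.createDataFrame(data, schema) \
    .withColumn("event_time", to_timestamp("event_time"))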


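#### Watermarking
- Handles late-arriving data in event-time aggregations and limits how much state Spark has to keep.
- The watermark is the maximum event time seen so far minus a delay threshold. It is recomputed at the end of each micro-batch.
- Once the watermark passes the end of a window, the window is finalized (and emitted in append mode), its state is dropped, and later events for it can be discarded.
- Example: `withWatermark("event_time", "10 minutes")` guarantees that events up to 10 minutes behind the latest event time are still aggregated. Events later than that may be dropped, but dropping is not guaranteed.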
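The plain-Python sketch below (no Spark) applies the watermark rule to this section's sample events, using 5-minute tumbling windows and a 2-minute delay. `windowed_counts` is a hypothetical helper. It is a simplified model with two stated assumptions:

- The watermark is recomputed at the end of each micro-batch as the maximum event time minus the delay.
- A row is dropped when its window ends at or before the current watermark.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def to_seconds(ts):
    """Parse 'YYYY-MM-DD HH:MM:SS' into whole seconds since the epoch (naive time treated as UTC)."""
    return int((datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") - EPOCH).total_seconds())


def windowed_counts(batches, size_s, delay_s):
    """Count events per tumbling window across micro-batches, dropping rows behind the watermark."""
    counts, dropped = {}, []
    max_event, watermark = None, None  # no watermark until the first micro-batch completes
    for batch in batches:
        times = [to_seconds(ts) for ts in batch]
        for ts, t in zip(batch, times):
            start = t - t % size_s
            if watermark is not None and start + size_s <= watermark:
                dropped.append(ts)  # its window is already finalized
                continue
            counts[start] = counts.get(start, 0) + 1
        if times:
            # Assumption: the watermark only advances at the end of a micro-batch.
            max_event = max(times) if max_event is None else max(max_event, max(times))
            watermark = max_event - delay_s
    labels = {
        (EPOCH + timedelta(seconds=start)).strftime("%H:%M"): n
        for start, n in sorted(counts.items())
    }
    return labels, dropped


events = ["2023-10-27 10:01:15", "2023-10-27 10:01:30", "2023-10-27 10:02:45",
          "2023-10-27 10:06:00", "2023-10-27 10:07:15", "2023-10-27 10:08:30",
          "2023-10-27 10:11:00", "2023-10-27 10:04:00"]

# All events in one micro-batch: nothing is late yet.
print(windowed_counts([events], 300, 120))
# The out-of-order 10:04:00 event arrives in a second micro-batch.
print(windowed_counts([events[:7], events[7:]], 300, 120))
```

In the second run, the first batch pushes the watermark to 10:09:00. The 10:04:00 event belongs to the 10:00-10:05 window, which has already closed, so it is dropped and that window counts 3 instead of 4.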

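#### Windowing
- Groups events into event-time intervals for aggregation.
- Types:
  - Tumbling Window → Fixed size, non-overlapping; each event falls into exactly one window (e.g., every 5 mins).
  - Sliding Window → Fixed size, overlapping windows that start every slide interval, so an event can fall into several windows (e.g., 5-min window, 2-min slide).
  - Session Window → Dynamic size: a session starts with an event and stays open while further events arrive within the gap duration; it closes after a period of inactivity.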

In [0]:
# Tumbling Window

windowed_df = streaming_df \
    .withWatermark("event_time", "2 minutes")\
    .groupBy(
        window("event_time", "5 minutes")
    ) \
    .count()

In [0]:
windowed_df.show(truncate=False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2023-10-27 10:00:00, 2023-10-27 10:05:00}|4    |
|{2023-10-27 10:05:00, 2023-10-27 10:10:00}|3    |
|{2023-10-27 10:10:00, 2023-10-27 10:15:00}|1    |
+------------------------------------------+-----+
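Tumbling-window assignment is integer arithmetic. An event at time t belongs to the single window that starts at t - (t mod size), with starts aligned to the Unix epoch (Spark's default `startTime` offset of 0). The plain-Python sketch below reproduces the counts in the table above. `tumbling_counts` is a hypothetical helper, and naive timestamps are treated as UTC.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_counts(timestamps, size_s):
    """Count events per fixed, non-overlapping, epoch-aligned window of size_s seconds."""
    counts = Counter()
    for ts in timestamps:
        t = int((datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") - EPOCH).total_seconds())
        counts[t - t % size_s] += 1  # start of the one window that contains t
    return {
        (EPOCH + timedelta(seconds=start)).strftime("%H:%M"): n
        for start, n in sorted(counts.items())
    }


event_times = ["2023-10-27 10:01:15", "2023-10-27 10:01:30", "2023-10-27 10:02:45",
               "2023-10-27 10:06:00", "2023-10-27 10:07:15", "2023-10-27 10:08:30",
               "2023-10-27 10:11:00", "2023-10-27 10:04:00"]
print(tumbling_counts(event_times, 5 * 60))  # {'10:00': 4, '10:05': 3, '10:10': 1}
```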



In [0]:
# Sliding Window
sliding_windowed_df = streaming_df \
    .withWatermark("event_time", "2 minutes")\
    .groupBy(
        window("event_time", "5 minutes","2 minutes")
    )\
    .count()

sliding_windowed_df.show(truncate=False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2023-10-27 10:00:00, 2023-10-27 10:05:00}|4    |
|{2023-10-27 09:58:00, 2023-10-27 10:03:00}|3    |
|{2023-10-27 10:02:00, 2023-10-27 10:07:00}|3    |
|{2023-10-27 10:06:00, 2023-10-27 10:11:00}|3    |
|{2023-10-27 10:04:00, 2023-10-27 10:09:00}|4    |
|{2023-10-27 10:08:00, 2023-10-27 10:13:00}|2    |
|{2023-10-27 10:10:00, 2023-10-27 10:15:00}|1    |
+------------------------------------------+-----+
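With a slide shorter than the window, an event belongs to every window whose start s satisfies s ≤ t < s + size, where s is a multiple of the slide. The plain-Python sketch below enumerates those windows per event and reproduces the seven rows above (in sorted order; the Spark output is unordered). `sliding_counts` is hypothetical, and epoch alignment with naive timestamps treated as UTC is an assumption.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_counts(timestamps, size_s, slide_s):
    """Count events per window of size_s seconds, with a new window starting every slide_s seconds."""
    counts = Counter()
    for ts in timestamps:
        t = int((datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") - EPOCH).total_seconds())
        # Latest window start at or before t, then step back while that window still covers t.
        start = t - t % slide_s
        while start > t - size_s:
            counts[start] += 1
            start -= slide_s

    def label(seconds):
        return (EPOCH + timedelta(seconds=seconds)).strftime("%H:%M")

    return {f"{label(s)}-{label(s + size_s)}": n for s, n in sorted(counts.items())}


event_times = ["2023-10-27 10:01:15", "2023-10-27 10:01:30", "2023-10-27 10:02:45",
               "2023-10-27 10:06:00", "2023-10-27 10:07:15", "2023-10-27 10:08:30",
               "2023-10-27 10:11:00", "2023-10-27 10:04:00"]
for window_label, n in sliding_counts(event_times, 5 * 60, 2 * 60).items():
    print(window_label, n)
```

Setting the slide equal to the window size yields exactly one window per event, which is the tumbling case.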



In [0]:
# Session Window

from pyspark.sql.functions import session_window

session_df = streaming_df \
    .withWatermark("event_time", "2 minutes")\
    .groupBy(
        session_window("event_time", "1 minute")
    ) \
    .count()

session_df.show(truncate=False)

+------------------------------------------+-----+
|session_window                            |count|
+------------------------------------------+-----+
|{2023-10-27 10:01:15, 2023-10-27 10:02:30}|2    |
|{2023-10-27 10:02:45, 2023-10-27 10:03:45}|1    |
|{2023-10-27 10:04:00, 2023-10-27 10:05:00}|1    |
|{2023-10-27 10:06:00, 2023-10-27 10:07:00}|1    |
|{2023-10-27 10:07:15, 2023-10-27 10:08:15}|1    |
|{2023-10-27 10:08:30, 2023-10-27 10:09:30}|1    |
|{2023-10-27 10:11:00, 2023-10-27 10:12:00}|1    |
+------------------------------------------+-----+
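Session windows come from merging per-event intervals. Each event opens [t, t + gap), and an event that arrives before the current session ends extends that session to its own t + gap. The plain-Python sketch below reproduces the seven sessions above with a 1-minute gap. `session_windows` is hypothetical, and keeping an event that lands exactly on a session's end as a new session is an assumption (the sample data has no such case).

The cell above groups only by `session_window`, so these sessions mix all users. Grouping by `name` as well would give per-user sessions.

```python
from datetime import datetime, timedelta


def session_windows(timestamps, gap):
    """Merge events into sessions; returns (start, end, count) tuples as HH:MM:SS strings."""
    times = sorted(datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") for ts in timestamps)
    sessions = []
    for t in times:
        if sessions and t < sessions[-1][1]:
            # Event falls inside the open session: extend it and count the event.
            start, end, count = sessions[-1]
            sessions[-1] = (start, max(end, t + gap), count + 1)
        else:
            # Gap of inactivity reached (assumption: touching the end starts a new session).
            sessions.append((t, t + gap, 1))
    return [(s.strftime("%H:%M:%S"), e.strftime("%H:%M:%S"), n) for s, e, n in sessions]


event_times = ["2023-10-27 10:01:15", "2023-10-27 10:01:30", "2023-10-27 10:02:45",
               "2023-10-27 10:06:00", "2023-10-27 10:07:15", "2023-10-27 10:08:30",
               "2023-10-27 10:11:00", "2023-10-27 10:04:00"]
for session in session_windows(event_times, timedelta(minutes=1)):
    print(session)
```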

