# Structured Streaming

Five steps to define a streaming query:
  * Define input sources
  * Transform data
  * Define output sink and output mode
  * Specify processing details
  * Start the query

### Step 1: Define input sources

* Use `spark.readStream` to create a `DataStreamReader`.
* The example below creates a DataFrame from a text data stream received over a socket connection.
* Spark natively supports reading data streams from Apache Kafka and from file-based formats such as Parquet, ORC, and JSON.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
lines = (spark
         .readStream.format("socket")
         .option("host", "localhost")
         .option("port",9999)
         .load()
    )

23/05/07 14:13:00 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
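
The socket source connects as a client, so something must already be listening on `localhost:9999` before the query starts. The notes do not say how the data is fed; a tool such as `nc -lk 9999` is a common choice. As a rough stand-in, the sketch below uses only the Python standard library. The helper name `serve_lines` and its parameters are hypothetical and not part of Spark.

```python
import socket
import threading


def serve_lines(lines, host="localhost", port=9999):
    """Hypothetical helper: accept one client and send it newline-terminated lines."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]  # differs from `port` when port=0

    def _run():
        try:
            conn, _ = server.accept()  # blocks until a client connects
            with conn:
                for line in lines:
                    conn.sendall((line + "\n").encode("utf-8"))
        finally:
            server.close()

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return bound_port, thread
```

Calling `serve_lines(["hello world", "hello spark"])` before starting the query would hand those two lines to the first client that connects and then close the connection.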


### Step 2: Transform data

* The usual DataFrame operations can be applied to a streaming DataFrame.

In [3]:
# Split the lines into individual words and count
from pyspark.sql.functions import col, explode, split
words = lines.select(explode(split(col("value"),"\\s")).alias("word"))
counts = words.groupBy("word").count()

* `counts` is a streaming DataFrame.
* It represents the running word counts, which are computed only once the streaming query is started.
* There are two broad classes of data transformations:
  * `stateless transformations` - do not require information from earlier rows. Operations like `select()`, `filter()`, `map()`
  * `stateful transformations` - require information from earlier rows. Operations like grouping, joins, and aggregations
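
The difference between the two classes can be sketched in plain Python, without Spark. This is only a conceptual model: `stateless_filter` and `RunningWordCount` are hypothetical names, and each inner list stands in for one micro-batch of words.

```python
from collections import Counter


def stateless_filter(batch, min_len=4):
    """Stateless: each row is judged on its own; earlier batches are never consulted."""
    return [word for word in batch if len(word) >= min_len]


class RunningWordCount:
    """Stateful: the counts kept in `state` carry over from one micro-batch to the next."""

    def __init__(self):
        self.state = Counter()

    def process(self, batch):
        self.state.update(batch)
        return dict(self.state)


batches = [["spark", "is", "fast"], ["spark", "streams"]]
counter = RunningWordCount()
for batch in batches:
    print(stateless_filter(batch), counter.process(batch))
```

The filter gives the same answer for a batch no matter what came before it, while the count for `spark` in the second batch depends on state built up from the first.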

### Step 3: Define output sink and output mode

* Write the processed data with `DataFrame.writeStream`.
* Three output modes are supported:
  * `append` (default) - Only the rows added to the result table since the last trigger are written. Suitable when a row that has been output will never be changed or updated by the query later, so it is supported only by queries that never modify earlier output, such as stateless queries.
  * `complete` - The entire result table is written after every trigger. Suitable when the result is much smaller than the input data and can be retained in memory.
  * `update` - Only the rows of the result table that changed since the last trigger are written.
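
To make the modes concrete, here is a minimal plain-Python sketch of what a running word count would emit at each trigger. It is a simulation under stated assumptions, not Spark code: `emit_per_trigger` is a hypothetical name, and the rejection of `append` mirrors Spark's behaviour for a streaming aggregation that has no watermark.

```python
from collections import Counter


def emit_per_trigger(batches, mode):
    """Return what a running word count would write at each trigger (simulation only)."""
    if mode == "append":
        # Counts written earlier would later change, so append cannot apply here.
        raise ValueError("append is not supported for an aggregation without a watermark")
    if mode not in ("complete", "update"):
        raise ValueError(f"unknown output mode: {mode}")

    state = Counter()
    emitted = []
    for batch in batches:
        changed = set(batch)          # words whose count changes in this trigger
        state.update(batch)
        if mode == "complete":
            emitted.append(dict(state))                       # whole result table
        else:
            emitted.append({w: state[w] for w in changed})    # changed rows only
    return emitted
```

For the batches `[["a", "b", "a"], ["b", "c"]]`, the second trigger emits `{"a": 2, "b": 2, "c": 1}` in `complete` mode but only `{"b": 2, "c": 1}` in `update` mode.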

In [6]:
# Writing the output stream to the console
writer = counts.writeStream.format("console").outputMode("complete")

### Step 4: Specify processing details

In [8]:
checkpointDir = "....."
writer2 = (
    writer
    .trigger(processingTime="1 second")
    .option("checkpointLocation",checkpointDir)
)

Triggering options:
* `Default` - The next micro-batch is triggered as soon as the previous micro-batch has completed.
* `Processing time with trigger interval` - The query triggers micro-batches at the specified fixed interval.
* `Once` - Executes exactly one micro-batch and then stops. Useful when we want to control the triggering and processing from an external scheduler.
* `Continuous` - Processes data continuously instead of in micro-batches, giving lower latency than the micro-batch trigger modes.
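
In PySpark these options map onto keyword arguments of `DataStreamWriter.trigger()`. The sketch below collects them in one place; the `TRIGGERS` table and the `with_trigger` helper are hypothetical names introduced for illustration, and the interval values are arbitrary.

```python
# Keyword arguments for DataStreamWriter.trigger(), keyed by the option names above.
TRIGGERS = {
    "default": None,                                    # no .trigger() call at all
    "processing_time": {"processingTime": "1 second"},  # fixed micro-batch interval
    "once": {"once": True},                             # exactly one micro-batch
    "continuous": {"continuous": "1 second"},           # value is the checkpoint interval
}


def with_trigger(writer, name):
    """Hypothetical helper: apply the named trigger option to a DataStreamWriter."""
    kwargs = TRIGGERS[name]
    return writer if kwargs is None else writer.trigger(**kwargs)
```

For example, `with_trigger(writer, "processing_time")` would be equivalent to the `.trigger(processingTime="1 second")` call in the cell above. Continuous processing supports only a restricted set of queries, so it would likely be rejected for an aggregation such as `counts`.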
  
  
Checkpoint location - The directory where the streaming query saves its progress information. After a failure, the restarted query uses this metadata to resume from where it left off.
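
The recovery idea can be illustrated with a toy model in plain Python. This is not Spark's checkpoint format or API: the JSON file, the `next_offset` field, and the `process_with_checkpoint` function are all assumptions made for the sketch.

```python
import json
import os


def process_with_checkpoint(records, checkpoint_path, fail_at=None):
    """Toy model: process records from the last saved offset, saving progress as we go."""
    start = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            start = json.load(f)["next_offset"]   # resume where the last run stopped

    processed = []
    for offset in range(start, len(records)):
        if offset == fail_at:
            raise RuntimeError("simulated failure")
        processed.append(records[offset])
        with open(checkpoint_path, "w") as f:
            json.dump({"next_offset": offset + 1}, f)  # record progress
    return processed
```

If a first run over `["a", "b", "c", "d"]` fails at offset 2, a second run with the same checkpoint path processes only `["c", "d"]`.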

### Step 5: Start the query

In [None]:
streamingQuery = writer2.start()

### Putting it together

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split
spark = SparkSession.builder.getOrCreate()
lines = (spark
         .readStream.format("socket")
         .option("host", "localhost")
         .option("port",9999)
         .load()
    )
words = lines.select(explode(split(col("value"), "\\s")).alias("word"))
counts = words.groupBy("word").count()
checkpointDir = "...."
streamingQuery = (
    counts
    .writeStream
    .format("console")
    .outputMode("complete")
    .trigger(processingTime="1 second")
    .option("checkpointLocation",checkpointDir)
    .start()
)
streamingQuery.awaitTermination()

### Working of an Active Streaming Query