# Core Concepts

## Transformations and Actions
Structured Streaming uses the same transformations and actions as the static structured APIs, with a few restrictions. The restrictions usually involve types of queries that the engine cannot incrementalize yet.


## Input Sources
Structured Streaming supports several input sources for reading data in a streaming fashion. At the time of writing, the supported input sources are:
* Apache Kafka
* Files on a distributed file system like HDFS or S3
* A socket source for testing

## Sinks
Just as sources allow you to get data into Structured Streaming, sinks specify the destination for the result set of that stream. Sinks and the execution engine are also responsible for reliably tracking the exact progress of data processing. Here are the supported sinks:
* Apache Kafka
* Almost any file format
* A foreach sink for running arbitrary computation on the output records
* A console sink for testing
* A memory sink for debugging


## Output Modes
Defining a sink for our Structured Streaming job is only half the story. We also need to define how we want Spark to write data to that sink. For instance, do we want to append new information? Do we want to update rows as we receive more information about them over time? Do we want to completely overwrite the result set every time? To do this, we define an output mode, much as we do when writing with the static structured APIs. The supported modes are as follows:
* Append (only add new records to the output sink)
* Update (update changed records in place)
* Complete (rewrite the full output)

Certain queries, and certain sinks, only support certain output modes. For example, suppose that your job is just performing a map on a stream. The output data will grow indefinitely as new records arrive, so it would not make sense to use complete mode, which requires writing all the data to a new file at once. In contrast, if you are doing an aggregation into a limited number of keys, complete and update modes would make sense, but append would not, because the values of some keys need to be updated over time.
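
To make the difference between the three modes concrete, here is a minimal plain-Python sketch that mimics what each mode would hand to the sink after one trigger of a running count. It is only a model of the semantics described above, not Spark's implementation; the helper name `rows_to_emit` and the sample counts are made up for illustration.

```python
def rows_to_emit(mode, previous, current):
    """Return the rows a sink would receive after one trigger.

    previous and current map a key to its running count before and
    after the trigger. Hypothetical helper, not a Spark API.
    """
    if mode == "complete":
        # Rewrite the whole result table.
        return dict(current)
    if mode == "update":
        # Only keys that are new or whose value changed.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only keys never seen before; existing rows are never revised.
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")


before = {"sit": 10, "walk": 7}
after = {"sit": 12, "walk": 7, "bike": 3}

for mode in ("append", "update", "complete"):
    print(mode, rows_to_emit(mode, before, after))
```

In this model the change to the `sit` count never reaches the sink in append mode, which is why an aggregation whose values keep changing does not fit that mode.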


## Triggers
Whereas output modes define how data is output, triggers define when data is output, that is, when Structured Streaming should check for new input data and update its result. By default, it looks for new data as soon as it has finished processing the last group of input data, giving the lowest latency possible for new results. However, this behavior can lead to writing many small output files when the sink is a set of files. Thus, Spark also supports triggers based on processing time.
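
A processing-time trigger is set on the stream writer. The sketch below assumes an aggregated streaming DataFrame is passed in; the helper name, the query name, the console sink, and the 10-second default interval are illustrative choices rather than anything prescribed above.

```python
def start_with_processing_time_trigger(counts_df, interval="10 seconds"):
    """Start a streaming query that checks for new input once per interval.

    counts_df is assumed to be an aggregated streaming DataFrame.
    """
    return (
        counts_df.writeStream
        .queryName("timed_counts")          # hypothetical query name
        .format("console")                  # console sink, for testing only
        .outputMode("complete")
        .trigger(processingTime=interval)   # processing-time trigger
        .start()
    )
```

Called as `start_with_processing_time_trigger(activityCounts, "5 seconds")`, this would start a query that updates its result roughly every five seconds instead of as fast as possible.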

## Event-Time Processing
Structured Streaming also supports event-time processing (i.e., processing data based on timestamps included in the records, which may arrive out of order).

### Event-Time Data
Event time means that time fields are embedded in your data. Rather than processing data according to the time it reaches your system, you process it according to the time that it was generated, even if the records arrive out of order at the streaming application due to slow uploads or network delays. Because the system views the input data as a table, the event time is just another field in that table, and your application can do grouping, aggregation, and windowing using standard SQL operators. However, under the hood, Structured Streaming can take some special actions when it knows that one of your columns is an event-time field, including query optimization or determining when it is safe to forget state about a time window. Many of these actions can be controlled using watermarks.

### Watermarks
Watermarks are a feature of streaming systems that allow you to specify how late you expect to see data in event time. For example, in an application that processes logs from mobile devices, one might expect logs to be up to 30 minutes late due to upload delays. Systems that support event time, including Structured Streaming, usually allow setting watermarks to limit how long they need to remember old data.
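
The bookkeeping behind a watermark can be sketched in plain Python. This is a simplified model rather than Spark's implementation: the watermark here trails the latest event time seen so far and is updated per record, whereas an engine would typically advance it per batch. The function name and the sample events are made up for illustration. In PySpark the corresponding call is `DataFrame.withWatermark(eventTime, delayThreshold)`.

```python
from datetime import datetime, timedelta


def filter_with_watermark(events, max_delay):
    """Split events into accepted and dropped using a simple watermark.

    events is an iterable of (event_time, payload) in arrival order.
    The watermark trails the latest event time seen so far by max_delay;
    events older than the watermark are treated as too late.
    """
    max_event_time = None
    accepted, dropped = [], []
    for event_time, payload in events:
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - max_delay
        if event_time < watermark:
            dropped.append(payload)
        else:
            accepted.append(payload)
    return accepted, dropped


t0 = datetime(2022, 11, 8, 12, 0)
arrivals = [
    (t0, "a"),
    (t0 + timedelta(minutes=45), "b"),
    (t0 + timedelta(minutes=20), "c"),  # 25 minutes behind "b": within the delay
    (t0 + timedelta(minutes=5), "d"),   # 40 minutes behind "b": too late
]
accepted, dropped = filter_with_watermark(arrivals, timedelta(minutes=30))
print("accepted:", accepted)
print("dropped:", dropped)
```

With a 30-minute delay, record `c` is still counted even though it arrives out of order, while record `d` falls behind the watermark and can be ignored, so state for its time window no longer has to be kept.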

# Structured Streaming in Action
We will work with the Heterogeneity Human Activity Recognition Dataset. The dataset consists of smartphone and smartwatch sensor readings from a variety of devices, sampled at the highest frequency supported by the devices. Readings from these sensors were recorded while users performed activities like biking, sitting, standing, walking and so on. Several different smartphones and smartwatches were used, with nine users in total.

In [23]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [24]:
static = spark.read.json('/home/kevin/Desktop/Big-Data-with-Pyspark/data/activity-data/')
dataSchema = static.schema
dataSchema

                                                                                

StructType([StructField('Arrival_Time', LongType(), True), StructField('Creation_Time', LongType(), True), StructField('Device', StringType(), True), StructField('Index', LongType(), True), StructField('Model', StringType(), True), StructField('User', StringType(), True), StructField('gt', StringType(), True), StructField('x', DoubleType(), True), StructField('y', DoubleType(), True), StructField('z', DoubleType(), True)])

In [25]:
static.show(5)

+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
|1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|   0.0078125|-0.017654419| 0.010025024|
|1424686735691|1424688581745026978|nexus4_2|  145|nexus4|   g|stand|-3.814697E-4|   0.0184021|-0.013656616|
|1424686735890|1424688581945252808|nexus4_2|  185|nexus4|   g|stand|-3.814697E-4|-0.031799316| -0.00831604|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
only showing top 5 rows



In [26]:
streaming = spark.readStream.schema(dataSchema)\
                            .option('maxFilesPerTrigger', 1)\
                            .json('/home/kevin/Desktop/Big-Data-with-Pyspark/data/activity-data')

maxFilesPerTrigger allows you to control how quickly Spark will read all of the files in the folder. By setting this value low, we're artificially limiting the flow of the stream to one file per trigger. This helps us demonstrate how Structured Streaming runs incrementally in our example.

In [27]:
activityCounts = streaming.groupBy('gt').count()


# set partitions
spark.conf.set('spark.sql.shuffle.partitions', 5)

Now that we have set up our transformation, we need only specify the action to start the query. We will specify the output destination, or output sink, for the result of this query. For this example, we will write to a memory sink, which keeps an in-memory table of the results. In the process of specifying this sink, we also need to define how Spark will output that data. In this example, we use the complete output mode. This mode rewrites all of the keys along with their counts after every trigger.

In [28]:
activityQuery = activityCounts.writeStream\
                                .queryName('activity_counts')\
                                .format('memory')\
                                .outputMode('complete')\
                                .start()


# activityQuery.awaitTermination()

22/11/08 23:43:45 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fa8858a5-a042-4f58-90bd-c9e8b3ac2a7e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/08 23:43:45 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


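We must specify that we would like to wait for the termination of the query using awaitTermination() to prevent the driver process from exiting while the query is active. Include it in production applications; otherwise the driver can exit and the stream will stop with it. It is commented out above because an interactive session keeps the driver alive, and a blocking call would prevent running further cells. Spark lists this stream, and any other active ones, under the active streams of the SparkSession: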
```
spark.streams.active
```
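
As a small sketch of how that list might be used, the helpers below inspect and stop the queries it returns. They rely only on the `name`, `isActive`, and `stop()` members of the streaming query objects; the helper names themselves are hypothetical.

```python
def describe_active_queries(spark):
    """Return (name, is_active) for every streaming query on this session."""
    return [(query.name, query.isActive) for query in spark.streams.active]


def stop_all_queries(spark):
    """Stop every active streaming query and return the names that were stopped."""
    stopped = []
    for query in spark.streams.active:
        query.stop()
        stopped.append(query.name)
    return stopped
```

Calling `stop_all_queries(spark)` at the end of a notebook session is one way to shut the streams down before stopping the SparkSession itself.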

Now that we have the stream running, we can experiment with the result by querying the in-memory table it maintains of the current output of our streaming aggregation. To see the current data in this output table, we simply query it. We will loop and print the results of the streaming query every second:

In [29]:
from time import sleep
for x in range(5):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+----------+-----+
|        gt|count|
+----------+-----+
|       sit|24619|
|     stand|22769|
|stairsdown|18729|
|      walk|26512|
|  stairsup|20905|
|      null|20896|
|      bike|21593|
+----------+-----+

+----------+-----+
|        gt|count|
+----------+-----+
|       sit|61547|
|     stand|56924|
|stairsdown|46825|
|      walk|66280|
|  stairsup|52260|
|      null|52239|
|      bike|53984|
+----------+-----+

+----------+------+
|        gt| count|
+----------+------+
|       sit| 98471|
|     stand| 91079|
|stairsdown| 74922|
|      walk|106048|
|  stairsup| 83614|
|      null| 83584|
|      bike| 86377|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|135392|
|     stand|125234|
|stairsdown|103010|
|      walk|145816|
|  stairsup|114975|
|      null|114931|
|      bike|118773|
+----------+------+



## Transformations on Streams


### Selections and Filtering


In [30]:
from pyspark.sql.functions import expr

simpleTransform = streaming.withColumn('stairs', expr("gt like '%stairs%'"))\
                            .where('stairs')\
                            .where('gt is not null')\
                            .select('gt', 'model', 'arrival_time', 'creation_time')\
                            .writeStream\
                            .queryName('simple_transform')\
                            .format('memory')\
                            .outputMode('append')\
                            .start()

22/11/09 00:02:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-57edf201-8ade-4fc6-b831-9a72090a4176. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/09 00:02:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

### Aggregations

In [32]:
deviceModelStats = streaming.cube('gt', 'model').avg()\
                            .drop("avg(Arrival_time)")\
                            .drop("avg(Creation_time)")\
                            .drop("avg(Index)")\
                            .writeStream\
                            .queryName('device_counts')\
                            .format('memory')\
                            .outputMode('complete')\
                            .start()

22/11/09 00:06:12 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-5ebaec1c-bc17-4570-aee8-c57fe0c350c3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/11/09 00:06:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [34]:
for x in range(2):
    spark.sql("SELECT * FROM device_counts").show()
    sleep(1)

+----------+------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+
|       sit|  null| -5.4943324403959E-4|2.791446281700071E-4|-2.33994461689892...|
|      walk|nexus4|-0.00390116006094...|0.001052508689953...|-6.95435553042998...|
|      walk|  null|-0.00390116006094...|0.001052508689953...|-6.95435553042998...|
|  stairsup|  null|-0.02479965287771643|-0.00800392344379...|-0.10034088415060415|
|     stand|  null|-3.11082189691727...|3.218461665975321...|2.141300040636463...|
|      bike|  null|0.022688759550866838|-0.00877912156368...|-0.08251001663412372|
|  stairsup|nexus4|-0.02479965287771643|-0.00800392344379...|-0.10034088415060415|
|      null|nexus4|4.796918779024287E-4|-0.00601540958963...|-0.01013356489164...|
|      null|  null|4.796918779024287E-4|-0.00601540958963...|-0.01013356489164...|
|sta

# Input and Output
This section dives deeper into the details of how sources, sinks, and output modes work in Structured Streaming. Specifically, we discuss how, when, and where data flows into and out of the system.


## Where Data is Read and Written (Sources and Sinks)
Structured Streaming supports several production sources and sinks (files and Apache Kafka), as well as some debugging tools like the memory sink table.

### File Source and Sink
Probably the simplest source is the file source. It's easy to reason about and understand. Supported formats include CSV, Parquet, JSON, and so on. The only difference between the streaming file source/sink and Spark's static file source is that with streaming, we can control the number of files that we read in during each trigger via the maxFilesPerTrigger option. Any files you add into an input directory for a streaming job need to appear in it atomically. Otherwise, Spark will process partially written files before you have finished writing them.
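
One common way to make a file appear atomically is to write it somewhere else first and then rename it into the input directory. The sketch below shows this for a local POSIX filesystem, assuming the staging directory sits on the same filesystem as the input directory; distributed stores have their own mechanisms. The function name and directory layout are illustrative.

```python
import os
import tempfile


def publish_atomically(data: bytes, input_dir: str, filename: str, staging_dir: str) -> str:
    """Write data in staging_dir, then move the finished file into input_dir.

    staging_dir is assumed to be on the same filesystem as input_dir, so that
    os.replace is a single rename rather than a copy.
    """
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(input_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir)
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
            handle.flush()
            os.fsync(handle.fileno())
        final_path = os.path.join(input_dir, filename)
        os.replace(tmp_path, final_path)  # the file shows up in one step
    except BaseException:
        # Clean up the partial file so it never reaches the input directory.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```

A streaming job watching `input_dir` then sees either no file or the complete file, never a half-written one.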

### Kafka Source and Sink
Apache Kafka is a distributed publish-and-subscribe system for streams of data. Kafka lets you publish and subscribe to streams of records, much as you would with a message queue, and stores them in a fault-tolerant way. Think of Kafka as a distributed buffer. Kafka lets you store streams of records in categories that are referred to as topics. Each record in Kafka consists of a key, a value, and a timestamp. Topics consist of immutable sequences of records, in which the position of a record in the sequence is called an offset. Reading data is called subscribing to a topic, and writing data is as simple as publishing to a topic.
Spark allows you to read from Kafka with both batch and streaming DataFrames.

## Reading from the Kafka Source
To read, you first need to choose one of the following options: assign, subscribe, or subscribePattern. Only one of these can be present as an option when you read from Kafka. Assign is a fine-grained way of specifying not just the topic but also the topic partitions from which you would like to read. This is specified as a JSON string such as {"topicA":[0,1],"topicB":[2,4]}. Subscribe and subscribePattern are ways of subscribing to one or more topics, either by specifying a list of topics (the former) or via a pattern (the latter).
Second, you need to specify kafka.bootstrap.servers, the list of host:port addresses used to connect to the Kafka cluster. After that, there are several other options you can specify:
* **startingOffsets and endingOffsets:** The start point when a query is started: either earliest, which is from the earliest offsets; latest, which is from the latest offsets; or a JSON string specifying a starting offset for each TopicPartition. endingOffsets is the corresponding end point and applies only to batch queries.
* **failOnDataLoss:** Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This might be a false alarm. You can disable it when it doesn't work as you expected.
* **maxOffsetsPerTrigger:** The total number of offsets to read in a given trigger.

In [37]:
# # subscribe to 1 topic
# df1 = spark.readStream.format("kafka")\
#   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
#   .option("subscribe", "topic1")\
#   .load()

# # subscribe to multiple topics
# df2 = spark.readStream.format('kafka')\
#             .option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
#             .option('subscribe', 'topic1, topic2')\
#             .load()

# # subscribe to a pattern
# df3 = spark.readStream.format('kafka')\
#             .option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
#             .option('subscribePattern', 'topic.*')\
#             .load()
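
The cell above covers subscribe and subscribePattern. As a complementary sketch, the helper below builds the options for the assign strategy together with the offset-related settings described earlier. The helper names, hosts, topics, and limit are placeholders, and actually running `read_kafka_stream` assumes a reachable Kafka cluster and the Spark Kafka connector package on the classpath.

```python
import json


def kafka_assign_options(bootstrap_servers, partitions_by_topic,
                         starting_offsets="earliest", max_offsets_per_trigger=None):
    """Build the option dict for reading specific topic partitions.

    partitions_by_topic maps a topic name to a list of partition numbers,
    for example {"topicA": [0, 1]}.
    """
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "assign": json.dumps(partitions_by_topic),  # passed as a JSON string
        "startingOffsets": starting_offsets,
        "failOnDataLoss": "true",
    }
    if max_offsets_per_trigger is not None:
        options["maxOffsetsPerTrigger"] = str(max_offsets_per_trigger)
    return options


def read_kafka_stream(spark, options):
    """Create a streaming DataFrame from Kafka using the given options."""
    return spark.readStream.format("kafka").options(**options).load()


opts = kafka_assign_options(
    "host1:port1,host2:port2",
    {"topicA": [0, 1], "topicB": [2, 4]},
    max_offsets_per_trigger=1000,
)
print(opts["assign"])
```

Keeping the options in a plain dictionary makes it easy to check that only one of assign, subscribe, or subscribePattern is present before the stream is created.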

Each row in the source will have the following schema:
* key: binary
* value: binary
* topic: string
* partition: int
* offset: long
* timestamp: timestamp
* timestampType: int
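
Because key and value arrive as binary, a typical first step is to cast them to a readable type. The sketch below assumes the payloads are UTF-8 text and that a DataFrame loaded from the Kafka source is passed in; the helper name is hypothetical.

```python
def decode_kafka_records(kafka_df):
    """Cast the binary key and value columns to strings and keep the metadata."""
    return kafka_df.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value",
        "topic",
        "partition",
        "offset",
        "timestamp",
    )
```

If the values hold JSON, the resulting string column can then be parsed with the usual structured API functions.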

## How Data is Output (Output Modes)
There are 3 modes supported by structured streaming:

### Append mode
This is the default behavior and the simplest to understand. When new rows are added to the result table, they will be output to the sink based on the trigger that you specify. This mode ensures that each row is output once (and only once), assuming that you have a fault-tolerant sink. When you use append mode with event time and watermarks, only the final result will be output to the sink.

### Complete mode
This will output the entire state of the result table to your output sink. This is useful when you're working with some stateful data for which all rows are expected to change over time, or when the sink you are writing to does not support row-level updates.

### Update mode
This is similar to complete mode, except that only the rows that differ from the previous write are written out to the sink. Naturally, your sink must support row-level updates to support this mode.


In [38]:
spark.stop()

22/11/09 01:24:45 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:596)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:582)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:442)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.