In [4]:
%run "./Includes/Classroom-Setup"

<h2>Reading a Stream</h2>

The method `SparkSession.readStream` returns a `DataStreamReader` used to configure the stream.

There are a number of key points to the configuration of a `DataStreamReader`:
* The schema
* The type of stream: files, Kafka, TCP/IP, etc.
* Configuration specific to the type of stream
  * For files: the file type, the path to the files, the maximum number of files per trigger, etc.
  * For TCP/IP: the server's address, port number, etc.
  * For Kafka: the server's address, port, topics, partitions, etc.

### The Schema

Every streaming DataFrame must have a schema - the definition of column names and data types.

Some sources such as Kafka define the schema for you.

For file-based streaming sources, on the other hand, the schema is user-defined.

### Why must a schema be specified for a streaming DataFrame?

To put it another way:

### Why are streaming DataFrames unable to infer/read a schema?

In [10]:
# Define the schema using a DDL-formatted string.
dataSchema = "Recorded_At timestamp, Device string, Index long, Model string, User string, _corrupt_record string, gt string, x double, y double, z double"

### Configuring a File Stream

In our example below, we will be consuming files written continuously to a pre-defined directory. 

To control how much data is pulled into Spark at once, we can specify the option `maxFilesPerTrigger`.

In our example below, we will be reading in only one file for every trigger interval:

`dsr.option("maxFilesPerTrigger", 1)`

Both the location and file type are specified with the following call, which itself returns a `DataFrame`:

`df = dsr.json(dataPath)`
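The effect of `maxFilesPerTrigger` can be pictured with a toy, pure-Python model. This is not Spark code. The helper name `plan_micro_batches` is hypothetical. The model assumes files are picked up oldest first, which matches the file source's default ordering (`latestFirst` is false unless you set it).

```python
from typing import List, Tuple


def plan_micro_batches(files: List[Tuple[str, float]],
                       max_files_per_trigger: int) -> List[List[str]]:
    """Toy model: split a backlog of (file_name, modification_time) pairs into
    micro-batches of at most `max_files_per_trigger` files, oldest first."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    # Order by modification time, oldest first (the assumed default ordering).
    ordered = [name for name, _ in sorted(files, key=lambda f: f[1])]
    # Slice the ordered backlog into consecutive chunks, one chunk per trigger.
    return [ordered[i:i + max_files_per_trigger]
            for i in range(0, len(ordered), max_files_per_trigger)]


backlog = [("part-2.json", 20.0), ("part-0.json", 0.0), ("part-1.json", 10.0)]
print(plan_micro_batches(backlog, 1))  # [['part-0.json'], ['part-1.json'], ['part-2.json']]
print(plan_micro_batches(backlog, 2))  # [['part-0.json', 'part-1.json'], ['part-2.json']]
```

With `maxFilesPerTrigger` set to 1, as in the cell below, a backlog of N files takes N triggers to drain.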

In [12]:
dataPath = "dbfs:/mnt/training/definitive-guide/data/activity-data-stream.json"
initialDF = (spark
  .readStream                            # Returns DataStreamReader
  .option("maxFilesPerTrigger", 1)       # Force processing of only 1 file per trigger 
  .schema(dataSchema)                    # Required for all streaming DataFrames
  .json(dataPath)                        # The stream's source directory and file type
)

And with the initial `DataFrame`, we can apply some transformations:

In [14]:
streamingDF = (initialDF
  .withColumnRenamed("Index", "User_ID")  # Pick a "better" column name
  .drop("_corrupt_record")                # Remove an unnecessary column
)

### Streaming DataFrames

Other than the call to `spark.readStream`, it looks just like any other `DataFrame`.

But is it a "streaming" `DataFrame`?

You can differentiate between a "static" and "streaming" `DataFrame` with the following call:

In [16]:
# Static vs Streaming?
streamingDF.isStreaming

### Unsupported Operations

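Most operations on a "streaming" DataFrame are identical to those on a "static" DataFrame.

There are some exceptions to this.

Some operations are not supported on a streaming DataFrame, or are supported only with restrictions:
* Sorting our never-ending stream by `Recorded_At` is not supported; sorting is allowed only after an aggregation, and only in Complete output mode.
* Aggregating our stream by some criterion is supported, but with restrictions on the output mode (for example, Append mode requires a watermark).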

We will see in the following module how we can solve this problem.

In [18]:
from pyspark.sql.functions import col

try:
  sortedDF = streamingDF.orderBy(col("Recorded_At").desc())
  display(sortedDF)
except Exception as e:
  print("Sorting is not supported on an unaggregated stream: {}".format(e))

<h2>Writing a Stream</h2>

The method `DataFrame.writeStream` returns a `DataStreamWriter` used to configure the output of the stream.

There are a number of parameters to the `DataStreamWriter` configuration:
* Query's name (optional) - This name must be unique among all the currently active queries in the associated SparkSession.
* Trigger (optional) - Default value is `ProcessingTime(0)`, which runs the query as fast as possible.
* Checkpointing directory (optional)
* Output mode
* Output sink
* Configuration specific to the output sink, such as:
  * The host, port and topic of the receiving Kafka server
  * The file format and final destination of files
  * A custom sink via `dsw.foreach(...)`

Once the configuration is complete, we can start the query with a call to `dsw.start()`.

### Triggers

The trigger specifies when the system should process the next set of data.

| Trigger Type                           | Example | Notes |
|----------------------------------------|-----------|-------------|
| Unspecified                            |  | _DEFAULT_ - The query will be executed as soon as the system has completed processing the previous micro-batch |
| Fixed interval micro-batches           | `dsw.trigger(processingTime="6 hours")` | The query will be executed in micro-batches and kicked off at the user-specified intervals |
| One-time micro-batch                   | `dsw.trigger(once=True)` | The query will execute _only one_ micro-batch to process all the available data and then stop on its own |
| Continuous w/fixed checkpoint interval | `dsw.trigger(continuous="1 second")` | The query will be executed in a low-latency, continuous processing mode. _EXPERIMENTAL_ in 2.3.2 |

In the example below, you will be using a fixed interval of 3 seconds:

`dsw.trigger(processingTime="3 seconds")`
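The Spark documentation describes the fixed-interval trigger as follows. If a micro-batch finishes within the interval, the engine waits for the next interval boundary. If it overruns, the next micro-batch starts as soon as the previous one completes.

The sketch below is a simplified pure-Python model of that rule. It assumes the first batch starts at time 0, and `micro_batch_start_times` is a hypothetical name. Real start times also depend on how long each batch actually takes.

```python
from typing import List


def micro_batch_start_times(durations: List[float], interval: float) -> List[float]:
    """Toy model of a fixed-interval trigger.

    Each batch is scheduled for the next interval boundary after the previous
    batch started; if the previous batch ran past that boundary, the next
    batch starts as soon as the previous one finishes.
    """
    starts: List[float] = []
    start = 0.0
    for duration in durations:
        starts.append(start)
        end = start + duration
        # Next multiple of `interval` strictly after the current start time.
        next_boundary = (start // interval + 1) * interval
        start = max(end, next_boundary)
    return starts


# 3-second trigger: the second batch (5 s) overruns the boundary at t=6,
# so the third batch starts at t=8, as soon as the second one ends.
print(micro_batch_start_times([1, 5, 1], 3))  # [0.0, 3.0, 8.0]
```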

### Checkpointing

A <b>checkpoint</b> stores the current state of your streaming job in a reliable storage system such as Amazon S3 or HDFS, rather than on the local file system of any node in your cluster.

Together with write-ahead logs, a checkpoint allows a terminated stream to be restarted so that it continues from where it left off.

To enable this feature, you only need to specify the location of a checkpoint directory:

`dsw.option("checkpointLocation", checkpointPath)`

Points to consider:
* If you do not have a checkpoint directory, when the streaming job stops, you lose all state around your streaming job and upon restart, you start from scratch.
* For some sinks, you will get an error if you do not specify a checkpoint directory:<br/>
`AnalysisException: 'checkpointLocation must be specified either through option("checkpointLocation", ...)..`
* Also note that every streaming job should have its own checkpoint directory: no sharing.
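Here is a toy model of a replayable source whose progress is recorded in a checkpoint. It illustrates why the checkpoint matters.

A plain dict stands in for the checkpoint directory. This is an assumption for illustration: real checkpoints hold offset and commit logs, plus any state, on reliable storage. `run_stream` is a hypothetical helper.

```python
from typing import Dict, List, Optional


def run_stream(source: List[str], checkpoint: Optional[Dict[str, int]],
               batch_size: int, max_batches: int) -> List[List[str]]:
    """Toy model: read a replayable source in micro-batches, recording the next
    offset to read in `checkpoint` after each batch completes.

    checkpoint=None models a query without a checkpoint directory, which always
    starts again from offset 0.
    """
    state = checkpoint if checkpoint is not None else {}
    offset = state.get("next_offset", 0)  # resume from the recorded offset, if any
    processed: List[List[str]] = []
    for _ in range(max_batches):  # max_batches models the job stopping early
        if offset >= len(source):
            break
        batch = source[offset:offset + batch_size]
        processed.append(batch)
        offset += len(batch)
        state["next_offset"] = offset  # record progress after the batch
    return processed


events = ["e0", "e1", "e2", "e3"]
ckpt: Dict[str, int] = {}
first_run = run_stream(events, ckpt, batch_size=1, max_batches=2)
restarted = run_stream(events, ckpt, batch_size=1, max_batches=2)
no_checkpoint = run_stream(events, None, batch_size=1, max_batches=2)
print(first_run, restarted, no_checkpoint)
# [['e0'], ['e1']] [['e2'], ['e3']] [['e0'], ['e1']]
```

The restarted run picks up at `e2` because the offset survived in `ckpt`. The run without a checkpoint reprocesses `e0` and `e1` from scratch.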

### Output Modes

| Mode   | Example | Notes |
| ------------- | ----------- | ----- |
| **Complete** | `dsw.outputMode("complete")` | The entire updated Result Table is written to the sink. The individual sink implementation decides how to handle writing the entire table. |
| **Append** | `dsw.outputMode("append")`     | Only the new rows appended to the Result Table since the last trigger are written to the sink. |
| **Update** | `dsw.outputMode("update")`     | Only the rows in the Result Table that were updated since the last trigger are written to the sink. Available since Spark 2.1.1. |

In the example below, we are writing to a Parquet directory, which supports only the `append` mode:

`dsw.outputMode("append")`
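The difference between the three modes can be modeled in a few lines of Python. This toy treats the Result Table as a dict of running counts per key and shows which rows each mode would write after one trigger. `rows_to_emit` is a hypothetical name.

One caveat applies. For a real streaming aggregation like this one, Spark accepts Append mode only with a watermark, and then emits a row only once it can no longer change. The `append` branch below therefore models only the "new rows" description in the table.

```python
from typing import Dict, List, Tuple


def rows_to_emit(previous: Dict[str, int], current: Dict[str, int],
                 mode: str) -> List[Tuple[str, int]]:
    """Toy model: which Result Table rows (key -> running count) each output
    mode writes after one trigger, given the table before and after it."""
    if mode == "complete":
        # The whole Result Table, every time.
        return sorted(current.items())
    if mode == "update":
        # Rows that are new or whose value changed during this trigger.
        return sorted((k, v) for k, v in current.items() if previous.get(k) != v)
    if mode == "append":
        # Rows newly added to the Result Table (see the watermark caveat above).
        return sorted((k, v) for k, v in current.items() if k not in previous)
    raise ValueError("unknown output mode: " + mode)


before = {"stand": 2, "walk": 1}
after = {"stand": 3, "walk": 1, "bike": 1}
for mode in ("complete", "update", "append"):
    print(mode, rows_to_emit(before, after, mode))
# complete [('bike', 1), ('stand', 3), ('walk', 1)]
# update [('bike', 1), ('stand', 3)]
# append [('bike', 1)]
```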

### Output Sinks

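The following output sinks are available, among others. Most are selected with `DataStreamWriter.format`; the `foreach` sink is configured with `dsw.foreach(...)` instead: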

| Output Sink | Example                                          | Notes |
| ----------- | ------------------------------------------------ | ----- |
| **File**    | `dsw.format("parquet")`, `dsw.format("csv")`...  | Writes the output to files in a directory. Supports Parquet, JSON, CSV, etc. |
| **Kafka**   | `dsw.format("kafka")`      | Writes the output to one or more topics in Kafka |
| **Console** | `dsw.format("console")`    | Prints data to the console (useful for debugging) |
| **Memory**  | `dsw.format("memory")`     | Updates an in-memory table, which can be queried through Spark SQL or the DataFrame API |
| **foreach** | `dsw.foreach(writer: ForeachWriter)` | This is your "escape hatch", allowing you to write your own type of sink. |
| **Delta**   | `dsw.format("delta")`      | Writes the output to a Delta Lake table |

In the example below, we will be appending files to a Parquet directory and specifying its location with this call:

`dsw.format("parquet").start(outputPathDir)`
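The `foreach` escape hatch can be sketched in Python. In PySpark, `dsw.foreach(...)` accepts either of two things:

* a function of one row, or
* an object with a `process(row)` method and optional `open(partition_id, epoch_id)` and `close(error)` methods.

If `open` returns `False`, `process` is skipped for that partition and epoch. The class below is a hypothetical example that collects rows in a list. It is exercised by hand so it runs without Spark.

```python
class ListForeachWriter:
    """Hypothetical writer for DataStreamWriter.foreach(): PySpark calls open()
    per partition and epoch, process() for each row if open() returned True,
    and close() at the end (with the error, or None on success)."""

    def __init__(self):
        self.rows = []      # collected (partition_id, epoch_id, row) tuples
        self.errors = []    # whatever close() received
        self._current = None

    def open(self, partition_id, epoch_id):
        # Returning False here would skip this partition for this epoch,
        # for example if it had already been written.
        self._current = (partition_id, epoch_id)
        return True

    def process(self, row):
        partition_id, epoch_id = self._current
        self.rows.append((partition_id, epoch_id, row))

    def close(self, error):
        self.errors.append(error)


# Exercise the writer by hand, the way the engine would for one partition.
writer = ListForeachWriter()
if writer.open(0, 7):
    for row in [{"User_ID": 0}, {"User_ID": 1}]:
        writer.process(row)
writer.close(None)
print(writer.rows)  # [(0, 7, {'User_ID': 0}), (0, 7, {'User_ID': 1})]
```

With a live stream you would pass an instance via `streamingDF.writeStream.foreach(ListForeachWriter()).start()`. The writer is serialized and run on the executors, so in a real cluster the list on the driver stays empty. A practical writer would send each row to an external system instead.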

<h2>Let's Do Some Streaming</h2>

In the cell below, we write data from a streaming query to `outputPathDir`. 

There are a couple of things to note below:
1. We are giving the query a name via the call to `queryName`. We can use this later to reference the query by name.
2. Spark begins running jobs once we call `start`. This is equivalent to calling an action on a "static" DataFrame.
3. The call to `start` returns a `StreamingQuery` object. We can use this to interact with the running query.

In [27]:
outputPathDir = workingDir + "/output.parquet" # A subdirectory for our output
checkpointPath = workingDir + "/checkpoint"    # A subdirectory for our checkpoint & W-A logs
myStreamName = "lesson02_ps"                   # An arbitrary name for the stream

In [28]:
streamingQuery = (streamingDF                   # Start with our "streaming" DataFrame
  .writeStream                                  # Get the DataStreamWriter
  .queryName(myStreamName)                      # Name the query
  .trigger(processingTime="3 seconds")          # Configure for a 3-second micro-batch
  .format("parquet")                            # Specify the sink type, a Parquet file
  .option("checkpointLocation", checkpointPath) # Specify the location of checkpoint files & W-A logs
  .outputMode("append")                         # Write only new data to the "file"
  .start(outputPathDir)                         # Start the job, writing to the specified directory
)

In [29]:
# Wait until stream is done initializing...
# This method will be explained shortly.
untilStreamIsReady(myStreamName)

<h2>Managing Streaming Queries</h2>

When a query is started, the `StreamingQuery` object can be used to monitor and manage the query.

| Method or Property |  Description |
| ----------- | ------------------------------- |
|`id`| Unique identifier of the running query that persists across restarts from checkpoint data |
|`runId`| Unique ID of this run of the query, which is regenerated at every start/restart |
|`name`| The auto-generated or user-specified name of the query |
|`explain()`| Prints detailed explanations of the query |
|`stop()`| Stops the query |
|`awaitTermination()`| Blocks until the query is terminated, either by `stop()` or by an error |
|`exception()`| The exception, if the query terminated with an error |
|`recentProgress`| A list of the most recent progress updates for this query |
|`lastProgress`| The most recent progress update of this streaming query |

In [31]:
streamingQuery.recentProgress
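Each entry of `recentProgress` describes one micro-batch. A small hypothetical helper can summarize them, assuming the entries are dicts with `batchId` and `numInputRows` keys, as PySpark 3.x returns them. The sample entries below are hand-written for illustration, not real output from this stream.

```python
from typing import Any, Dict, List


def summarize_progress(progress: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize recentProgress-style entries (assumed: one dict per micro-batch)."""
    if not progress:
        return {"batches": 0, "total_input_rows": 0, "last_batch_id": None}
    return {
        "batches": len(progress),
        # numInputRows: rows read from the source in that micro-batch.
        "total_input_rows": sum(p.get("numInputRows", 0) for p in progress),
        "last_batch_id": max(p["batchId"] for p in progress),
    }


# Hand-written sample entries, not real output.
sample = [
    {"batchId": 0, "numInputRows": 120},
    {"batchId": 1, "numInputRows": 80},
]
print(summarize_progress(sample))
# {'batches': 2, 'total_input_rows': 200, 'last_batch_id': 1}
```

Against the running query, you would call `summarize_progress(streamingQuery.recentProgress)`.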

Additionally, we can iterate over a list of active streams:

In [33]:
for s in spark.streams.active:         # Iterate over all streams
  print("{}: {}".format(s.id, s.name)) # Print the stream's id and name

The code below introduces **`awaitTermination()`**

**`awaitTermination()`** will block the current thread
* Until the stream stops naturally or 
* Until the specified timeout elapses (if specified)

If the stream was "canceled" or otherwise terminated abnormally, any resulting exceptions will be thrown by **`awaitTermination()`** as well.

In [35]:
try:
  
  # Stream for up to 10 seconds while the current thread blocks  
  streamingQuery.awaitTermination(10)  
  
except Exception as e:
  print(e)

Wait for it... Once the 10 seconds have elapsed without any error, we can explicitly stop the stream.

In [37]:
try:
  
  # Issue the command to stop the stream
  streamingQuery.stop()

except Exception as e:
  print(e)

When working with streams, we are, in reality, working with a separate "thread".

As a result, different exceptions may arise as streams are terminated and/or queried.

For this reason, we have developed a number of utility methods to help with these operations:
* **`untilStreamIsReady(name)`** to wait until a stream is fully initialized before resuming execution.
* **`stopAllStreams()`** to stop all active streams in a fail-safe manner.

The implementation of each of these can be found in the notebook **`./Includes/Common-Notebooks/Utility-Methods`**
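The course's own implementations live in that notebook and are not reproduced here. As a rough idea of what such helpers might do, here is a hypothetical sketch that relies only on `spark.streams.active` and on each query's `name`, `recentProgress`, and `stop()`. The stand-in classes at the bottom let it run without Spark; with a real session you would pass `spark.streams`.

```python
import time


def until_stream_is_ready(streams, name, min_batches=1, timeout=60.0, poll_interval=1.0):
    """Hypothetical helper: poll until the active query called `name` has
    reported at least `min_batches` progress updates. Returns False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        for query in streams.active:
            if query.name == name and len(query.recentProgress) >= min_batches:
                return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


def stop_all_streams(streams):
    """Hypothetical helper: stop every active query, carrying on if one fails."""
    stopped = []
    for query in list(streams.active):  # iterate over a snapshot of the active queries
        try:
            query.stop()
            stopped.append(query.name)
        except Exception as e:  # report and continue with the remaining queries
            print("Could not stop {}: {}".format(query.name, e))
    return stopped


# Stand-ins for spark.streams and StreamingQuery so the sketch runs without Spark.
class FakeQuery:
    def __init__(self, name, progress, fail_on_stop=False):
        self.name, self.recentProgress = name, progress
        self.fail_on_stop, self.stopped = fail_on_stop, False

    def stop(self):
        if self.fail_on_stop:
            raise RuntimeError("simulated failure")
        self.stopped = True


class FakeStreams:
    def __init__(self, queries):
        self.active = queries


streams = FakeStreams([FakeQuery("lesson02_ps", [{"batchId": 0}]),
                       FakeQuery("broken", [], fail_on_stop=True)])
print(until_stream_is_ready(streams, "lesson02_ps", timeout=1.0, poll_interval=0.1))  # True
print(stop_all_streams(streams))  # prints a warning for "broken", then ['lesson02_ps']
```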

<h2>The Display function</h2>

Within Databricks notebooks, we can use the `display()` function to render a live plot.

#### One Gotcha!
When you pass a "streaming" `DataFrame` to `display`:
* A "memory" sink is being used
* The output mode is complete
* The query name is specified with the `streamName` parameter
* The trigger is specified with the `trigger` parameter
* The checkpointing location is specified with the `checkpointLocation` parameter

`display(myDF, streamName = "myQuery")`

We just programmatically stopped our only streaming query in the previous cell. In the cell below, `display` will automatically start our streaming DataFrame, `streamingDF`. We are passing `myStreamName` (`lesson02_ps`) as the name for this newly started stream.

In [40]:
display(streamingDF, streamName = myStreamName)

Recorded_At,Device,User_ID,Model,User,gt,x,y,z
2015-02-23T10:18:54.958+0000,nexus4_2,0,nexus4,g,stand,-0.0003814697,0.03656006,0.030136108
2015-02-23T10:18:54.964+0000,nexus4_2,1,nexus4,g,stand,-0.001449585,0.035491943,0.027999878
2015-02-23T10:18:54.968+0000,nexus4_2,2,nexus4,g,stand,0.0006866455,0.033355713,0.030136108
2015-02-23T10:18:54.976+0000,nexus4_2,3,nexus4,g,stand,0.0006866455,0.033355713,0.022659302
2015-02-23T10:18:54.978+0000,nexus4_2,4,nexus4,g,stand,-0.0003814697,0.030151367,0.025863647
2015-02-23T10:18:54.992+0000,nexus4_2,5,nexus4,g,stand,-0.0003814697,0.025878906,0.023727417
2015-02-23T10:18:54.992+0000,nexus4_2,6,nexus4,g,stand,-0.001449585,0.02267456,0.023727417
2015-02-23T10:18:54.996+0000,nexus4_2,7,nexus4,g,stand,-0.0003814697,0.019470215,0.024795532
2015-02-23T10:18:54.999+0000,nexus4_2,8,nexus4,g,stand,0.0006866455,0.01626587,0.021591187
2015-02-23T10:18:55.000+0000,nexus4_1,0,nexus4,g,stand,0.80996704,-0.34129333,-0.1672821


In [41]:
# Wait until stream is done initializing...
untilStreamIsReady(myStreamName)

Using the value passed to `streamName` in the call to `display`, we can programmatically access this specific stream:

In [43]:
print("Looking for {}".format(myStreamName))

for stream in spark.streams.active:      # Loop over all active streams
  if stream.name == myStreamName:        # Single out our stream by name
    print("Found {} ({})".format(stream.name, stream.id)) 

Stop all remaining streams.

In [45]:
stopAllStreams()

## End-to-end Fault Tolerance

Structured Streaming ensures end-to-end exactly-once fault-tolerance guarantees through _checkpointing_ and <a href="https://en.wikipedia.org/wiki/Write-ahead_logging" target="_blank">Write Ahead Logs</a>.

Structured Streaming sources, sinks, and the underlying execution engine work together to track the progress of stream processing. If a failure occurs, the streaming engine attempts to restart and/or reprocess the data.

This approach _only_ works if the streaming source is replayable. To ensure fault-tolerance, Structured Streaming assumes that every streaming source has offsets, akin to:

* <a target="_blank" href="https://kafka.apache.org/documentation/#intro_topics">Kafka message offsets</a>
* <a target="_blank" href="http://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#sequence-number">Kinesis sequence numbers</a>

At a high level, the underlying streaming mechanism relies on a couple of approaches:

* First, Structured Streaming uses checkpointing and write-ahead logs to record the offset range of data being processed during each trigger interval.
* Next, the streaming sinks are designed to be _idempotent_; that is, multiple writes of the same data (as identified by the offset) do _not_ result in duplicates being written to the sink.

Taken together, replayable data sources and idempotent sinks allow Structured Streaming to ensure **end-to-end, exactly-once semantics** under any failure condition.
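A toy model shows how a replayable source and an idempotent sink work together. The sink below remembers which batch IDs it has already committed and ignores a replay of the same batch. This is roughly in the spirit of how Spark's file sink skips batches already recorded in its metadata log. `IdempotentSink` is a hypothetical class, not a Spark API.

```python
from typing import Dict, List


class IdempotentSink:
    """Toy sink that remembers committed batch IDs, so replaying a batch
    after a failure does not write duplicate rows."""

    def __init__(self):
        self.rows: List[str] = []
        self.committed: Dict[int, int] = {}  # batch_id -> number of rows written

    def add_batch(self, batch_id: int, rows: List[str]) -> bool:
        if batch_id in self.committed:
            return False  # already written: ignore the replay
        self.rows.extend(rows)
        self.committed[batch_id] = len(rows)
        return True


source = ["r0", "r1", "r2", "r3"]  # replayable: the same offsets return the same data
sink = IdempotentSink()
sink.add_batch(0, source[0:2])
# Simulate a failure after the sink write but before progress was checkpointed:
# on restart, batch 0 is replayed from the same offsets.
sink.add_batch(0, source[0:2])
sink.add_batch(1, source[2:4])
print(sink.rows)  # ['r0', 'r1', 'r2', 'r3']
```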

<h2>Review Questions</h2>

**Q:** What do `readStream` and `writeStream` do?<br>
**A:** `readStream` creates a streaming DataFrame.<br>`writeStream` sends streaming data to a directory or other type of output sink.

**Q:** What does `display` output if it is applied to a DataFrame created via `readStream`?<br>
**A:** `display` sends streaming data to a LIVE graph!

**Q:** In a `writeStream` command, what does the option `outputMode("append")` do?<br>
**A:** This option takes on the following values and their respective meanings:
* <b>append</b>: add only new records to output sink
* <b>complete</b>: rewrite full output - applicable to aggregation operations
* <b>update</b>: write only the records that changed since the last trigger

**Q:** What happens if you do not specify `option("checkpointLocation", pointer-to-checkpoint directory)`?<br>
**A:** When the streaming job stops, you lose all state around your streaming job and upon restart, you start from scratch.

**Q:** How do you view the list of active streams?<br>
**A:** Invoke `spark.streams.active`.

**Q:** How do you verify whether `streamingQuery` is running (boolean output)?<br>
**A:** Invoke `spark.streams.get(streamingQuery.id).isActive`.

In [50]:
%run ./Includes/Classroom-Cleanup