Skip to content
Permalink
Browse files

[SPARK-23454][SS][DOCS] Added trigger information to the Structured S…

…treaming programming guide

## What changes were proposed in this pull request?

- Added clear information about triggers
- Made the semantics guarantees of watermarks more clear for streaming aggregations and stream-stream joins.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20631 from tdas/SPARK-23454.

(cherry picked from commit 601d653)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
  • Loading branch information...
tdas committed Feb 21, 2018
1 parent 1d78f03 commit 3e7269eb904b591883300d7433e5c99be0b3b5b3
Showing with 207 additions and 7 deletions.
  1. +207 −7 docs/structured-streaming-programming-guide.md
@@ -904,7 +904,7 @@ windowedCounts <- count(
</div>


### Handling Late Data and Watermarking
#### Handling Late Data and Watermarking
Now consider what happens if one of the events arrives late to the application.
For example, say, a word generated at 12:04 (i.e. event time) could be received by
the application at 12:11. The application should use the time 12:04 instead of 12:11
@@ -925,7 +925,9 @@ specifying the event time column and the threshold on how late the data is expec
event time. For a specific window starting at time `T`, the engine will maintain state and allow late
data to update the state until `(max event time seen by the engine - late threshold > T)`.
In other words, late data within the threshold will be aggregated,
but data later than the threshold will be dropped. Let's understand this with an example. We can
but data later than the threshold will start getting dropped
(see [later]((#semantic-guarantees-of-aggregation-with-watermarking))
in the section for the exact guarantees). Let's understand this with an example. We can
easily define watermarking on the previous example using `withWatermark()` as shown below.

<div class="codetabs">
@@ -1031,7 +1033,9 @@ then drops intermediate state of a window < watermark, and appends the final
counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is
appended to the Result Table only after the watermark is updated to `12:11`.

**Conditions for watermarking to clean aggregation state**
##### Conditions for watermarking to clean aggregation state
{:.no_toc}

It is important to note that the following conditions must be satisfied for the watermarking to
clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*.

@@ -1051,6 +1055,16 @@ from the aggregation column.
For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append
output mode.

##### Semantic Guarantees of Aggregation with Watermarking
{:.no_toc}

- A watermark delay (set with `withWatermark`) of "2 hours" guarantees that the engine will never
drop any data that is less than 2 hours delayed. In other words, any data less than 2 hours behind
(in terms of event-time) the latest data processed till then is guaranteed to be aggregated.

- However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is
not guaranteed to be dropped; it may or may not get aggregated. More delayed is the data, less
likely is the engine going to process it.

### Join Operations
Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
@@ -1062,7 +1076,7 @@ Dataset/DataFrame will be the exactly the same as if it was with a static Datase
containing the same data in the stream.


#### Stream-static joins
#### Stream-static Joins

Since the introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some
type of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.
@@ -1269,6 +1283,12 @@ joined <- join(
</div>
</div>

###### Semantic Guarantees of Stream-stream Inner Joins with Watermarking
{:.no_toc}
This is similar to the [guarantees provided by watermarking on aggregations](#semantic-guarantees-of-aggregation-with-watermarking).
A watermark delay of "2 hours" guarantees that the engine will never drop any data that is less than
2 hours delayed. But data delayed by more than 2 hours may or may not get processed.

##### Outer Joins with Watermarking
While the watermark + event-time constraints is optional for inner joins, for left and right outer
joins they must be specified. This is because for generating the NULL results in outer join, the
@@ -1347,7 +1367,14 @@ joined <- join(
</div>


There are a few points to note regarding outer joins.
###### Semantic Guarantees of Stream-stream Outer Joins with Watermarking
{:.no_toc}
Outer joins have the same guarantees as [inner joins](#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking)
regarding watermark delays and whether data will be dropped or not.

###### Caveats
{:.no_toc}
There are a few important characteristics to note regarding how the outer results are generated.

- *The outer NULL results will be generated with a delay that depends on the specified watermark
delay and the time range condition.* This is because the engine has to wait for that long to ensure
@@ -1962,7 +1989,7 @@ head(sql("select * from aggregates"))
</div>
</div>

#### Using Foreach
##### Using Foreach
The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
@@ -1979,6 +2006,172 @@ which has methods that get called whenever there is a sequence of rows generated

- Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks.

#### Triggers
The trigger settings of a streaming query defines the timing of streaming data processing, whether
the query is going to executed as micro-batch query with a fixed batch interval or as a continuous processing query.
Here are the different kinds of triggers that are supported.

<table class="table">
<tr>
<th>Trigger Type</th>
<th>Description</th>
</tr>
<tr>
<td><i>unspecified (default)</i></td>
<td>
If no trigger setting is explicitly specified, then by default, the query will be
executed in micro-batch mode, where micro-batches will be generated as soon as
the previous micro-batch has completed processing.
</td>
</tr>
<tr>
<td><b>Fixed interval micro-batches</b></td>
<td>
The query will be executed with micro-batches mode, where micro-batches will be kicked off
at the user-specified intervals.
<ul>
<li>If the previous micro-batch completes within the interval, then the engine will wait until
the interval is over before kicking off the next micro-batch.</li>

<li>If the previous micro-batch takes longer than the interval to complete (i.e. if an
interval boundary is missed), then the next micro-batch will start as soon as the
previous one completes (i.e., it will not wait for the next interval boundary).</li>

<li>If no new data is available, then no micro-batch will be kicked off.</li>
</ul>
</td>
</tr>
<tr>
<td><b>One-time micro-batch</b></td>
<td>
The query will execute *only one* micro-batch to process all the available data and then
stop on its own. This is useful in scenarios you want to periodically spin up a cluster,
process everything that is available since the last period, and then shutdown the
cluster. In some case, this may lead to significant cost savings.
</td>
</tr>
<tr>
<td><b>Continuous with fixed checkpoint interval</b><br/><i>(experimental)</i></td>
<td>
The query will be executed in the new low-latency, continuous processing mode. Read more
about this in the <a href="#continuous-processing-experimental">Continuous Processing section</a> below.
</td>
</tr>
</table>

Here are a few code examples.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
.start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()

// One-time trigger
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()

{% endhighlight %}


</div>
<div data-lang="java" markdown="1">

{% highlight java %}
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
.start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start();

// One-time trigger
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start();

// Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start();

{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}

# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
.format("console") \
.start()

# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()

# One-time trigger
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()

# Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(continuous='1 second')
.start()

{% endhighlight %}
</div>
<div data-lang="r" markdown="1">

{% highlight r %}
# Default trigger (runs micro-batch as soon as it can)
write.stream(df, "console")

# ProcessingTime trigger with two-seconds micro-batch interval
write.stream(df, "console", trigger.processingTime = "2 seconds")

# One-time trigger
write.stream(df, "console", trigger.once = TRUE)

# Continuous trigger is not yet supported
{% endhighlight %}
</div>
</div>


## Managing Streaming Queries
The `StreamingQuery` object created when a query is started can be used to monitor and manage the query.

@@ -2516,7 +2709,10 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
</div>
</div>
# Continuous Processing [Experimental]
# Continuous Processing
## [Experimental]
{:.no_toc}
**Continuous processing** is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default *micro-batch processing* engine which can achieve exactly-once guarantees but achieve latencies of ~100ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations).

To run a supported query in continuous processing mode, all you need to do is specify a **continuous trigger** with the desired checkpoint interval as a parameter. For example,
@@ -2589,6 +2785,8 @@ spark \
A checkpoint interval of 1 second means that the continuous processing engine will records the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.

## Supported Queries
{:.no_toc}

As of Spark 2.3, only the following type of queries are supported in the continuous processing mode.

- *Operations*: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (`select`, `map`, `flatMap`, `mapPartitions`, etc.) and selections (`where`, `filter`, etc.).
@@ -2606,6 +2804,8 @@ As of Spark 2.3, only the following type of queries are supported in the continu
See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections for more details on them. While the console sink is good for testing, the end-to-end low-latency processing can be best observed with Kafka as the source and sink, as this allows the engine to process the data and make the results available in the output topic within milliseconds of the input data being available in the input topic.

## Caveats
{:.no_toc}

- Continuous processing engine launches multiple long-running tasks that continuously read data from sources, process it and continuously write to sinks. The number of tasks required by the query depends on how many partitions the query can read from the sources in parallel. Therefore, before starting a continuous processing query, you must ensure there are enough cores in the cluster to all the tasks in parallel. For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress.
- Stopping a continuous processing stream may produce spurious task termination warnings. These can be safely ignored.
- There are currently no automatic retries of failed tasks. Any failure will lead to the query being stopped and it needs to be manually restarted from the checkpoint.

0 comments on commit 3e7269e

Please sign in to comment.
You can’t perform that action at this time.