Skip to content
Permalink
Browse files

[SPARK-23064][DOCS][SS] Added documentation for stream-stream joins

## What changes were proposed in this pull request?
Added documentation for stream-stream joins

![image](https://user-images.githubusercontent.com/663212/35018744-e999895a-fad7-11e7-9d6a-8c7a73e6eb9c.png)

![image](https://user-images.githubusercontent.com/663212/35018775-157eb464-fad8-11e7-879e-47a2fcbd8690.png)

![image](https://user-images.githubusercontent.com/663212/35018784-27791a24-fad8-11e7-98f4-7ff246f62a74.png)

![image](https://user-images.githubusercontent.com/663212/35018791-36a80334-fad8-11e7-9791-f85efa7c6ba2.png)

## How was this patch tested?

N/a

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20255 from tdas/join-docs.

(cherry picked from commit 1002bd6)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
  • Loading branch information...
tdas authored and zsxwing committed Jan 18, 2018
1 parent 9783aea commit 050c1e24e506ff224bcf4e3e458e57fbd216765c
Showing with 326 additions and 12 deletions.
  1. +326 −12 docs/structured-streaming-programming-guide.md
@@ -1051,7 +1051,19 @@ output mode.


### Join Operations
Streaming DataFrames can be joined with static DataFrames to create new streaming DataFrames. Here are a few examples.
Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
as well as another streaming Dataset/DataFrame. The result of the streaming join is generated
incrementally, similar to the results of streaming aggregations in the previous section. In this
section we will explore what type of joins (i.e. inner, outer, etc.) are supported in the above
cases. Note that in all the supported join types, the result of the join with a streaming
Dataset/DataFrame will be the exactly the same as if it was with a static Dataset/DataFrame
containing the same data in the stream.


#### Stream-static joins

Since the introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some
type of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.

<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -1089,6 +1101,300 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat
</div>
</div>

Note that stream-static joins are not stateful, so no state management is necessary.
However, a few types of stream-static outer joins are not yet supported.
These are listed at the [end of this Join section](#support-matrix-for-joins-in-streaming-queries).

#### Stream-stream Joins
In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
Datasets/DataFrames. The challenge of generating join results between two data streams is that,
at any point of time, the view of the dataset is incomplete for both sides of the join making
it much harder to find matches between inputs. Any row received from one input stream can match
with any future, yet-to-be-received row from the other input stream. Hence, for both the input
streams, we buffer past input as streaming state, so that we can match every future input with
past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
we automatically handle late, out-of-order data and can limit the state using watermarks.
Let’s discuss the different types of supported stream-stream joins and how to use them.

##### Inner Joins with optional Watermarking
Inner joins on any kind of columns along with any kind of join conditions are supported.
However, as the stream runs, the size of streaming state will keep growing indefinitely as
*all* past input must be saved as the any new input can match with any input from the past.
To avoid unbounded state, you have to define additional join conditions such that indefinitely
old inputs cannot match with future inputs and therefore can be cleared from the state.
In other words, you will have to do the following additional steps in the join.

1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
(similar to streaming aggregations)

1. Define a constraint on event-time across the two inputs such that the engine can figure out when
old rows of one input is not going to be required (i.e. will not satisfy the time constraint) for
matches with the other input. This constraint can be defined in one of the two ways.

1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),

1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).

Let’s understand this with an example.

Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
another stream of user clicks on advertisements to correlate when impressions led to
monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
specify the watermarking delays and the time constraints as follows.

1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
in event-time by at most 2 and 3 hours, respectively.

1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
after the corresponding impression.

The code would look like this.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)

{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
import static org.apache.spark.sql.functions.expr

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour ")
);

{% endhighlight %}


</div>
<div data-lang="python" markdown="1">

{% highlight python %}
from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)

{% endhighlight %}

</div>
</div>

##### Outer Joins with Watermarking
While the watermark + event-time constraints is optional for inner joins, for left and right outer
joins they must be specified. This is because for generating the NULL results in outer join, the
engine must know when an input row is not going to match with anything in future. Hence, the
watermark + event-time constraints must be specified for generating correct results. Therefore,
a query with outer-join will look quite like the ad-monetization example earlier, except that
there will be an additional parameter specifying it to be an outer-join.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}

impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter"
)

{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour "),
"leftOuter" // can be "inner", "leftOuter", "rightOuter"
);

{% endhighlight %}


</div>
<div data-lang="python" markdown="1">

{% highlight python %}
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter"
)

{% endhighlight %}

</div>
</div>

However, note that the outer NULL results will be generated with a delay (depends on the specified
watermark delay and the time range condition) because the engine has to wait for that long to ensure
there were no matches and there will be no more matches in future.

##### Support matrix for joins in streaming queries

<table class ="table">
<tr>
<th>Left Input</th>
<th>Right Input</th>
<th>Join Type</th>
<th></th>
</tr>
<tr>
<td style="vertical-align: middle;">Static</td>
<td style="vertical-align: middle;">Static</td>
<td style="vertical-align: middle;">All types</td>
<td style="vertical-align: middle;">
Supported, since its not on streaming data even though it
can be present in a streaming query
</td>
</tr>
<tr>
<td rowspan="4" style="vertical-align: middle;">Stream</td>
<td rowspan="4" style="vertical-align: middle;">Static</td>
<td style="vertical-align: middle;">Inner</td>
<td style="vertical-align: middle;">Supported, not stateful</td>
</tr>
<tr>
<td style="vertical-align: middle;">Left Outer</td>
<td style="vertical-align: middle;">Supported, not stateful</td>
</tr>
<tr>
<td style="vertical-align: middle;">Right Outer</td>
<td style="vertical-align: middle;">Not supported</td>
</tr>
<tr>
<td style="vertical-align: middle;">Full Outer</td>
<td style="vertical-align: middle;">Not supported</td>
</tr>
<tr>
<td rowspan="4" style="vertical-align: middle;">Static</td>
<td rowspan="4" style="vertical-align: middle;">Stream</td>
<td style="vertical-align: middle;">Inner</td>
<td style="vertical-align: middle;">Supported, not stateful</td>
</tr>
<tr>
<td style="vertical-align: middle;">Left Outer</td>
<td style="vertical-align: middle;">Not supported</td>
</tr>
<tr>
<td style="vertical-align: middle;">Right Outer</td>
<td style="vertical-align: middle;">Supported, not stateful</td>
</tr>
<tr>
<td style="vertical-align: middle;">Full Outer</td>
<td style="vertical-align: middle;">Not supported</td>
</tr>
<tr>
<td rowspan="4" style="vertical-align: middle;">Stream</td>
<td rowspan="4" style="vertical-align: middle;">Stream</td>
<td style="vertical-align: middle;">Inner</td>
<td style="vertical-align: middle;">
Supported, optionally specify watermark on both sides +
time constraints for state cleanup
</td>
</tr>
<tr>
<td style="vertical-align: middle;">Left Outer</td>
<td style="vertical-align: middle;">
Conditionally supported, must specify watermark on right + time constraints for correct
results, optionally specify watermark on left for all state cleanup
</td>
</tr>
<tr>
<td style="vertical-align: middle;">Right Outer</td>
<td style="vertical-align: middle;">
Conditionally supported, must specify watermark on left + time constraints for correct
results, optionally specify watermark on right for all state cleanup
</td>
</tr>
<tr>
<td style="vertical-align: middle;">Full Outer</td>
<td style="vertical-align: middle;">Not supported</td>
</tr>
<tr>
<td></td>
<td></td>
<td></td>
<td></td>
</tr>
</table>

Additional details on supported joins:

- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`.

- As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

- As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of
what cannot be used.

- Cannot use streaming aggregations before joins.

- Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.


### Streaming Deduplication
You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.

@@ -1160,15 +1466,9 @@ Some of them are as follows.

- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

- Outer joins between a streaming and a static Datasets are conditionally supported.

+ Full outer join with a streaming Dataset is not supported

+ Left outer join with a streaming Dataset on the right is not supported

+ Right outer join with a streaming Dataset on the left is not supported

- Any kind of joins between two streaming Datasets is not yet supported.
- Few types of outer joins on streaming Datasets are not supported. See the
<a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
for more details.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

@@ -1276,6 +1576,15 @@ Here is the compatibility matrix.
Aggregations not allowed after <code>flatMapGroupsWithState</code>.
</td>
</tr>
<tr>
<td colspan="2" style="vertical-align: middle;">Queries with <code>joins</code></td>
<td style="vertical-align: middle;">Append</td>
<td style="vertical-align: middle;">
Update and Complete mode not supported yet. See the
<a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
for more details on what types of joins are supported.
</td>
</tr>
<tr>
<td colspan="2" style="vertical-align: middle;">Other queries</td>
<td style="vertical-align: middle;">Append, Update</td>
@@ -2142,6 +2451,11 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat

**Talks**

- Spark Summit 2017 Talk - [Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark](https://spark-summit.org/2017/events/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark/)
- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
- Spark Summit Europe 2017
- Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark -
[Part 1 slides/video](https://databricks.com/session/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark), [Part 2 slides/video](https://databricks.com/session/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark-continues)
- Deep Dive into Stateful Stream Processing in Structured Streaming - [slides/video](https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
- Spark Summit 2016
- A Deep Dive into Structured Streaming - [slides/video](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)


0 comments on commit 050c1e2

Please sign in to comment.
You can’t perform that action at this time.