Skip to content

Commit

Permalink
[SPARK-36172][SS] Document session window into Structured Streaming g…
Browse files Browse the repository at this point in the history
…uide doc

### What changes were proposed in this pull request?

This PR documents a new feature "native support of session window" into Structured Streaming guide doc.

Screenshots are following:

![스크린샷 2021-07-20 오후 5 04 20](https://user-images.githubusercontent.com/1317309/126284848-526ec056-1028-4a70-a1f4-ae275d4b5437.png)

![스크린샷 2021-07-20 오후 3 34 38](https://user-images.githubusercontent.com/1317309/126276763-763cf841-aef7-412a-aa03-d93273f0c850.png)

### Why are the changes needed?

This change is needed to explain a new feature to the end users.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Documentation changes.

Closes #33433 from HeartSaVioR/SPARK-36172.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
HeartSaVioR committed Jul 21, 2021
1 parent 376fadc commit 0eb31a0
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 0 deletions.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/img/structured-streaming.pptx
Binary file not shown.
71 changes: 71 additions & 0 deletions docs/structured-streaming-programming-guide.md
Expand Up @@ -1063,6 +1063,77 @@ then drops intermediate state of a window < watermark, and appends the final
counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is
appended to the Result Table only after the watermark is updated to `12:11`.

#### Types of time windows

Spark supports three types of time windows: tumbling (fixed), sliding and session.

![The types of time windows](img/structured-streaming-time-window-types.jpg)

Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals. An input
can only be bound to a single window.

Sliding windows are similar to the tumbling windows from the point of being "fixed-sized", but windows
can overlap if the duration of slide is smaller than the duration of window, and in this case an input
can be bound to the multiple windows.

Tumbling and sliding window use `window` function, which has been described on above examples.

Session windows have different characteristic compared to the previous two types. Session window has a dynamic size
of the window length, depending on the inputs. A session window starts with an input, and expands itself
if following input has been received within gap duration. A session window closes when there's no input
received within gap duration after receiving the latest input.

Session window uses `session_window` function. The usage of the function is similar to the `window` function.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
session_window($"timestamp", "5 minutes"),
$"userId")
.count()
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
session_window(col("timestamp"), "5 minutes"),
col("userId"))
.count();
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">
{% highlight python %}
events = ... # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
session_window(events.timestamp, "5 minutes"),
events.userId) \
.count()
{% endhighlight %}

</div>
</div>

##### Conditions for watermarking to clean aggregation state
{:.no_toc}

Expand Down

0 comments on commit 0eb31a0

Please sign in to comment.