Skip to content

Commit

Permalink
[SPARK-21293][SS][SPARKR] Add doc example for streaming join, dedup
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

streaming programming guide changes

## How was this patch tested?

manually

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #20340 from felixcheung/rstreamdoc.

(cherry picked from commit 2239d7a)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
  • Loading branch information
felixcheung authored and Felix Cheung committed Jan 21, 2018
1 parent e0ef30f commit 7520491
Showing 1 changed file with 72 additions and 2 deletions.
74 changes: 72 additions & 2 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,21 @@ streamingDf.join(staticDf, "type") # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") # right outer join with a static DF
{% endhighlight %}

</div>

<div data-lang="r" markdown="1">

{% highlight r %}
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE) # inner equi-join with a static DF
joined <- join(
staticDf,
streamingDf,
streamingDf$value == staticDf$value,
"right_outer") # right outer join with a static DF
{% endhighlight %}

</div>
</div>

Expand Down Expand Up @@ -1227,6 +1242,30 @@ impressionsWithWatermark.join(

{% endhighlight %}

</div>
<div data-lang="r" markdown="1">

{% highlight r %}
impressions <- read.stream(...)
clicks <- read.stream(...)

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
joined <- join(
impressionsWithWatermark,
clicksWithWatermark,
expr(
paste(
"clickAdId = impressionAdId AND",
"clickTime >= impressionTime AND",
"clickTime <= impressionTime + interval 1 hour"
)))

{% endhighlight %}

</div>
</div>

Expand Down Expand Up @@ -1287,6 +1326,23 @@ impressionsWithWatermark.join(

{% endhighlight %}

</div>
<div data-lang="r" markdown="1">

{% highlight r %}
joined <- join(
impressionsWithWatermark,
clicksWithWatermark,
expr(
paste(
"clickAdId = impressionAdId AND",
"clickTime >= impressionTime AND",
"clickTime <= impressionTime + interval 1 hour"),
"left_outer" # can be "inner", "left_outer", "right_outer"
))

{% endhighlight %}

</div>
</div>

Expand Down Expand Up @@ -1441,15 +1497,29 @@ streamingDf
{% highlight python %}
streamingDf = spark.readStream. ...

// Without watermark using guid column
# Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
# With watermark using guid and eventTime columns
streamingDf \
.withWatermark("eventTime", "10 seconds") \
.dropDuplicates("guid", "eventTime")
{% endhighlight %}

</div>
<div data-lang="r" markdown="1">

{% highlight r %}
streamingDf <- read.stream(...)

# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")

# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
{% endhighlight %}

</div>
</div>

Expand Down

0 comments on commit 7520491

Please sign in to comment.