Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24699] [SS]Make watermarks work with Trigger.Once by saving updated watermark to commit log #21746

Closed
wants to merge 3 commits into from

Conversation

tdas
Copy link
Contributor

@tdas tdas commented Jul 11, 2018

What changes were proposed in this pull request?

Streaming queries with watermarks do not work with Trigger.Once because of the following.

  • Watermark is updated in the driver memory after a batch completes, but it is persisted to checkpoint (in the offset log) only when the next batch is planned
  • In trigger.once, the query terminated as soon as one batch has completed. Hence, the updated watermark is never persisted anywhere.

The simple solution is to persist the updated watermark value in the commit log when a batch is marked as completed. Then the next batch, in the next trigger.once run can pick it up from the commit log.

How was this patch tested?

new unit tests

Co-authored-by: Tathagata Das tathagata.das1565@gmail.com
Co-authored-by: c-horn chorn4033@gmail.com

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Co-authored-by: c-horn
@SparkQA
Copy link

SparkQA commented Jul 11, 2018

Test build #92852 has finished for PR 21746 at commit 7e54a89.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CommitMetadata(nextBatchWatermarkMs: Long = 0)

@tdas tdas changed the title [WIP][SPARK-24699] [SS]Make watermarks work with Trigger.Once by saving updated watermark to commit log [SPARK-24699] [SS]Make watermarks work with Trigger.Once by saving updated watermark to commit log Jul 19, 2018
@SparkQA
Copy link

SparkQA commented Jul 20, 2018

Test build #93301 has finished for PR 21746 at commit 10b5c2f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 20, 2018

Test build #93300 has finished for PR 21746 at commit 584c96e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor Author

tdas commented Jul 20, 2018

@zsxwing

@zsxwing
Copy link
Member

zsxwing commented Jul 20, 2018

LGTM

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@asfgit asfgit closed this in 61f0ca4 Jul 23, 2018
@c-horn
Copy link
Contributor

c-horn commented Aug 10, 2018

@tdas this will not be included in 2.4.0? as indicated SPARK-24699?

@zsxwing
Copy link
Member

zsxwing commented Aug 10, 2018

@c-horn it's in 2.4.0. I just fixed the ticket.

@c-horn
Copy link
Contributor

c-horn commented Aug 13, 2018

Thanks!

@clerxiris
Copy link

I have the same problem in PySpark using Spark 2.4.0, that is: Streaming queries with watermarks do not work with .trigger(once=True). When can we expect the same fix for PySpark?

@c-horn
Copy link
Contributor

c-horn commented Jan 13, 2019

I believe the current fix only fully processes the on-hand data when window aggregating when you run the Trigger.Once twice. It shouldn't matter that you are using pyspark, it is the same streaming code.

My initial test case was altered such that only the watermark is updated in the first pass, the second pass will process the data that fell before the watermark.

@clerxiris
Copy link

Can you provide me with a simple example of the current fix please?

@smrosenberry
Copy link

fully processes the on-hand data when window aggregating when you run the Trigger.Once twice

Can someone expound on this comment? Specifically, how do I programmatically "run the Trigger.once twice"?

In its simplest form, I tried this without success:

Dataset dataset;  // input read from a file
DataStreamWriter writer = dataset.writeStream().trigger( Trigger.Once() );

StreamingQuery query = writer.start();
query.awaitTermination();

query = writer.start();
query.awaitTermination();

query.stop();

Any hints would be greatly appreciated...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants