[SPARK-24699][SS][WIP] Watermark / Append mode should work with Trigger.Once #21676

Closed
wants to merge 7 commits into from

Conversation

c-horn
Contributor

@c-horn c-horn commented Jun 29, 2018

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-24699

Structured streaming using Trigger.Once does not persist watermark state between batches, causing streams to never yield output. I will attach some scripts that reproduce this behavior in the Jira issue.

It seems like the microbatcher only calculates the watermark off of the previous batch's input and emits new aggs based off of that timestamp. I believe the issue here is that the previous batch state is not persisted to the checkpoint, and therefore cannot be used when the stream is started again with Trigger.Once.

The same behavior can be seen when restarting a normal stream from a checkpoint: output is never generated on the first batch.

I will investigate ways of fixing this but I am definitely interested in input from anyone who worked on SS.

My assumption is that the watermarking should update with at least the batch-to-batch latency that it does under microbatch/Trigger.ProcessingTime.

How was this patch tested?

Failing unit test provided.

@AmplabJenkins

Can one of the admins verify this patch?

@c-horn c-horn changed the title [SPARK-24699][SQL][WIP] Watermark / Append mode should work with Trigger.Once [SPARK-24699][SS][WIP] Watermark / Append mode should work with Trigger.Once Jun 30, 2018
@c-horn
Contributor Author

c-horn commented Jun 30, 2018

@tdas @marmbrus

@c-horn
Contributor Author

c-horn commented Jul 2, 2018

Changing OneTimeExecutor like this resolves the test case:

case class OneTimeExecutor() extends TriggerExecutor {

  /**
   * Execute a single batch using `batchRunner`.
   */
-  override def execute(batchRunner: () => Boolean): Unit = batchRunner()
+  override def execute(batchRunner: () => Boolean): Unit = batchRunner() && batchRunner()
}

... but the type becomes semantically incorrect.

Is this an acceptable solution? It appears that a lot of the MicroBatchExecution code makes assumptions about state from the previous batch, which may or may not hold in the first iteration after a stream restart.
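To make the concern concrete, here is a minimal sketch of what the change above does to the executor's contract. The `TriggerExecutor` trait is redefined locally as a stand-in so the snippet compiles without Spark, and `TwoPassExecutor` and `countRuns` are hypothetical names used only for illustration.

```scala
object TriggerExecutorSketch {
  // Local stand-in for the trait in the diff above, so the sketch needs no Spark dependency.
  trait TriggerExecutor {
    def execute(batchRunner: () => Boolean): Unit
  }

  // Original behavior: exactly one batch, result ignored.
  case class OneTimeExecutor() extends TriggerExecutor {
    override def execute(batchRunner: () => Boolean): Unit = { batchRunner(); () }
  }

  // Behavior of the proposed change (`batchRunner() && batchRunner()`),
  // written out explicitly: a second batch runs whenever the first returns true.
  case class TwoPassExecutor() extends TriggerExecutor {
    override def execute(batchRunner: () => Boolean): Unit = {
      if (batchRunner()) { batchRunner(); () }
    }
  }

  // Hypothetical helper: counts how many times the executor invokes the batch runner.
  def countRuns(executor: TriggerExecutor, firstResult: Boolean): Int = {
    var runs = 0
    executor.execute { () => runs += 1; firstResult }
    runs
  }
}
```

With the change, an executor named "one time" can run two batches, which is the semantic mismatch mentioned above.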

@tdas
Contributor

tdas commented Jul 5, 2018

I think the right solution is to record the updated watermark in the commit log so that it can be read back from the commit log the next time the stream is started. Right now, no information is written to the commit log; only the existence of the commit file is used as proof that the batch has completed. The fix will add a new field to the JSON written out to a commit file, which will store the updated watermark. This should be done in a backward-compatible way so that old checkpoints that do not have the new field can recover as well.
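A rough sketch of the backward-compatible read described above, in plain Scala so it runs without Spark. The field name `nextBatchWatermarkMs`, the `CommitMetadata` case class, and the regex-based `parse` are assumptions made for illustration; a real implementation would use a proper JSON library.

```scala
object CommitMetadataSketch {
  // Assumed shape of the commit file payload; the default of 0 covers old checkpoints.
  final case class CommitMetadata(nextBatchWatermarkMs: Long = 0L) {
    def json: String = s"""{"nextBatchWatermarkMs":$nextBatchWatermarkMs}"""
  }

  // Matches the assumed field in a flat JSON object; illustration only.
  private val WatermarkField = """"nextBatchWatermarkMs"\s*:\s*(\d+)""".r

  // Commit files written before the change lack the field and fall back to the default.
  def parse(json: String): CommitMetadata =
    WatermarkField.findFirstMatchIn(json) match {
      case Some(m) => CommitMetadata(m.group(1).toLong)
      case None    => CommitMetadata()
    }
}
```

The default value is what keeps old checkpoints readable: a commit file with no watermark field parses to a watermark of 0 instead of failing.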

@tdas
Contributor

tdas commented Jul 11, 2018

The offset log contains the watermark value that is going to be used in the batch corresponding to that offset. For example, "checkpoint/offsets/10" will contain the watermark value to be used for batch 10. The problem is that when batch 10 completes and a new watermark value is computed, it is not saved in a persistent location until batch 11 is planned and "offsets/11" is written out. With Trigger.Once, this never happens because the query is terminated as soon as batch 10 completes, so the new watermark value is not saved. If the query has been running in Trigger.Once mode right from the beginning, that is, from batch 0, then no new watermark value is ever written, and so the watermark always shows up as 0.
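The sequence above can be modeled with a toy checkpoint in plain Scala. This is a simplified illustration, not Spark's actual code: `Checkpoint`, `runOnce`, and the `persistInCommit` flag are hypothetical, and each call to `runOnce` stands for one start of the query with Trigger.Once.

```scala
import scala.collection.mutable

object WatermarkCheckpointSketch {
  // Toy checkpoint: "offsets/N" holds the watermark used BY batch N;
  // "commits/N" holds None (empty commit file) or the watermark computed by batch N.
  final class Checkpoint {
    val offsets = mutable.Map.empty[Long, Long]
    val commits = mutable.Map.empty[Long, Option[Long]]
  }

  /** Runs a single batch and returns the watermark (ms) that the batch used. */
  def runOnce(cp: Checkpoint, maxEventTimeMs: Long, delayMs: Long,
              persistInCommit: Boolean): Long = {
    val lastCommitted = if (cp.commits.isEmpty) -1L else cp.commits.keys.max
    val batchId = lastCommitted + 1
    // On restart, prefer the watermark stored in the last commit; otherwise
    // only the value the last batch started with is available.
    val recovered =
      if (lastCommitted < 0) 0L
      else cp.commits(lastCommitted).getOrElse(cp.offsets(lastCommitted))
    cp.offsets(batchId) = recovered
    // The watermark advances only after the batch has seen its input.
    val updated = math.max(recovered, maxEventTimeMs - delayMs)
    cp.commits(batchId) = if (persistInCommit) Some(updated) else None
    recovered
  }
}
```

In this model, with `persistInCommit = false` every run recovers a watermark of 0, matching the behavior described above. With `persistInCommit = true`, the second run starts from the watermark that the first run computed.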

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Co-authored-by: c-horn
@tdas
Contributor

tdas commented Jul 11, 2018

Here is my solution based on my suggestion - #21746
I stole your unit test from this PR :) Thank you! I will add you as a co-author in that PR.

@c-horn
Contributor Author

c-horn commented Jul 11, 2018

@tdas I merged your changes into my branch, test passed, thank you 👍

@tdas
Contributor

tdas commented Jul 20, 2018

hey, @c-horn , I am ready to merge my PR #21746 (I added more tests) and to add you as a coauthor, I think I need to know your email address associated with your github account. Can you provide me that?

@tdas
Contributor

tdas commented Jul 23, 2018

ping ^^^

@c-horn
Contributor Author

c-horn commented Jul 23, 2018

Hi @tdas sorry for delay.
My email for github account: chorn4033@gmail.com

This looks fine to me, we can close this PR (and jira ticket) when yours is merged.

@c-horn
Contributor Author

c-horn commented Jul 23, 2018

already resolved by #21746

@c-horn c-horn closed this Jul 23, 2018
3 participants