
[SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter #25407

Closed
wants to merge 6 commits into apache:master from HeartSaVioR:SPARK-28650

Conversation

HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Aug 11, 2019

What changes were proposed in this pull request?

This patch modifies the explanation of the guarantee for ForeachWriter, as Spark doesn't guarantee the same output for (partitionId, epochId). Refer to the description of SPARK-28650 for more details.

Spark itself still guarantees the same output for the same epochId (batch) if the preconditions are met: 1) the source always provides the same input records for the same offset request, and 2) the query is deterministic overall (non-deterministic calculations like now() or random() can break this).

If we treat a broken precondition as an exceptional case (the preconditions were implicitly required even before), we could still describe the guarantee in terms of epochId, though it would be harder to leverage: 1) ForeachWriter would have to track whether all the partitions were written successfully for a given epochId, and 2) there is little practical benefit, as the chance that Spark successfully writes all partitions yet fails to checkpoint the batch is small.
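To make the issue concrete, below is a minimal sketch (not from this PR; the class name and the in-memory CommitLog stand-in are purely illustrative) of the (partitionId, epochId)-keyed deduplication pattern the previous wording implied. Because a re-run of the same (partitionId, epochId) may carry different records, the skip-if-already-committed check below can silently drop data, which is exactly what the updated docs now warn about:

```scala
import scala.collection.mutable

import org.apache.spark.sql.ForeachWriter

// In-memory stand-in for an external transactional store, only to keep the
// sketch self-contained; a real sink would use an external system, since
// ForeachWriter instances run on the executors.
object CommitLog {
  private val committed = mutable.Set.empty[(Long, Long)]
  def alreadyCommitted(partitionId: Long, epochId: Long): Boolean =
    synchronized { committed.contains((partitionId, epochId)) }
  def markCommitted(partitionId: Long, epochId: Long): Unit =
    synchronized { committed += ((partitionId, epochId)) }
}

// Deduplication keyed on (partitionId, epochId): skip a pair that was already
// written. Per this patch, a re-run of the same pair may carry different data,
// so this check alone no longer yields exactly-once output.
class DedupByPartitionEpochWriter extends ForeachWriter[String] {
  private var partitionId: Long = _
  private var epochId: Long = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    this.partitionId = partitionId
    this.epochId = epochId
    // Returning false tells Spark to skip process() for this partition/epoch.
    !CommitLog.alreadyCommitted(partitionId, epochId)
  }

  override def process(value: String): Unit = {
    // Write the record to the target system here.
    println(s"partition=$partitionId epoch=$epochId value=$value")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) CommitLog.markCommitted(partitionId, epochId)
  }
}
```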

Credit to @zsxwing for discovering the broken guarantee.

How was this patch tested?

This is just a documentation change, to both the javadoc and the guide doc.

@SparkQA

SparkQA commented Aug 11, 2019

Test build #108929 has finished for PR 25407 at commit 5ac50cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* continuous mode, then this guarantee does not hold and therefore should not be used for
* deduplication.
* <li>Spark doesn't guarantee same output for (partitionId, epochId) on failure, so deduplication
* cannot be achieved with (partitionId, epochId). Refer SPARK-28650 for more details.
Member

SPARK-28650 has only the following content; could we remove the `Refer SPARK-28650 for more details` by embedding this information here?

But we can break this easily, actually, when restarting a query and a batch is re-run (e.g., upgrading Spark):
  • The source returns a different DataFrame with a different number of partitions (e.g., we start to not create empty partitions in Kafka Source V2).
  • A newly added optimization rule may change the number of partitions in the new run.
  • The file split size changes in the new run.
Since we cannot guarantee that the same (partitionId, epochId) has the same data, we should update the document for "ForeachWriter".

Member

I'd inline a few of those examples of when it can occur, and leave it at that.

Contributor Author

While we would want to warn existing ForeachWriter users (this can't be done in javadoc; it should be surfaced via the release notes, maybe the streaming guide doc, etc.), not all end users want to know the details. (I'd expect more users will ignore the details than try to understand them all.) Since they can still get the details by visiting SPARK-28650, I'd borrow some examples from there as an "e.g." and keep the reference.

@dongjoon-hyun
Member

Could you review and sign off on this, please, @zsxwing?

Member

@srowen srowen left a comment

Yes, sounds like the docs need an update. Is there any place in the user docs that this kind of gotcha should be documented? Or is it too niche?

* continuous mode, then this guarantee does not hold and therefore should not be used for
* deduplication.
* <li>Spark doesn't guarantee same output for (partitionId, epochId) on failure, so deduplication
* cannot be achieved with (partitionId, epochId). Refer SPARK-28650 for more details.
Member

I'd inline a few of those examples of when it can occur, and leave it at that.

* <li>Spark doesn't guarantee same output for (partitionId, epochId) on failure, so deduplication
* cannot be achieved with (partitionId, epochId). Refer SPARK-28650 for more details.
*
* You can still apply deduplication on `epochId`, but there's less benefit to leverage this,
Member

Nit: I'd keep the voice consistent. "epochId can still be used for deduplication .." instead of "you can ..."
Can we clarify if there are cases where deduplication on both is still valid?

Contributor Author

Nit: I'd keep the voice consistent. "epochId can still be used for deduplication .." instead of "you can ..."

I'll do the update.

Can we clarify if there are cases where deduplication on both is still valid?

As you can see from the cases in SPARK-28650, Spark and the source can break this guarantee in ways that are not easy for end users to determine. The guarantee can be broken even if end users don't change the query, so I'd rather not enumerate the cases and have end users run into the odd ones. What do you think, @zsxwing?

@HeartSaVioR
Contributor Author

Yes, sounds like the docs need an update. Is there any place in the user docs that this kind of gotcha should be documented? Or is it too niche?

I found that the same explanation appears in the structured streaming guide doc, so I'll modify it as well. I'd emphasize this (in the release notes, etc.), as it changes the guarantee and end users may have to change their ForeachWriter implementations.

@HeartSaVioR
Contributor Author

I guess this applies not only to Spark 3.0 but to all versions (it may be worth porting back). End users may need to be notified even if they don't upgrade their Spark version, as they need to revisit their ForeachWriter implementations.

@SparkQA

SparkQA commented Aug 12, 2019

Test build #108941 has finished for PR 25407 at commit 041a3a4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details. `epochId` can still be used
for deduplication, but there's less benefit to leverage this, as the chance for Spark to successfully write all
partitions and fail to checkpoint the batch is small. You also need to care about whether epoch is fully written,
via ensuring all partitions for the epochId are written successfully.
Member

ensuring all partitions for the epochId are written successfully.

Hm, looks like ForeachWriter doesn't know the number of partitions, so it cannot implement something like this.

Contributor Author

Oh, you're right. There's no such context available unless end users do some kind of hack (aggregating over a couple of batches), and it's not worth such a hard hack. I'll just get rid of the guarantee for deduplication on epochId.

@SparkQA

SparkQA commented Aug 12, 2019

Test build #108981 has finished for PR 25407 at commit b6681bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

I believe I've addressed all the review comments. Could we take another round of review, please?

Member

@HyukjinKwon HyukjinKwon left a comment

Looks fine but let me leave it to @zsxwing

@SparkQA

SparkQA commented Aug 13, 2019

Test build #109030 has finished for PR 25407 at commit 050e178.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

@zsxwing kind reminder.

@srowen
Member

srowen commented Aug 19, 2019

@zsxwing if there are no more comments I think we can merge.

Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit
data and achieve exactly-once guarantees. However, if the streaming query is being executed
in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
- **Note:** Spark does not guarantee same output for (partitionId, epochId) on failure, so deduplication
Member

nit: remove `on failure`. If a user stops a query, we may re-run a batch; I would not call that case a failure. In addition, I think we can suggest that users use foreachBatch here if they need deduplication (see the sketch after this comment).

[Screenshot: sink fault-tolerance table from the structured streaming guide doc]

We should also update the above table from exactly-once to at-least-once.
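For reference, here is a minimal sketch of the foreachBatch alternative suggested above (the "rate" source and the /tmp output path are just illustrative, not part of this PR). batchId plays the role of epochId, and the user function receives the whole micro-batch, so deduplication or idempotent writes can be handled there:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchDedupExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("foreachBatch-dedup").getOrCreate()

    // Toy source; any streaming source works the same way.
    val streamingDF = spark.readStream.format("rate").load()

    // batchId identifies the micro-batch; the user function can deduplicate
    // and perform an idempotent write for the whole batch.
    val writeBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.dropDuplicates("value")
        .write
        .mode("overwrite")                       // overwriting makes a re-run of the same batch idempotent
        .parquet(s"/tmp/sink/batch_id=$batchId") // hypothetical output path, not part of this PR
    }

    val query = streamingDF.writeStream
      .foreachBatch(writeBatch)
      .start()

    query.awaitTermination()
  }
}
```

Overwriting a batch-specific location keeps the re-run idempotent regardless of how the batch happens to be partitioned.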

Contributor Author

Great point! Updated.

Btw, while I'm also changing the fault tolerance of the Foreach Sink from "Depends on the implementation" to "Yes (at-least-once)", your screenshot seems to point at the File Sink. Doesn't it guarantee exactly-once for the corresponding Spark query via the File Sink's specific metadata? I guess FileStreamSinkLog guarantees a unique write per batch. If that's not the case and you've found another broken fault-tolerance guarantee for the File Sink, it would be nice to have another JIRA (or at least another PR) to track it separately, with a description of the new finding.

Member

Sorry, I took the wrong screenshot.

Contributor Author

Oh OK. Never mind.

* and achieve exactly-once guarantees. However, if the streaming query is being executed in the
* continuous mode, then this guarantee does not hold and therefore should not be used for
* deduplication.
* <li>Spark doesn't guarantee same output for (partitionId, epochId) on failure, so deduplication
Member

ditto

@SparkQA

SparkQA commented Aug 20, 2019

Test build #109371 has finished for PR 25407 at commit 48e908f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Aug 20, 2019

LGTM. Merging to master and 2.4.

@asfgit asfgit closed this in b37c8d5 Aug 20, 2019
asfgit pushed a commit that referenced this pull request Aug 20, 2019
[SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter

Closes #25407 from HeartSaVioR/SPARK-28650.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
(cherry picked from commit b37c8d5)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
@HeartSaVioR
Contributor Author

Thanks for the quick review and merge!

@HeartSaVioR HeartSaVioR deleted the SPARK-28650 branch August 20, 2019 08:50
@dongjoon-hyun
Member

Thank you, @HeartSaVioR , @zsxwing , @srowen , @HyukjinKwon . It's nice to have this in 2.4.4 documentation.

rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019