Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19168][Structured Streaming] StateStore should be aborted upon error #16547

Closed
wants to merge 2 commits into from

Conversation

lw-lin
Copy link
Contributor

@lw-lin lw-lin commented Jan 11, 2017

What changes were proposed in this pull request?

We should call StateStore.abort() when there should be any error before the store is committed.

How was this patch tested?

Manually.

@SparkQA
Copy link

SparkQA commented Jan 11, 2017

Test build #71193 has finished for PR 16547 at commit c9f62c1.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

s"$outputMode output mode not supported when there are streaming aggregations on " +
s"streaming DataFrames/DataSets")(plan)
s"$outputMode output mode requires watermark to be speficied when there are " +
s"streaming aggregations on streaming DataFrames/DataSets")(plan)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an easy fix, so I've also included it in this patch

@SparkQA
Copy link

SparkQA commented Jan 11, 2017

Test build #71195 has finished for PR 16547 at commit f24aeb1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckLastBatch(),
assertNumStateRows(2)
assertNumStateRows(2, 1) // We also processed the data 10, which is less than watermark
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note this is assertNumStateRows(2, 1) without this patch, and is assertNumStateRows(2, 0) with this patch

AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckLastBatch(),
assertNumStateRows(2)
assertNumStateRows(2, 0) // We processed nothing in this batch
Copy link
Contributor Author

@lw-lin lw-lin Jan 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note this is assertNumStateRows(2, 1) without this patch, and is assertNumStateRows(2, 0) with this patch.

@SparkQA
Copy link

SparkQA commented Jan 11, 2017

Test build #71203 has finished for PR 16547 at commit caf7b34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin
Copy link
Contributor Author

lw-lin commented Jan 11, 2017

@tdas @zsxwing would you take a look, thanks!

@zsxwing
Copy link
Member

zsxwing commented Jan 11, 2017

@lw-lin I noticed a potential issue in update mode. It may not consume the whole Iterator when the downstream operator is take, which will make the state store not commit. Could you also fix it? The code should be similar to append mode. NVM. Since Structured Streaming doesn't support limit, it won't happen.

There is another issue in this class: it doesn't call StateStore.abort when an error happens. Could you also fix it?

@lw-lin
Copy link
Contributor Author

lw-lin commented Jan 12, 2017

@zsxwing thanks for the comments. I'll update this.

@SparkQA
Copy link

SparkQA commented Jan 12, 2017

Test build #71248 has started for PR 16547 at commit 07a020e.

@lw-lin
Copy link
Contributor Author

lw-lin commented Jan 12, 2017

Jenkins retest this please

@SparkQA
Copy link

SparkQA commented Jan 12, 2017

Test build #71266 has finished for PR 16547 at commit 07a020e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin
Copy link
Contributor Author

lw-lin commented Jan 13, 2017

@zsxwing updated as per your comments; would you take another look?

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2017

Discussed with @tdas offline. Regarding the changes to the append mode, it's unknown that if adding the filter is better because it will apply the filter on all rows but there are usually only few rows out of watermark. It's better to just leave it as it it.

For aborting StateStore, could you use TaskContext.addTaskCompletionListener instead? So that if the error doesn't happen in hasNext or next method, we can still abort it. You can add a flag to indicate if the StateStore is committed or not.

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2017

Could you also update the JIRA and the PR description after your changes?

@lw-lin
Copy link
Contributor Author

lw-lin commented Jan 13, 2017

@zsxwing thanks for the feedback! Ah, sure, let me update accordingly.

@lw-lin lw-lin changed the title [SPARK-19168][Structured Streaming] Improvement: filter late data using watermark for Append mode [SPARK-19168][Structured Streaming] StateStore should be aborted upon error Jan 17, 2017
@@ -335,4 +344,67 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
)
}

Copy link
Contributor Author

@lw-lin lw-lin Jan 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of the following test is to inject a mock state store, which:

  • throws an error in its commit() method
  • marks the aborted flag in its abort() method

Then at the end of the test, we would check if aborted is true

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to not add this complicated test because:

  • TaskContext.get().addTaskCompletionListener has already been tested in other tests.
  • Scala Reflection is not thread-safe in 2.10 and this test may be flaky since SQL also uses Scala Reflection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! I hesitated a little when I added it -- yea it was complicated -- so let me remove it

@SparkQA
Copy link

SparkQA commented Jan 17, 2017

Test build #71512 has finished for PR 16547 at commit cf24fad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@zsxwing zsxwing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to not add the test. Otherwise, LGTM.

@@ -335,4 +344,67 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to not add this complicated test because:

  • TaskContext.get().addTaskCompletionListener has already been tested in other tests.
  • Scala Reflection is not thread-safe in 2.10 and this test may be flaky since SQL also uses Scala Reflection.

@SparkQA
Copy link

SparkQA commented Jan 18, 2017

Test build #71565 has finished for PR 16547 at commit 0f9e54d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Jan 18, 2017

LGTM. Merging to master and 2.1. Thanks!

@asfgit asfgit closed this in 569e506 Jan 18, 2017
asfgit pushed a commit that referenced this pull request Jan 18, 2017
… error

## What changes were proposed in this pull request?

We should call `StateStore.abort()` when there should be any error before the store is committed.

## How was this patch tested?

Manually.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16547 from lw-lin/append-filter.

(cherry picked from commit 569e506)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
… error

## What changes were proposed in this pull request?

We should call `StateStore.abort()` when there should be any error before the store is committed.

## How was this patch tested?

Manually.

Author: Liwei Lin <lwlin7@gmail.com>

Closes apache#16547 from lw-lin/append-filter.
@lw-lin lw-lin deleted the append-filter branch February 8, 2017 02:59
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
… error

## What changes were proposed in this pull request?

We should call `StateStore.abort()` when there should be any error before the store is committed.

## How was this patch tested?

Manually.

Author: Liwei Lin <lwlin7@gmail.com>

Closes apache#16547 from lw-lin/append-filter.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants