[SPARK-19841][SS]watermarkPredicate should filter based on keys #17183

Closed

zsxwing (Member) commented Mar 7, 2017

What changes were proposed in this pull request?

StreamingDeduplicateExec.watermarkPredicate should be evaluated against the key rather than the input row. Otherwise, it may produce a wrong answer when the watermark column's position in keyExpression differs from its position in the row.

StateStoreSaveExec has the same code, but its parent makes sure the watermark column positions in keyExpression and in the row are the same.
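
To illustrate why the column position matters, here is a minimal sketch in plain Scala with no Spark dependency. It assumes that a predicate bound to the key schema reads the watermark column by ordinal; Row, getKey, and lateByOrdinal are hypothetical stand-ins for illustration, not Spark APIs.

```scala
object WatermarkPositionDemo {
  // Hypothetical stand-in for InternalRow: values are addressed by ordinal only.
  type Row = Vector[Long]

  // Assumed layouts: input row is (id, eventTime), key is (eventTime, id).
  val eventTimeOrdinalInKey = 0

  // Projects an input row onto the key layout.
  def getKey(row: Row): Row = Vector(row(1), row(0))

  // A predicate bound to the key schema: it reads the watermark column by its
  // ordinal in the key and returns true when the value is older than the watermark.
  def lateByOrdinal(watermark: Long)(r: Row): Boolean =
    r(eventTimeOrdinalInKey) < watermark

  val watermark = 100L

  // (id = 500, eventTime = 50) is late; (id = 2, eventTime = 200) is on time.
  val rows: Seq[Row] = Seq(Vector(500L, 50L), Vector(2L, 200L))

  // Evaluating on the raw input row reads id instead of eventTime,
  // so the late row is kept and the on-time row is dropped.
  val wrong: Seq[Row] = rows.filter(r => !lateByOrdinal(watermark)(r))

  // Evaluating on the projected key reads the actual event time.
  val right: Seq[Row] = rows.filter(r => !lateByOrdinal(watermark)(getKey(r)))
}
```

In this sketch the key-bound predicate silently reads the wrong column when applied to the input row, which is the kind of wrong answer described above.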

How was this patch tested?

The added test.

SparkQA commented Mar 7, 2017

Test build #74054 has finished for PR 17183 at commit c74a2d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -361,7 +361,7 @@ case class StreamingDeduplicateExec(
       val numUpdatedStateRows = longMetric("numUpdatedStateRows")

       val baseIterator = watermarkPredicate match {
-        case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+        case Some(predicate) => iter.filter(row => !predicate.eval(getKey(row)))
Contributor

This is very minor, but maybe we should just bind a different predicate to the child.output schema rather than have two projections here.

Perhaps the base class should have both watermarkExpression and keyPredicate to allow for this flexibility.
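
The suggestion of binding one expression against two schemas could look roughly like the sketch below. It is plain Scala with hypothetical names (OlderThan, bind, lateKey, lateData) and is not the code merged in this PR; it only shows the idea of resolving the column ordinal once per schema so that no extra per-row projection is needed.

```scala
object BoundPredicateDemo {
  // Hypothetical stand-in for InternalRow: values are addressed by ordinal only.
  type Row = Vector[Long]

  // Hypothetical unbound expression: "column `name` is older than `watermark`".
  final case class OlderThan(name: String, watermark: Long)

  // Binding resolves the column name to an ordinal in the given schema once,
  // so the resulting predicate can run directly on rows of that schema.
  def bind(expr: OlderThan, schema: Seq[String]): Row => Boolean = {
    val ordinal = schema.indexOf(expr.name)
    require(ordinal >= 0, s"column ${expr.name} not found in $schema")
    row => row(ordinal) < expr.watermark
  }

  val expr = OlderThan("eventTime", 100L)

  // Assumed layouts for illustration only.
  val keySchema = Seq("eventTime", "id")
  val dataSchema = Seq("id", "eventTime")

  // One expression, two bound predicates: one for keys, one for input rows.
  val lateKey: Row => Boolean = bind(expr, keySchema)
  val lateData: Row => Boolean = bind(expr, dataSchema)
}
```

With this shape, an operator that filters input rows would use the data-bound predicate, and one that filters stored keys would use the key-bound predicate.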

marmbrus (Contributor) commented Mar 8, 2017

LGTM

SparkQA commented Mar 8, 2017

Test build #74151 has finished for PR 17183 at commit 89679f4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • trait WatermarkSupport extends UnaryExecNode

zsxwing (Member, Author) commented Mar 8, 2017

retest this please

SparkQA commented Mar 8, 2017

Test build #74168 has finished for PR 17183 at commit 89679f4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • trait WatermarkSupport extends UnaryExecNode

zsxwing (Member, Author) commented Mar 8, 2017

Thanks! Merging to master.

@asfgit asfgit closed this in ca849ac Mar 8, 2017
@zsxwing zsxwing deleted the SPARK-19841 branch March 8, 2017 04:35