Skip to content

Conversation

@jose-torres
Copy link
Contributor

What changes were proposed in this pull request?

Forbid watermarks on both sides of stateful streaming operators.

Multiple sequential watermarks are in general not supported by the execution engine; support is only in parallel, e.g. on both sides of a join. We can normally resolve this by simply picking the topmost watermark operator and ignoring the rest, but this is not semantically valid when there's a stateful operator in between.

How was this patch tested?

new unit test

@jose-torres
Copy link
Contributor Author

@tdas

val statefulChildren = e.collect {
case a: Aggregate if a.isStreaming => a
case d: Deduplicate if d.isStreaming => d
case f: FlatMapGroupsWithState if f.isStreaming => f
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be for joins as well.

}
statefulChildren.foreach { statefulNode =>
if (statefulNode.collectFirst{ case e: EventTimeWatermark => e }.isDefined) {
throwError("Watermarks both before and after a stateful operator in a streaming " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives the impression that it makes sense but we dont support it. In fact, its just ill-defined. May change this to something like ... Multiple watermarks before and after stateful operators is not well-defined in a streaming query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT of something like "watermarks may not be present...". Talking about "well-defined" seems a bit confusing to me.

CalendarInterval.fromString("interval 2 seconds"),
streamRelation))),
outputMode = Append,
expectedMsgs = Seq("both before and after"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add for other stateful operators as well.

@SparkQA
Copy link

SparkQA commented Mar 19, 2018

Test build #88383 has finished for PR 20859 at commit 051c85a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 20, 2018

Test build #88424 has finished for PR 20859 at commit cd8c638.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 20, 2018

Test build #88432 has finished for PR 20859 at commit b32b6a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented Jul 30, 2018

How would we like to handle this patch? I guess we add feature on handling multiple watermarks in #21701 so based on the direction this patch might be going to be abandoned. IMHO I'm not 100% sure we have clear use cases for defining multiple watermarks for one source, so this patch might help on simplify handling watermark.

@SparkQA
Copy link

SparkQA commented Oct 17, 2018

Test build #97493 has finished for PR 20859 at commit b32b6a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants