From 051c85afe727f39ba9d505e00e162620f69c808f Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 19 Mar 2018 11:48:11 -0700 Subject: [PATCH 1/4] disallow watermarks on both sides of stateful --- .../analysis/UnsupportedOperationChecker.scala | 13 +++++++++++++ .../analysis/UnsupportedOperationsSuite.scala | 15 +++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index b55043c270644..a15bf59abaded 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -160,6 +160,19 @@ object UnsupportedOperationChecker { case _: InsertIntoDir => throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets") + case e: EventTimeWatermark => + val statefulChildren = e.collect { + case a: Aggregate if a.isStreaming => a + case d: Deduplicate if d.isStreaming => d + case f: FlatMapGroupsWithState if f.isStreaming => f + } + statefulChildren.foreach { statefulNode => + if (statefulNode.collectFirst{ case e: EventTimeWatermark => e }.isDefined) { + throwError("Watermarks both before and after a stateful operator in a streaming " + + "DataFrame/Dataset are not supported.") + } + } + // mapGroupsWithState and flatMapGroupsWithState case m: FlatMapGroupsWithState if m.isStreaming => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 60d1351fda264..d4bd76b92612a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -140,6 +140,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Complete, expectedMsgs = Seq("distinct aggregation")) + assertNotSupportedInStreamingPlan( + "aggregate on both sides of stateful op", + EventTimeWatermark( + attribute, + CalendarInterval.fromString("interval 1 second"), + Aggregate( + attributeWithWatermark :: Nil, + aggExprs("a"), + EventTimeWatermark( + attribute, + CalendarInterval.fromString("interval 2 seconds"), + streamRelation))), + outputMode = Append, + expectedMsgs = Seq("both before and after")) + val att = new AttributeReference(name = "a", dataType = LongType)() // FlatMapGroupsWithState: Both function modes equivalent and supported in batch. for (funcMode <- Seq(Append, Update)) { From cd8c638bb4a278651c2d65579cb9acf909efb97e Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 20 Mar 2018 09:12:22 -0700 Subject: [PATCH 2/4] allow duplicate watermarks if they're identical --- .../analysis/UnsupportedOperationChecker.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index a15bf59abaded..d341e0de6fa9a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -160,14 +160,20 @@ object UnsupportedOperationChecker { case _: InsertIntoDir => throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets") - case e: EventTimeWatermark => - val statefulChildren = e.collect { + case outer: EventTimeWatermark => + val statefulChildren = outer.collect { case a: Aggregate if a.isStreaming => a case d: Deduplicate if d.isStreaming => d case f: FlatMapGroupsWithState if f.isStreaming => f } statefulChildren.foreach { statefulNode => - if (statefulNode.collectFirst{ case e: EventTimeWatermark => e }.isDefined) { + val innerNonEquivalentWatermarks = statefulNode.collect { + case inner: EventTimeWatermark + if !inner.eventTime.semanticEquals(outer.eventTime) || + inner.delay != outer.delay => + inner + } + if (innerNonEquivalentWatermarks.nonEmpty) { throwError("Watermarks both before and after a stateful operator in a streaming " + "DataFrame/Dataset are not supported.") } From 1871edf7d5b174782709a0410f43467f39d31e68 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 20 Mar 2018 09:53:22 -0700 Subject: [PATCH 3/4] fix docs and add join case --- .../catalyst/analysis/UnsupportedOperationChecker.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index d341e0de6fa9a..63db1f21723fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -165,8 +165,13 @@ object UnsupportedOperationChecker { case a: Aggregate if a.isStreaming => a case d: Deduplicate if d.isStreaming => d case f: FlatMapGroupsWithState if f.isStreaming => f + case j: Join => j } statefulChildren.foreach { statefulNode => + // If two watermark ops are exactly equivalent (on the same Attribute and with the + // same interval), allow them both. This is to minimize disruption; this validation was + // added after the tests and one of our existing unit tests expects the special case + // to work. val innerNonEquivalentWatermarks = statefulNode.collect { case inner: EventTimeWatermark if !inner.eventTime.semanticEquals(outer.eventTime) || @@ -174,8 +179,8 @@ object UnsupportedOperationChecker { inner } if (innerNonEquivalentWatermarks.nonEmpty) { - throwError("Watermarks both before and after a stateful operator in a streaming " + - "DataFrame/Dataset are not supported.") + throwError("Watermarks may not be present both before and after a stateful " + + "operator in a streaming DataFrame/Dataset.") } } From b32b6a644f28c434f421222469429aeec6ee4002 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 20 Mar 2018 11:24:25 -0700 Subject: [PATCH 4/4] add and fix tests --- .../UnsupportedOperationChecker.scala | 4 +-- .../analysis/UnsupportedOperationsSuite.scala | 31 ++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 63db1f21723fc..69845324259f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -179,8 +179,8 @@ object UnsupportedOperationChecker { inner } if (innerNonEquivalentWatermarks.nonEmpty) { - throwError("Watermarks may not be present both before and after a stateful " + - "operator in a streaming DataFrame/Dataset.") + throwError("Watermarks both before and after a stateful operator in a streaming " + + "DataFrame/Dataset are not well-defined, and thus not supported.") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index d4bd76b92612a..bca3114dc57be 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -141,7 +141,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { expectedMsgs = Seq("distinct aggregation")) assertNotSupportedInStreamingPlan( - "aggregate on both sides of stateful op", + "watermark on both sides of aggregate", EventTimeWatermark( attribute, CalendarInterval.fromString("interval 1 second"), @@ -155,6 +155,35 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Append, expectedMsgs = Seq("both before and after")) + assertNotSupportedInStreamingPlan( + "watermark on both sides of deduplicate", + EventTimeWatermark( + attribute, + CalendarInterval.fromString("interval 1 second"), + Deduplicate( + attributeWithWatermark :: Nil, + EventTimeWatermark( + attribute, + CalendarInterval.fromString("interval 2 seconds"), + streamRelation))), + outputMode = Append, + expectedMsgs = Seq("both before and after")) + + assertNotSupportedInStreamingPlan( + "watermark on both sides of flatMapGroupsWithState", + EventTimeWatermark( + attribute, + CalendarInterval.fromString("interval 1 second"), + FlatMapGroupsWithState( + null, attribute, attribute, Seq(attribute), Seq(attribute), attribute, + null, Update, isMapGroupsWithState = false, null, + EventTimeWatermark( + attribute, + CalendarInterval.fromString("interval 2 seconds"), + streamRelation))), + outputMode = Update, + expectedMsgs = Seq("both before and after")) + val att = new AttributeReference(name = "a", dataType = LongType)() // FlatMapGroupsWithState: Both function modes equivalent and supported in batch. for (funcMode <- Seq(Append, Update)) {