From a287a9a6955b58609722947b3480bc5578d0b37d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 7 Jun 2016 18:51:38 -0700 Subject: [PATCH 1/2] Added support for sorting after streaming aggregation with complete mode --- .../UnsupportedOperationChecker.scala | 61 ++++++++++++------- .../analysis/UnsupportedOperationsSuite.scala | 16 ++++- .../spark/sql/streaming/StreamTest.scala | 24 +++++--- .../streaming/StreamingAggregationSuite.scala | 25 ++++++++ 4 files changed, 94 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 8373fa336dd4c..689e016a5a1d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -43,6 +43,41 @@ object UnsupportedOperationChecker { "Queries without streaming sources cannot be executed with write.startStream()")(plan) } + // Disallow multiple streaming aggregations + val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } + + if (aggregates.size > 1) { + throwError( + "Multiple streaming aggregations are not supported with " + + "streaming DataFrames/Datasets")(plan) + } + + // Disallow some output mode + outputMode match { + case InternalOutputModes.Append if aggregates.nonEmpty => + throwError( + s"$outputMode output mode not supported when there are streaming aggregations on " + + s"streaming DataFrames/DataSets")(plan) + + case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty => + throwError( + s"$outputMode output mode not supported when there are no streaming aggregations on " + + s"streaming DataFrames/Datasets")(plan) + + case _ => + } + + /** + * Whether the subplan will contain complete data or incremental 
data in every incremental * execution. Some operations may be allowed only when the child logical plan gives complete * data. */ def containsCompleteData(subplan: LogicalPlan): Boolean = { val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } // Either the subplan has no streaming source, or it has aggregation with Complete mode !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete) } plan.foreachUp { implicit subPlan => // Operations that cannot exists anywhere in a streaming plan @@ -107,8 +142,9 @@ object UnsupportedOperationChecker { case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) => throwError("Limits are not supported on streaming DataFrames/Datasets") - case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) => - throwError("Sorting is not supported on streaming DataFrames/Datasets") + case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) => + throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " + "aggregated DataFrame/Dataset in Complete mode") case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets") @@ -123,27 +159,6 @@ object UnsupportedOperationChecker { case _ => } } - - // Checks related to aggregations - val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a } - outputMode match { - case InternalOutputModes.Append if aggregates.nonEmpty => - throwError( - s"$outputMode output mode not supported when there are streaming aggregations on " + - s"streaming DataFrames/DataSets")(plan) - - case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty => - throwError( - s"$outputMode output mode not supported when there are no streaming aggregations on " + - s"streaming DataFrames/Datasets")(plan) - - case _ => - } - if (aggregates.size > 1) { 
throwError( - "Multiple streaming aggregations are not supported with " + - "streaming DataFrames/Datasets")(plan) - } } private def throwErrorIf( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 378cca3644eab..ad4cb078ca981 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -81,7 +81,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Append, expectedMsgs = "commands" :: Nil) - // Multiple streaming aggregations not supported + // Aggregation: Multiple streaming aggregations not supported def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name)) assertSupportedInStreamingPlan( @@ -189,9 +189,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite { _.intersect(_), streamStreamSupported = false) - // Unary operations + // Sort: supported only on batch subplans and on aggregation + complete output mode testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _)) testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort") + assertSupportedInStreamingPlan( + "sort - sort over aggregated data in Complete output mode", + Sort(Nil, true, Aggregate(Nil, aggExprs("c"), streamRelation)), + Complete) + assertNotSupportedInStreamingPlan( + "sort - sort over aggregated data in Update output mode", + Sort(Nil, true, Aggregate(Nil, aggExprs("c"), streamRelation)), + Update, + Seq("sort", "aggregat", "complete")) // sort on aggregations is supported on Complete mode only + + // Other unary operations testUnaryOperatorInStreamingPlan( "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling") testUnaryOperatorInStreamingPlan( @@ -299,6 +310,7 @@ class 
UnsupportedOperationsSuite extends SparkFunSuite { outputMode) } + /** Test output mode with and without aggregation in the streaming plan */ def testOutputMode( outputMode: OutputMode, shouldSupportAggregation: Boolean): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 194c3e7307255..7f1e5fe6135a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -111,10 +111,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { def apply[A : Encoder](data: A*): CheckAnswerRows = { val encoder = encoderFor[A] val toExternalRow = RowEncoder(encoder.schema).resolveAndBind() - CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false) + CheckAnswerRows( + data.map(d => toExternalRow.fromRow(encoder.toRow(d))), + lastOnly = false, + isSorted = false) } - def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false) + def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false, false) } /** @@ -123,15 +126,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { */ object CheckLastBatch { def apply[A : Encoder](data: A*): CheckAnswerRows = { + apply(isSorted = false, data: _*) + } + + def apply[A: Encoder](isSorted: Boolean, data: A*): CheckAnswerRows = { val encoder = encoderFor[A] val toExternalRow = RowEncoder(encoder.schema).resolveAndBind() - CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true) + CheckAnswerRows( + data.map(d => toExternalRow.fromRow(encoder.toRow(d))), + lastOnly = true, + isSorted = isSorted) } - def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true) + def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true, false) } - case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean) + 
case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean, isSorted: Boolean) extends StreamAction with StreamMustBeRunning { override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}" private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer" @@ -414,7 +424,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { failTest("Error adding data", e) } - case CheckAnswerRows(expectedAnswer, lastOnly) => + case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) => verify(currentStream != null, "stream not running") // Get the map of source index to the current source objects val indexToSource = currentStream @@ -436,7 +446,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { failTest("Exception while getting data from sink", e) } - QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach { + QueryTest.sameRows(expectedAnswer, sparkAnswer, isSorted).foreach { error => failTest(error) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 1f174aee8ce08..7cf37e285f574 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -104,6 +104,31 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { } } + test("operation after aggregate in complete mode") { + val inputData = MemoryStream[Int] + + val aggregated = + inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .toDF("value", "count") + .orderBy($"count".desc) + .as[(Int, Long)] + + testStream(aggregated, Complete)( + AddData(inputData, 3), + CheckLastBatch(isSorted = true, (3, 1)), + AddData(inputData, 2, 3), + CheckLastBatch(isSorted = true, (3, 2), (2, 1)), + StopStream, + StartStream(), + AddData(inputData, 3, 2, 
1), + CheckLastBatch(isSorted = true, (3, 3), (2, 2), (1, 1)), + AddData(inputData, 4, 4, 4, 4), + CheckLastBatch(isSorted = true, (4, 4), (3, 3), (2, 2), (1, 1)) + ) + } + test("multiple keys") { val inputData = MemoryStream[Int] From 810b802060792da78b61c44ad9b656ce3e02b4a3 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 9 Jun 2016 10:58:11 -0700 Subject: [PATCH 2/2] Addressed comments --- .../sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 7 ++++--- .../spark/sql/streaming/StreamingAggregationSuite.scala | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index ad4cb078ca981..c21ad5e03a48d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -191,18 +191,19 @@ class UnsupportedOperationsSuite extends SparkFunSuite { // Sort: supported only on batch subplans and on aggregation + complete output mode testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _)) - testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort") assertSupportedInStreamingPlan( "sort - sort over aggregated data in Complete output mode", - Sort(Nil, true, Aggregate(Nil, aggExprs("c"), streamRelation)), + streamRelation.groupBy()(Count("*")).sortBy(), Complete) assertNotSupportedInStreamingPlan( "sort - sort over aggregated data in Update output mode", - Sort(Nil, true, Aggregate(Nil, aggExprs("c"), streamRelation)), + streamRelation.groupBy()(Count("*")).sortBy(), Update, Seq("sort", "aggregat", "complete")) // sort on aggregations is supported on Complete mode only + // Other unary operations + testUnaryOperatorInStreamingPlan("sort partitions", 
SortPartitions(Nil, _), expectedMsg = "sort") testUnaryOperatorInStreamingPlan( "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling") testUnaryOperatorInStreamingPlan( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 7cf37e285f574..8681199817fe6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -104,7 +104,7 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { } } - test("operation after aggregate in complete mode") { + test("sort after aggregate in complete mode") { val inputData = MemoryStream[Int] val aggregated =