From 404113fbba68280ab532b4c246a16ce948da33fb Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 21 Jun 2016 13:14:20 -0700 Subject: [PATCH 1/3] Fix 'explain' for streaming Dataset --- .../spark/sql/execution/SparkStrategies.scala | 12 ++++++- .../sql/execution/command/commands.scala | 9 +++++- .../streaming/IncrementalExecution.scala | 1 + .../execution/streaming/StreamExecution.scala | 20 ++++++++++++ .../streaming/StreamingRelation.scala | 14 +++++++++ .../spark/sql/streaming/StreamingQuery.scala | 15 +++++++++ .../sql/streaming/FileStreamSourceSuite.scala | 31 +++++++++++++++++++ .../spark/sql/streaming/StreamSuite.scala | 24 ++++++++++++++ 8 files changed, 124 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 8e2f2ed4f86b9..dd6a1eba8db20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} -import org.apache.spark.sql.execution.streaming.MemoryPlan +import org.apache.spark.sql.execution.streaming.{MemoryPlan, StreamingExecutionRelation, StreamingRelation, StreamingRelationExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQuery @@ -307,6 +307,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + object StreamingRelationStrategy extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case s: StreamingRelation => + StreamingRelationExec(s.sourceName, s.output) :: Nil + case s: StreamingExecutionRelation => + StreamingRelationExec(s.toString, s.output) :: Nil + case _ => Nil + } + } + // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { def numPartitions: Int = self.numPartitions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 38bb6e412f753..74887b052250c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.debug._ +import org.apache.spark.sql.execution.streaming.IncrementalExecution +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types._ /** @@ -98,7 +100,12 @@ case class ExplainCommand( // Run through the optimizer to generate the physical plan. 
override def run(sparkSession: SparkSession): Seq[Row] = try { - val queryExecution = sparkSession.sessionState.executePlan(logicalPlan) + val queryExecution = + if (logicalPlan.isStreaming) { + new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "dummy", 0) + } else { + sparkSession.sessionState.executePlan(logicalPlan) + } val outputString = if (codegen) { codegenString(queryExecution.executedPlan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index bc0e443ca7a55..0ce00552bf6cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -37,6 +37,7 @@ class IncrementalExecution private[sql]( // TODO: make this always part of planning. val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +: + sparkSession.sessionState.planner.StreamingRelationStrategy +: sparkSession.sessionState.experimentalMethods.extraStrategies // Modified planner with stateful operations. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 1428b971490d7..f1af79e738faf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} @@ -473,6 +474,25 @@ class StreamExecution( } } + /** Expose for tests */ + def explainInternal(extended: Boolean): String = { + if (lastExecution == null) { + "N/A" + } else { + val explain = ExplainCommand(lastExecution.logical, extended = extended) + sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect() + .map(_.getString(0)).mkString("\n") + } + } + + override def explain(extended: Boolean): Unit = { + // scalastyle:off println + println(explainInternal(extended)) + // scalastyle:on println + } + + override def explain(): Unit = explain(extended = false) + override def toString: String = { s"Streaming Query - $name [state = $state]" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index 4d65d2f4f57fc..e8b00094add3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -17,8 +17,11 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.execution.LeafExecNode import 
org.apache.spark.sql.execution.datasources.DataSource object StreamingRelation { @@ -50,6 +53,17 @@ case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) ex override def toString: String = source.toString } +/** + * A dummy physical plan for [[StreamingRelation]] to support + * [[org.apache.spark.sql.Dataset.explain]] + */ +case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) extends LeafExecNode { + override def toString: String = sourceName + override protected def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException("StreamingRelationExec cannot be executed") + } +} + object StreamingExecutionRelation { def apply(source: Source): StreamingExecutionRelation = { StreamingExecutionRelation(source, source.schema.toAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index dc81a5b180276..19d1ecf740d0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -107,6 +107,7 @@ trait StreamingQuery { * method may block forever. Additionally, this method is only guaranteed to block until data that * has been synchronously appended data to a [[org.apache.spark.sql.execution.streaming.Source]] * prior to invocation. (i.e. `getOffset` must immediately reflect the addition). + * @since 2.0.0 */ def processAllAvailable(): Unit @@ -116,4 +117,18 @@ trait StreamingQuery { * @since 2.0.0 */ def stop(): Unit + + /** + * Prints the physical plan to the console for debugging purposes. + * @since 2.0.0 + */ + def explain(): Unit + + /** + * Prints the physical plan to the console for debugging purposes. 
+ * + * @param extended whether to do extended explain or not + * @since 2.0.0 + */ + def explain(extended: Boolean): Unit } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 6971f93b230f1..c182af8c193a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -592,6 +592,37 @@ class FileStreamSourceSuite extends FileStreamSourceTest { ) } } + + test("explain") { + withTempDirs { case (src, tmp) => + src.mkdirs() + + val df = spark.readStream.format("text").load(src.getCanonicalPath).map(_ + "-x") + // Test `explain` not throwing errors + df.explain() + + val q = df.writeStream.queryName("file_explain").format("memory").start() + .asInstanceOf[StreamExecution] + try { + assert("N/A" === q.explainInternal(false)) + assert("N/A" === q.explainInternal(true)) + + val tempFile = Utils.tempFileWith(new File(tmp, "text")) + val finalFile = new File(src, tempFile.getName) + require(stringToFile(tempFile, "foo").renameTo(finalFile)) + + q.processAllAvailable() + + val explainWithoutExtended = q.explainInternal(false) + assert(explainWithoutExtended.contains("TextFileFormat")) + + val explainWithExtended = q.explainInternal(true) + assert(explainWithExtended.contains("TextFileFormat")) + } finally { + q.stop() + } + } + } } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index b8e40e71bfce5..4ab6c67e7ec64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -242,6 +242,30 @@ class StreamSuite extends StreamTest { val o2 = OutputMode.Complete assert(o2 === InternalOutputModes.Complete) } + + test("explain") { + val inputData = MemoryStream[String] + val df = inputData.toDS().map(_ + "foo") + // Test `explain` not throwing errors + df.explain() + val q = df.writeStream.queryName("memory_explain").format("memory").start() + .asInstanceOf[StreamExecution] + try { + assert("N/A" === q.explainInternal(false)) + assert("N/A" === q.explainInternal(true)) + + inputData.addData("abc") + q.processAllAvailable() + + val explainWithoutExtended = q.explainInternal(false) + assert(explainWithoutExtended.contains("LocalTableScan")) + + val explainWithExtended = q.explainInternal(true) + assert(explainWithExtended.contains("LocalTableScan")) + } finally { + q.stop() + } + } } /** From 70b13b28b87609f8651ef4e5168cbca1e6c0bfa2 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 23 Jun 2016 11:01:53 -0700 Subject: [PATCH 2/3] dummy -> --- .../scala/org/apache/spark/sql/execution/command/commands.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 74887b052250c..9b3a251188cae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -102,7 +102,7 @@ case class ExplainCommand( override def run(sparkSession: SparkSession): Seq[Row] = try { val queryExecution = if 
(logicalPlan.isStreaming) { - new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "dummy", 0) + new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "", 0) } else { sparkSession.sessionState.executePlan(logicalPlan) } From 40b43465fa3f4a61f9be7bce8d3e4ff2f3d0a707 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 23 Jun 2016 11:51:17 -0700 Subject: [PATCH 3/3] Address --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 6 ++++++ .../apache/spark/sql/execution/command/commands.scala | 2 ++ .../spark/sql/streaming/FileStreamSourceSuite.scala | 9 +++++++-- .../org/apache/spark/sql/streaming/StreamSuite.scala | 9 +++++++-- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index dd6a1eba8db20..b619d4edc30de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -307,6 +307,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + /** + * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`. + * It won't affect the execution, because `StreamingRelation` will be replaced with + * `StreamingExecutionRelation` in `StreamingQueryManager` and `StreamingExecutionRelation` will + * be replaced with the real relation using the `Source` in `StreamExecution`. + */ object StreamingRelationStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case s: StreamingRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 9b3a251188cae..7eaad81a81615 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -102,6 +102,8 @@ case class ExplainCommand( override def run(sparkSession: SparkSession): Seq[Row] = try { val queryExecution = if (logicalPlan.isStreaming) { + // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the + // output mode does not matter since there is no `Sink`. new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "", 0) } else { sparkSession.sessionState.executePlan(logicalPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index c182af8c193a5..0eade71d1ebc3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -614,10 +614,15 @@ class FileStreamSourceSuite extends FileStreamSourceTest { q.processAllAvailable() val explainWithoutExtended = q.explainInternal(false) - assert(explainWithoutExtended.contains("TextFileFormat")) + // `extended = false` only displays the physical plan. 
+ assert("Relation.*text".r.findAllMatchIn(explainWithoutExtended).size === 0) + assert("TextFileFormat".r.findAllMatchIn(explainWithoutExtended).size === 1) val explainWithExtended = q.explainInternal(true) - assert(explainWithExtended.contains("TextFileFormat")) + // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical + // plan. + assert("Relation.*text".r.findAllMatchIn(explainWithExtended).size === 3) + assert("TextFileFormat".r.findAllMatchIn(explainWithExtended).size === 1) } finally { q.stop() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 4ab6c67e7ec64..c4a894b6816ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -258,10 +258,15 @@ class StreamSuite extends StreamTest { q.processAllAvailable() val explainWithoutExtended = q.explainInternal(false) - assert(explainWithoutExtended.contains("LocalTableScan")) + // `extended = false` only displays the physical plan. + assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0) + assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1) val explainWithExtended = q.explainInternal(true) - assert(explainWithExtended.contains("LocalTableScan")) + // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical + // plan. + assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3) + assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1) } finally { q.stop() }