From 80d02681ae5290fefe991a7faef7273d79f5f1dd Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Tue, 12 Sep 2017 14:44:51 -0700
Subject: [PATCH 1/2] Add default stats to StreamingExecutionRelation.

---
 .../execution/streaming/StreamExecution.scala |  2 +-
 .../streaming/StreamingRelation.scala         | 20 +++++++++++++++++---
 .../sql/execution/streaming/memory.scala      |  2 +-
 .../spark/sql/streaming/StreamSuite.scala     | 20 +++++++++++++++++++-
 .../sql/streaming/StreamingQuerySuite.scala   |  2 +-
 5 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 432b2d4925ae2..f82a2b05ee0fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -164,7 +164,7 @@ class StreamExecution(
           nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema` as attributes in
           // "df.logicalPlan" has already used attributes of the previous `output`.
-          StreamingExecutionRelation(source, output)
+          StreamingExecutionRelation(source, output)(sparkSession)
         })
     }
     sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index e8b00094add3a..ab716052c28ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
 
@@ -48,9 +50,21 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
  * Used to link a streaming [[Source]] of data into a
  * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
  */
-case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
+case class StreamingExecutionRelation(
+    source: Source,
+    output: Seq[Attribute])(session: SparkSession)
+  extends LeafNode {
+
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
+
+  // There's no sensible value here. On the execution path, this relation will be
+  // swapped out with microbatches. But some DataFrame operations (in particular explain) can lead
+  // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
+  // value.
+  override def computeStats(): Statistics = Statistics(
+    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
+  )
 }
 
 /**
@@ -65,7 +79,7 @@ case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) ext
 }
 
 object StreamingExecutionRelation {
-  def apply(source: Source): StreamingExecutionRelation = {
-    StreamingExecutionRelation(source, source.schema.toAttributes)
+  def apply(source: Source, session: SparkSession): StreamingExecutionRelation = {
+    StreamingExecutionRelation(source, source.schema.toAttributes)(session)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index c9784c093b408..3041d4d703cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -53,7 +53,7 @@ object MemoryStream {
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     extends Source with Logging {
   protected val encoder = encoderFor[A]
-  protected val logicalPlan = StreamingExecutionRelation(this)
+  protected val logicalPlan = StreamingExecutionRelation(this, sqlContext.sparkSession)
   protected val output = logicalPlan.output
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 012cccfdd9166..0c4cfab7fcd74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -74,6 +74,22 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }
 
+
+  test("explain join") {
+    // Make a table and ensure it will be broadcast.
+    val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+    // Join the input stream with a table.
+    val inputData = MemoryStream[Int]
+    val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")
+
+    val outputStream = new java.io.ByteArrayOutputStream()
+    Console.withOut(outputStream) {
+      joined.explain()
+    }
+    print(outputStream.toString)
+  }
+
   test("SPARK-20432: union one stream with itself") {
     val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
     val unioned = df.union(df)
@@ -335,7 +351,9 @@ class StreamSuite extends StreamTest {
       override def stop(): Unit = {}
     }
 
-    val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+    val df = Dataset[Int](
+      sqlContext.sparkSession,
+      StreamingExecutionRelation(source, sqlContext.sparkSession))
     testStream(df)(
       // `ExpectFailure(isFatalError = true)` verifies two things:
       // - Fatal errors can be propagated to `StreamingQuery.exception` and
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 969f594edf615..c3ccfbc0b7197 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -653,7 +653,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
       override def stop(): Unit = {}
     }
-    StreamingExecutionRelation(source)
+    StreamingExecutionRelation(source, spark)
   }
 
   /** Returns the query progress at the end of the first trigger of streaming DF */

From 81b1af9fbc9f1cbbda83252508d6d655c9a17307 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Wed, 13 Sep 2017 14:57:20 -0700
Subject: [PATCH 2/2] Fix unit test to check output instead of printing it.

---
 .../test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 0c4cfab7fcd74..673c2a34a94ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -87,7 +87,7 @@ class StreamSuite extends StreamTest {
     Console.withOut(outputStream) {
       joined.explain()
     }
-    print(outputStream.toString)
+    assert(outputStream.toString.contains("StreamingRelation"))
   }
 
   test("SPARK-20432: union one stream with itself") {
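
Reviewer note (illustration only, not part of either patch): the sketch below shows the behavior the new computeStats() enables, in the same spirit as the "explain join" test above. It is a minimal example assuming a build with PATCH 1/2 applied and a local master; the object name and app name are invented for the example, while everything else uses the same APIs the patch itself exercises.

    import org.apache.spark.sql.{SQLContext, SparkSession}
    import org.apache.spark.sql.execution.streaming.MemoryStream

    object ExplainStreamingJoinExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("explain-streaming-join-example")
          .getOrCreate()
        import spark.implicits._
        implicit val sqlContext: SQLContext = spark.sqlContext

        // Static side of the join: small enough to be broadcast.
        val smallTable = Seq((1, "one"), (2, "two")).toDF("number", "word")

        // Streaming side: MemoryStream's logical plan is a
        // StreamingExecutionRelation, which with this patch reports the
        // session default sizeInBytes instead of leaving LeafNode's stats
        // contract unimplemented.
        val inputData = MemoryStream[Int]
        val joined = inputData.toDF()
          .join(smallTable, smallTable("number") === $"value")

        // Planning a join consults stats on both sides (e.g. for broadcast
        // selection), so explain() on the not-yet-started query exercises
        // computeStats() on the streaming relation rather than throwing.
        joined.explain()

        spark.stop()
      }
    }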