From b83349567760dd0d33388d3fc68d8db1b648e1f1 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 25 Aug 2017 13:48:49 -0700 Subject: [PATCH 01/11] Check that optimization doesn't affect isStreaming bit. --- .../sql/catalyst/optimizer/Optimizer.scala | 15 +++++++++-- .../optimizer/PropagateEmptyRelation.scala | 3 ++- .../plans/logical/LocalRelation.scala | 2 +- .../sql/catalyst/rules/RuleExecutor.scala | 7 ++++++ .../PropagateEmptyRelationSuite.scala | 25 +++++++++++++++++++ .../execution/streaming/StreamExecution.scala | 2 +- 6 files changed, 49 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 75d83bc6e86f8..32088f25c2dfd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -39,6 +39,15 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) + override protected def checkInvariants( + result: LogicalPlan, + original: LogicalPlan, + rule: Rule[LogicalPlan]): Unit = { + assert( + result.isStreaming == original.isStreaming, + s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}") + } + def batches: Seq[Batch] = { Batch("Eliminate Distinct", Once, EliminateDistinct) :: // Technically some of the rules in Finish Analysis are not optimizer rules and belong more @@ -738,8 +747,10 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { case Filter(Literal(true, BooleanType), child) => child // If the filter condition always evaluate to null or false, // replace the input with an empty relation. 
- case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty) - case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty) + case Filter(Literal(null, _), child) => + LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming) + case Filter(Literal(false, BooleanType), child) => + LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming) // If any deterministic condition is guaranteed to be true given the constraints on the child's // output, remove the condition case f @ Filter(fc, p: LogicalPlan) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 987cd7434b459..f593a34e3cd23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -38,7 +38,8 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper { case _ => false } - private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty) + private def empty(plan: LogicalPlan) = + LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case p: Union if p.children.forall(isEmptyLocalRelation) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index 7a21183664c56..d73d7e73f28d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -58,7 +58,7 @@ case class LocalRelation(output: Seq[Attribute], * query. */ override final def newInstance(): this.type = { - LocalRelation(output.map(_.newInstance()), data).asInstanceOf[this.type] + LocalRelation(output.map(_.newInstance()), data, isStreaming).asInstanceOf[this.type] } override protected def stringArgs: Iterator[Any] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 85b368c862630..fa86b4a17542e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -63,6 +63,11 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { /** Defines a sequence of rule batches, to be overridden by the implementation. */ protected def batches: Seq[Batch] + /** Checks invariants that should hold across rule execution. */ + protected def checkInvariants( + result: TreeType, + original: TreeType, + rule: Rule[TreeType]): Unit = {} /** * Executes the batches of rules defined by the subclass. 
The batches are executed serially @@ -86,6 +91,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { val runTime = System.nanoTime() - startTime RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime) + checkInvariants(result, plan, rule) + if (!result.fastEquals(plan)) { logTrace( s""" diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 2285be16938d6..b69ec31954af2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -18,11 +18,13 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.StructType class PropagateEmptyRelationSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { @@ -124,6 +126,29 @@ class PropagateEmptyRelationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("propagate empty streaming relation through multiple UnaryNode") { + val output = Seq('a.int) + val data = Seq(Row(1)) + val schema = StructType.fromAttributes(output) + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + val relation = LocalRelation( + output, + data.map(converter(_).asInstanceOf[InternalRow]), + isStreaming = true) + + val query = relation + .where(false) + .select('a) + .groupBy('a)('a) + .where('a > 1) + .orderBy('a.asc) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation(output, isStreaming = true) + + comparePlans(optimized, correctAnswer) + } + test("don't propagate non-empty local relation") { val query = testRelation1 .where(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 432b2d4925ae2..cb195f66649b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -631,7 +631,7 @@ class StreamExecution( replacements ++= output.zip(newPlan.output) newPlan }.getOrElse { - LocalRelation(output) + LocalRelation(output, isStreaming = true) } } From 4036767f68770324901ee3edbe01f30fe3bba1b4 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 25 Aug 2017 14:43:22 -0700 Subject: [PATCH 02/11] Add plans to isStreaming invariant check debug message. 
--- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 32088f25c2dfd..52a0fc936d7f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -45,7 +45,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) rule: Rule[LogicalPlan]): Unit = { assert( result.isStreaming == original.isStreaming, - s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}") + s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}:" + + s"original:\n$original\nnew:\n$result") } def batches: Seq[Batch] = { From d815e560aad78e45203babf2a6c014bed71fee64 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 25 Aug 2017 14:49:08 -0700 Subject: [PATCH 03/11] Move checkInvariants inside condition that the plan has changed. --- .../org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index fa86b4a17542e..90949f7bb5202 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -63,7 +63,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { /** Defines a sequence of rule batches, to be overridden by the implementation. */ protected def batches: Seq[Batch] - /** Checks invariants that should hold across rule execution. */ + /** Checks invariants that should hold across rule executions. */ protected def checkInvariants( result: TreeType, original: TreeType, @@ -91,9 +91,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { val runTime = System.nanoTime() - startTime RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime) - checkInvariants(result, plan, rule) - if (!result.fastEquals(plan)) { + checkInvariants(result, plan, rule) logTrace( s""" |=== Applying Rule ${rule.ruleName} === From a25534eb2ef7c303ff77dce92aad543ca6c171d7 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 25 Aug 2017 14:50:29 -0700 Subject: [PATCH 04/11] Don't propagate empty relation through aggregate for streaming. --- .../sql/catalyst/optimizer/PropagateEmptyRelation.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index f593a34e3cd23..1246dc05836cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -66,11 +66,12 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper { case _: RepartitionByExpression => empty(p) // An aggregate with non-empty group expression will return one output row per group when the // input to the aggregate is not empty. 
If the input to the aggregate is empty then all groups - // will be empty and thus the output will be empty. + // will be empty and thus the output will be empty. If we're working on batch data, we can + // then treat the aggregate as redundant. // // If the grouping expressions are empty, however, then the aggregate will always produce a // single output row and thus we cannot propagate the EmptyRelation. - case Aggregate(ge, _, _) if ge.nonEmpty => empty(p) + case Aggregate(ge, _, _) if ge.nonEmpty and !p.isStreaming => empty(p) // Generators like Hive-style UDTF may return their records within `close`. case Generate(_: Explode, _, _, _, _, _) => empty(p) case _ => p From 5c61a13f53f09673705fcc1baa6c084e593c8b00 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 25 Aug 2017 15:11:26 -0700 Subject: [PATCH 05/11] Add test for not propagating empty relation through streaming aggregate --- .../optimizer/PropagateEmptyRelation.scala | 5 ++++- .../PropagateEmptyRelationSuite.scala | 21 ++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 1246dc05836cb..cfffa6bc2bfdd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -69,9 +69,12 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper { // will be empty and thus the output will be empty. If we're working on batch data, we can // then treat the aggregate as redundant. // + // If the aggregate is over streaming data, we may need to update the state store even if no + // new rows are processed, so we can't eliminate the node. + // // If the grouping expressions are empty, however, then the aggregate will always produce a // single output row and thus we cannot propagate the EmptyRelation. - case Aggregate(ge, _, _) if ge.nonEmpty and !p.isStreaming => empty(p) + case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p) // Generators like Hive-style UDTF may return their records within `close`. 
case Generate(_: Explode, _, _, _, _, _) => empty(p) case _ => p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index b69ec31954af2..bc1c48b99c295 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -139,8 +139,8 @@ class PropagateEmptyRelationSuite extends PlanTest { val query = relation .where(false) .select('a) - .groupBy('a)('a) .where('a > 1) + .where('a != 200) .orderBy('a.asc) val optimized = Optimize.execute(query.analyze) @@ -149,6 +149,25 @@ class PropagateEmptyRelationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("don't propagate empty streaming relation through agg") { + val output = Seq('a.int) + val data = Seq(Row(1)) + val schema = StructType.fromAttributes(output) + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + val relation = LocalRelation( + output, + data.map(converter(_).asInstanceOf[InternalRow]), + isStreaming = true) + + val query = relation + .groupBy('a)('a) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + test("don't propagate non-empty local relation") { val query = testRelation1 .where(true) From feda29ff7d7320882c724b7f597beba816e583ce Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 28 Aug 2017 11:27:44 -0700 Subject: [PATCH 06/11] Add isStreaming to TextSocketStream. --- .../sql/execution/streaming/socket.scala | 9 ++- .../streaming/TextSocketStreamSuite.scala | 72 ++++++++++--------- 2 files changed, 45 insertions(+), 36 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 8e63207959575..f141a8f5b2afc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -29,8 +29,10 @@ import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.unsafe.types.UTF8String object TextSocketSource { @@ -126,8 +128,9 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo batches.slice(sliceStart, sliceEnd) } - import sqlContext.implicits._ - val rawBatch = sqlContext.createDataset(rawList) + val rdd = sqlContext.sparkContext.parallelize(rawList).map( + v => InternalRow(UTF8String.fromString(v._1), v._2.getTime())) + val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp // if requested. 
@@ -135,7 +138,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo rawBatch.toDF("value", "timestamp") } else { // Strip out timestamp - rawBatch.select("_1").toDF("value") + rawBatch.select("value").toDF() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala index 9ebf4d2835266..ec11549073650 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala @@ -65,20 +65,22 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before while (source.getOffset.isEmpty) { Thread.sleep(10) } - val offset1 = source.getOffset.get - val batch1 = source.getBatch(None, offset1) - assert(batch1.as[String].collect().toSeq === Seq("hello")) + withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + val offset1 = source.getOffset.get + val batch1 = source.getBatch(None, offset1) + assert(batch1.as[String].collect().toSeq === Seq("hello")) + + serverThread.enqueue("world") + while (source.getOffset.get === offset1) { + Thread.sleep(10) + } + val offset2 = source.getOffset.get + val batch2 = source.getBatch(Some(offset1), offset2) + assert(batch2.as[String].collect().toSeq === Seq("world")) - serverThread.enqueue("world") - while (source.getOffset.get === offset1) { - Thread.sleep(10) + val both = source.getBatch(None, offset2) + assert(both.as[String].collect().sorted.toSeq === Seq("hello", "world")) } - val offset2 = source.getOffset.get - val batch2 = source.getBatch(Some(offset1), offset2) - assert(batch2.as[String].collect().toSeq === Seq("world")) - - val both = source.getBatch(None, offset2) - assert(both.as[String].collect().sorted.toSeq === Seq("hello", "world")) // Try stopping the source to make sure this does not block forever. 
source.stop() @@ -104,22 +106,24 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before while (source.getOffset.isEmpty) { Thread.sleep(10) } - val offset1 = source.getOffset.get - val batch1 = source.getBatch(None, offset1) - val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq - assert(batch1Seq.map(_._1) === Seq("hello")) - val batch1Stamp = batch1Seq(0)._2 - - serverThread.enqueue("world") - while (source.getOffset.get === offset1) { - Thread.sleep(10) + withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + val offset1 = source.getOffset.get + val batch1 = source.getBatch(None, offset1) + val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq + assert(batch1Seq.map(_._1) === Seq("hello")) + val batch1Stamp = batch1Seq(0)._2 + + serverThread.enqueue("world") + while (source.getOffset.get === offset1) { + Thread.sleep(10) + } + val offset2 = source.getOffset.get + val batch2 = source.getBatch(Some(offset1), offset2) + val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq + assert(batch2Seq.map(_._1) === Seq("world")) + val batch2Stamp = batch2Seq(0)._2 + assert(!batch2Stamp.before(batch1Stamp)) } - val offset2 = source.getOffset.get - val batch2 = source.getBatch(Some(offset1), offset2) - val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq - assert(batch2Seq.map(_._1) === Seq("world")) - val batch2Stamp = batch2Seq(0)._2 - assert(!batch2Stamp.before(batch1Stamp)) // Try stopping the source to make sure this does not block forever. source.stop() @@ -184,12 +188,14 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before while (source.getOffset.isEmpty) { Thread.sleep(10) } - val batch = source.getBatch(None, source.getOffset.get).as[String] - batch.collect() - val numRowsMetric = - batch.queryExecution.executedPlan.collectLeaves().head.metrics.get("numOutputRows") - assert(numRowsMetric.nonEmpty) - assert(numRowsMetric.get.value === 1) + withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + val batch = source.getBatch(None, source.getOffset.get).as[String] + batch.collect() + val numRowsMetric = + batch.queryExecution.executedPlan.collectLeaves().head.metrics.get("numOutputRows") + assert(numRowsMetric.nonEmpty) + assert(numRowsMetric.get.value === 1) + } source.stop() source = null } From 28aad0cc21fb93c1d3b0026f5d55b3d953c7bb7d Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 29 Aug 2017 10:00:39 -0700 Subject: [PATCH 07/11] Remove redundant toDF() calls. 
--- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../org/apache/spark/sql/execution/streaming/socket.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 52a0fc936d7f9..b9c09fccfc610 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -45,7 +45,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) rule: Rule[LogicalPlan]): Unit = { assert( result.isStreaming == original.isStreaming, - s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}:" + + s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}: " + s"original:\n$original\nnew:\n$result") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index f141a8f5b2afc..09581d776205e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -135,10 +135,10 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp // if requested. if (includeTimestamp) { - rawBatch.toDF("value", "timestamp") + rawBatch.select("value", "timestamp") } else { // Strip out timestamp - rawBatch.select("value").toDF() + rawBatch.select("value") } } From b81553ce2d4f5aa4b86bc0ba9485f925a3a7685c Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 29 Aug 2017 10:03:53 -0700 Subject: [PATCH 08/11] Don't explicitly select TextSocketStream cols, since they're already in the schema. --- .../apache/spark/sql/execution/streaming/socket.scala | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 09581d776205e..62735e2b172b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo val rdd = sqlContext.sparkContext.parallelize(rawList).map( v => InternalRow(UTF8String.fromString(v._1), v._2.getTime())) - val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) - - // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp - // if requested. - if (includeTimestamp) { - rawBatch.select("value", "timestamp") - } else { - // Strip out timestamp - rawBatch.select("value") - } + sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } override def commit(end: Offset): Unit = synchronized { From 9f8b11f234c86d6f683285b2f2b7558c417f1cfc Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 29 Aug 2017 12:32:46 -0700 Subject: [PATCH 09/11] Remove Optimizer invariant check. 
--- .../spark/sql/catalyst/optimizer/Optimizer.scala | 10 ---------- .../apache/spark/sql/catalyst/rules/RuleExecutor.scala | 7 ------- 2 files changed, 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b9c09fccfc610..251ebc5995492 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -39,16 +39,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) - override protected def checkInvariants( - result: LogicalPlan, - original: LogicalPlan, - rule: Rule[LogicalPlan]): Unit = { - assert( - result.isStreaming == original.isStreaming, - s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}: " + - s"original:\n$original\nnew:\n$result") - } - def batches: Seq[Batch] = { Batch("Eliminate Distinct", Once, EliminateDistinct) :: // Technically some of the rules in Finish Analysis are not optimizer rules and belong more diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 90949f7bb5202..0e89d1c8f31e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -63,12 +63,6 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { /** Defines a sequence of rule batches, to be overridden by the implementation. */ protected def batches: Seq[Batch] - /** Checks invariants that should hold across rule executions. */ - protected def checkInvariants( - result: TreeType, - original: TreeType, - rule: Rule[TreeType]): Unit = {} - /** * Executes the batches of rules defined by the subclass. The batches are executed serially * using the defined execution strategy. Within each batch, rules are also executed serially. @@ -92,7 +86,6 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime) if (!result.fastEquals(plan)) { - checkInvariants(result, plan, rule) logTrace( s""" |=== Applying Rule ${rule.ruleName} === From 04d5ede50a362f40c9b24322c31316da3b1efb29 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 5 Sep 2017 09:45:48 -0700 Subject: [PATCH 10/11] use case map in socket.scala --- .../org/apache/spark/sql/execution/streaming/socket.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 62735e2b172b4..49cd8ccc75cf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -128,8 +128,9 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo batches.slice(sliceStart, sliceEnd) } - val rdd = sqlContext.sparkContext.parallelize(rawList).map( - v => InternalRow(UTF8String.fromString(v._1), v._2.getTime())) + val rdd = sqlContext.sparkContext. + parallelize(rawList). 
+      map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
     sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }

From a3ec0f2cf3ec92aa30327c856820722ae7f22e7c Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Tue, 5 Sep 2017 16:57:10 -0700
Subject: [PATCH 11/11] move .$ to ^.

---
 .../org/apache/spark/sql/execution/streaming/socket.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 49cd8ccc75cf7..0b22cbc46e6bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -128,9 +128,9 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }
 
-    val rdd = sqlContext.sparkContext.
-      parallelize(rawList).
-      map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
+    val rdd = sqlContext.sparkContext
+      .parallelize(rawList)
+      .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
     sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
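
The thread running through this series is a single invariant: when an optimizer rule replaces a subtree with an empty LocalRelation, the replacement must inherit the original plan's isStreaming flag, and (in patches 01-03, removed again in patch 09) the rule executor asserts this after every rule that changes the plan. Below is a minimal, self-contained Scala sketch of that hook pattern. It is not part of these patches and not Spark's actual RuleExecutor API; Plan, Rule, RuleRunner, ReplaceWithEmpty, and InvariantDemo are simplified stand-ins invented only for illustration.

// Minimal stand-in types; Spark's real plan and rule classes are far richer.
final case class Plan(name: String, isStreaming: Boolean, children: Seq[Plan] = Nil)

trait Rule {
  def ruleName: String
  def apply(plan: Plan): Plan
}

object RuleRunner {
  // Invariant: a rewrite may change the shape of the plan,
  // but it must never flip the streaming bit of the result.
  private def checkInvariants(result: Plan, original: Plan, rule: Rule): Unit =
    require(
      result.isStreaming == original.isStreaming,
      s"Rule ${rule.ruleName} changed isStreaming from ${original.isStreaming}: " +
        s"original: $original, new: $result")

  def runRule(rule: Rule, plan: Plan): Plan = {
    val result = rule(plan)
    // Mirrors patch 03: only check when the rule actually changed the plan.
    if (result != plan) checkInvariants(result, plan, rule)
    result
  }
}

object ReplaceWithEmpty extends Rule {
  val ruleName = "ReplaceWithEmpty"
  // Analogue of PropagateEmptyRelation.empty(): the empty replacement inherits
  // the streaming flag of the plan it replaces instead of defaulting to false.
  def apply(plan: Plan): Plan = Plan("Empty", isStreaming = plan.isStreaming)
}

object InvariantDemo extends App {
  val streamingScan = Plan("StreamingScan", isStreaming = true)
  // Passes the check because the empty replacement keeps isStreaming = true.
  println(RuleRunner.runRule(ReplaceWithEmpty, streamingScan))
}

The same reasoning motivates patch 05: a streaming Aggregate may still need to update its state store even when no new rows arrive, so the empty-relation rewrite is only applied when the plan is not streaming.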