From acb3a58db03d64a5c533f680f828d08b5357f5b1 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 17 Dec 2015 00:46:34 +0800
Subject: [PATCH 01/10] Support UnsafeRow in Limit.

---
 .../scala/org/apache/spark/sql/execution/basicOperators.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index b3e4688557ba0..e1c02fea2e845 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -162,6 +162,10 @@ case class Limit(limit: Int, child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
 
+  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
 
   protected override def doExecute(): RDD[InternalRow] = {

From 304c94a6faa16a19fa01b29d1b4efbf235c705d8 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 17 Dec 2015 11:13:13 +0800
Subject: [PATCH 02/10] Ignore the test because Limit node accepts UnsafeRow now.

---
 .../apache/spark/sql/execution/RowFormatConvertersSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 2328899bb2f8d..3e9070dc24d8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -38,7 +38,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
   private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)
 
-  test("planner should insert unsafe->safe conversions when required") {
+  ignore("planner should insert unsafe->safe conversions when required") {
     val plan = Limit(10, outputsUnsafe)
     val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])

From ecf7ec80205130b1c073d01580f04580272a584d Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 17 Dec 2015 11:25:16 +0800
Subject: [PATCH 03/10] Support UnsafeRow in TakeOrderedAndProject.

---
 .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index e1c02fea2e845..0a415e604233e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -211,7 +211,7 @@ case class TakeOrderedAndProject(
   private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
-  @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
+  @transient private val projection = projectList.map(UnsafeProjection.create(_, child.output))
 
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
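The switch from InterpretedProjection to UnsafeProjection in PATCH 03 is what drives the copy() churn in the later patches: an UnsafeProjection writes each result into a single reused buffer, so any caller that retains more than one output row must copy. A minimal sketch of the aliasing pitfall, assuming the Catalyst API of this era (InternalRow.apply and the UnsafeProjection.create(Array[DataType]) overload that the patches themselves use):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.{DataType, IntegerType}

    object BufferReuseSketch {
      def main(args: Array[String]): Unit = {
        // A projection from a single-int schema to UnsafeRow.
        val proj = UnsafeProjection.create(Array[DataType](IntegerType))
        val rows = Seq(InternalRow(1), InternalRow(2))

        // Both results alias the projection's one output buffer, so both
        // report whatever value was written last.
        val aliased = rows.map(proj(_))
        println(aliased.map(_.getInt(0)))  // List(2, 2)

        // copy() detaches each result from the shared buffer.
        val copied = rows.map(r => proj(r).copy())
        println(copied.map(_.getInt(0)))   // List(1, 2)
      }
    }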
From ba0679599e4cb7b896d3f6a8cad6aa564d7f9429 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 17 Dec 2015 13:30:43 +0800
Subject: [PATCH 04/10] Add copy().

---
 .../apache/spark/sql/execution/basicOperators.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 0a415e604233e..385e1d27f6c53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -204,6 +204,10 @@ case class TakeOrderedAndProject(
     projectOutput.getOrElse(child.output)
   }
 
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
   override def outputPartitioning: Partitioning = SinglePartition
 
   // We need to use an interpreted ordering here because generated orderings cannot be serialized
@@ -215,7 +219,11 @@ case class TakeOrderedAndProject(
 
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
-    projection.map(data.map(_)).getOrElse(data)
+    if (projection.isDefined) {
+      projection.map(data.map(_)).get
+    } else {
+      data
+    }
   }
 
   override def executeCollect(): Array[InternalRow] = {

From c343447db3e9db2571d746d7ac2218e58ba2f992 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 17 Dec 2015 16:28:55 +0800
Subject: [PATCH 05/10] add copy().

---
 .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 385e1d27f6c53..f4af2aba99014 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -220,7 +220,7 @@ case class TakeOrderedAndProject(
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
     if (projection.isDefined) {
-      projection.map(data.map(_)).get
+      projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
     } else {
       data
     }

From fdc009724925fde0909d15195bbfcc1058950cd2 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 24 Dec 2015 15:15:38 +0800
Subject: [PATCH 06/10] Add extra unsafe projection for the case projectList is None.

---
 .../org/apache/spark/sql/execution/basicOperators.scala | 8 +++++++-
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 ++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index f4af2aba99014..52c9c47ad34f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -216,13 +216,19 @@ case class TakeOrderedAndProject(
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
   @transient private val projection = projectList.map(UnsafeProjection.create(_, child.output))
+  @transient private lazy val unsafeProjection =
+    UnsafeProjection.create(child.output.map(_.dataType).toArray)
 
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
     if (projection.isDefined) {
       projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
     } else {
-      data
+      if (child.outputsUnsafeRows) {
+        data
+      } else {
+        data.map(unsafeProjection(_).copy().asInstanceOf[InternalRow])
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index bb82b562aaaa2..8bfb000d4de49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -571,6 +571,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       mapData.collect().take(1).map(Row.fromTuple).toSeq)
   }
 
+  test("sort and limit") {
+    checkAnswer(
+      sql("SELECT * FROM arrayData ORDER BY data[0] ASC LIMIT 1"),
+      arrayData.collect().sortBy(_.data(0)).map(Row.fromTuple).take(1).toSeq)
+  }
+
   test("CTE feature") {
     checkAnswer(
       sql("with q1 as (select * from testData limit 10) select * from q1"),
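The fallback added in PATCH 06 is just an identity projection over the child's own schema: when there is no project list but the child still emits safe rows, each collected row is pushed through an UnsafeProjection built from the output data types. A standalone sketch of that conversion step, under the same API assumptions as above (the helper name toUnsafeRows is illustrative, not from the patch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.DataType

    // Convert already-collected safe rows to UnsafeRow. Each result is
    // copied because the projection reuses its output buffer across calls.
    def toUnsafeRows(rows: Array[InternalRow], dataTypes: Array[DataType]): Array[InternalRow] = {
      val convert = UnsafeProjection.create(dataTypes)
      rows.map(r => convert(r).copy(): InternalRow)
    }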
From 04eb37edca49c0c5a344c17c898924e15e1680b9 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 25 Dec 2015 18:24:28 +0800
Subject: [PATCH 07/10] Set proper outputsUnsafeRows when projectList is None.

---
 .../spark/sql/execution/basicOperators.scala | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 52c9c47ad34f2..b8b6adf41f85c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -204,7 +204,12 @@ case class TakeOrderedAndProject(
     projectOutput.getOrElse(child.output)
   }
 
-  override def outputsUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = if (projectList.isDefined) {
+    true
+  } else {
+    child.outputsUnsafeRows
+  }
+
   override def canProcessUnsafeRows: Boolean = true
   override def canProcessSafeRows: Boolean = true
 
@@ -216,19 +221,13 @@ case class TakeOrderedAndProject(
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
   @transient private val projection = projectList.map(UnsafeProjection.create(_, child.output))
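The five-line conditional that PATCH 07 introduces is semantically just a disjunction; if this method is touched again it could be collapsed to the equivalent one-liner below with no behavior change:

    override def outputsUnsafeRows: Boolean =
      projectList.isDefined || child.outputsUnsafeRows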
-  @transient private lazy val unsafeProjection =
-    UnsafeProjection.create(child.output.map(_.dataType).toArray)
 
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
     if (projection.isDefined) {
       projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
     } else {
-      if (child.outputsUnsafeRows) {
-        data
-      } else {
-        data.map(unsafeProjection(_).copy().asInstanceOf[InternalRow])
-      }
+      data
     }
   }

From 6a519d85b58f25776be6e5819025b1bf485f7da8 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 25 Dec 2015 23:17:01 +0800
Subject: [PATCH 08/10] Remove unnecessary copy() and test cases.

---
 .../org/apache/spark/sql/execution/basicOperators.scala | 2 +-
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 ------
 2 files changed, 1 insertion(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index b8b6adf41f85c..69752dee36ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -225,7 +225,7 @@ case class TakeOrderedAndProject(
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
     if (projection.isDefined) {
-      projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
+      projection.map(p => data.map(p(_).asInstanceOf[InternalRow])).get
     } else {
       data
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8bfb000d4de49..bb82b562aaaa2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -571,12 +571,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       mapData.collect().take(1).map(Row.fromTuple).toSeq)
   }
 
-  test("sort and limit") {
-    checkAnswer(
-      sql("SELECT * FROM arrayData ORDER BY data[0] ASC LIMIT 1"),
-      arrayData.collect().sortBy(_.data(0)).map(Row.fromTuple).take(1).toSeq)
-  }
-
   test("CTE feature") {
     checkAnswer(
       sql("with q1 as (select * from testData limit 10) select * from q1"),

From f3054d63a444432a9cb71ff0ed16b828a5af71cc Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 26 Dec 2015 08:53:19 +0800
Subject: [PATCH 09/10] Add copy() back.

---
 .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 69752dee36ff0..b8b6adf41f85c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -225,7 +225,7 @@ case class TakeOrderedAndProject(
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
     if (projection.isDefined) {
-      projection.map(p => data.map(p(_).asInstanceOf[InternalRow])).get
+      projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
     } else {
       data
     }
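PATCHes 08 and 09 together show why that copy() is load-bearing: data.map runs eagerly over an already-collected local array, but the UnsafeProjection hands back the same buffer for every input row, so without copy() every element of the result aliases the last row produced. With the copy() retained, the isDefined/get pattern could also fold back into the Option combinator it replaced in PATCH 04; a behavior-preserving sketch:

    private def collectData(): Array[InternalRow] = {
      val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
      // getOrElse keeps the collected array as-is when no project list is given.
      projection.map(p => data.map(r => p(r).copy(): InternalRow)).getOrElse(data)
    }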
From c44c93ada9e79578d43ba7ad09e30c342f05e921 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 29 Dec 2015 17:01:38 +0800
Subject: [PATCH 10/10] Add a Dummy Node to test.

---
 .../sql/execution/RowFormatConvertersSuite.scala | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 3e9070dc24d8f..27192d6f2d6c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -26,6 +26,15 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{ArrayType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 
+case class DummySafeNode(limit: Int, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def canProcessUnsafeRows: Boolean = false
+  override def canProcessSafeRows: Boolean = true
+
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
+  protected override def doExecute(): RDD[InternalRow] = child.execute()
+}
+
 class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
 
   private def getConverters(plan: SparkPlan): Seq[SparkPlan] = plan.collect {
@@ -38,8 +47,8 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
   private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)
 
-  ignore("planner should insert unsafe->safe conversions when required") {
-    val plan = Limit(10, outputsUnsafe)
+  test("planner should insert unsafe->safe conversions when required") {
+    val plan = DummySafeNode(10, outputsUnsafe)
     val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])
   }
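DummySafeNode exists because Limit can now consume UnsafeRow directly, so it no longer forces prepareForExecution to insert a ConvertToSafe; the dummy restores a plan node that only accepts safe rows. The opposite direction could be exercised the same way; a hypothetical counterpart (DummyUnsafeNode is not part of this patch series), assuming the row-format conversion rules of this branch:

    // Accepts only UnsafeRow input, so a safe-row child should get a
    // ConvertToUnsafe inserted in front of it by prepareForExecution.
    case class DummyUnsafeNode(limit: Int, child: SparkPlan) extends UnaryNode {
      override def output: Seq[Attribute] = child.output
      override def canProcessUnsafeRows: Boolean = true
      override def canProcessSafeRows: Boolean = false

      override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
      protected override def doExecute(): RDD[InternalRow] = child.execute()
    }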