From 2b6b3f8179e0fdd9d66131642ff2f6aa6ba46c2e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 3 Jul 2015 00:46:57 +0800 Subject: [PATCH 1/2] add more transient --- .../org/apache/spark/sql/execution/basicOperators.scala | 3 +-- .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 647c4ab5cb651..c92533667e1c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -166,8 +166,7 @@ case class TakeOrderedAndProject( private val ord: RowOrdering = new RowOrdering(sortOrder, child.output) - // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable. - @transient private val projection = projectList.map(new InterpretedProjection(_, child.output)) + private val projection = projectList.map(new InterpretedProjection(_, child.output)) private def collectData(): Array[InternalRow] = { val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 05f425f2b65f3..d5b499bb64e50 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -21,7 +21,6 @@ import java.util import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.ql.{Context, ErrorMsg} import org.apache.hadoop.hive.serde2.Serializer @@ -43,8 
+42,8 @@ import org.apache.spark.util.SerializableJobConf private[hive] case class InsertIntoHiveTable( table: MetastoreRelation, - partition: Map[String, Option[String]], - child: SparkPlan, + @transient partition: Map[String, Option[String]], + @transient child: SparkPlan, overwrite: Boolean, ifNotExists: Boolean) extends UnaryNode with HiveInspectors { From 39b103529a848ba48d970feea5859d96c3692c81 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 3 Jul 2015 14:01:09 +0800 Subject: [PATCH 2/2] make SparkPlan only executed at driver side --- .../spark/sql/execution/SparkPlan.scala | 2 +- .../spark/sql/execution/basicOperators.scala | 26 ++++++++++++------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 7739a9f949c77..3073e56842110 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -100,8 +100,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ * Runs this query returning the result as an array. 
*/ def executeCollect(): Array[Row] = { + val converter = CatalystTypeConverters.createToScalaConverter(schema) execute().mapPartitions { iter => - val converter = CatalystTypeConverters.createToScalaConverter(schema) iter.map(converter(_).asInstanceOf[Row]) }.collect() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index c92533667e1c1..6b3c44c564fae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -36,11 +36,15 @@ import org.apache.spark.{HashPartitioner, SparkEnv} case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) - @transient lazy val buildProjection = newMutableProjection(projectList, child.output) + private def buildProjection = newMutableProjection(projectList, child.output) - protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => - val reusableProjection = buildProjection() - iter.map(reusableProjection) + protected override def doExecute(): RDD[InternalRow] = { + // Use local variable to avoid referencing $outer inside closure + val localBuildProjection = buildProjection + child.execute().mapPartitions { iter => + val reusableProjection = localBuildProjection() + iter.map(reusableProjection) + } } override def outputOrdering: Seq[SortOrder] = child.outputOrdering @@ -53,11 +57,15 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output - @transient lazy val conditionEvaluator: (InternalRow) => Boolean = + private def conditionEvaluator: (InternalRow) => Boolean = newPredicate(condition, child.output) - protected 
override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => - iter.filter(conditionEvaluator) + protected override def doExecute(): RDD[InternalRow] = { + // Use local variable to avoid referencing $outer inside closure + val localConditionEvaluator = conditionEvaluator + child.execute().mapPartitions { iter => + iter.filter(localConditionEvaluator) + } } override def outputOrdering: Seq[SortOrder] = child.outputOrdering @@ -201,8 +209,8 @@ case class Sort( if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") { + val ordering = newOrdering(sortOrder, child.output) child.execute().mapPartitions( { iterator => - val ordering = newOrdering(sortOrder, child.output) iterator.map(_.copy()).toArray.sorted(ordering).iterator }, preservesPartitioning = true) } @@ -229,8 +237,8 @@ case class ExternalSort( if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") { + val ordering = newOrdering(sortOrder, child.output) child.execute().mapPartitions( { iterator => - val ordering = newOrdering(sortOrder, child.output) val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering)) sorter.insertAll(iterator.map(r => (r.copy, null))) val baseIterator = sorter.iterator.map(_._1)