Specialized serializer for Exchange.
yhuai committed Apr 13, 2015
1 parent 1e340c3 commit 39704ab
Showing 5 changed files with 618 additions and 4 deletions.
sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (4 additions & 0 deletions)
@@ -63,6 +63,8 @@ private[spark] object SQLConf {
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
 
+  val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -139,6 +141,8 @@ private[sql] class SQLConf extends Serializable {
    */
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean
 
+  private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "false").toBoolean
+
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
    * a broadcast value during the physical executions of join operations. Setting this to -1
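Note: the new flag defaults to false, so Serializer2 stays opt-in. A minimal sketch of flipping it on with the 1.3-era API (the app name and session setup here are illustrative, not part of this commit):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  val sc = new SparkContext(new SparkConf().setAppName("serializer2-demo").setMaster("local[*]"))
  val sqlContext = new SQLContext(sc)

  // Opt in to the specialized shuffle serializer; it is off by default.
  sqlContext.setConf("spark.sql.useSerializer2", "true")

  // The SQL SET command reaches the same SQLConf entry.
  sqlContext.sql("SET spark.sql.useSerializer2=true")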
sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf}
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
+import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions.{Attribute, RowOrdering}
@@ -45,6 +46,27 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
   private val bypassMergeThreshold =
     child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
 
+  def serializer(
+      keySchema: Array[DataType],
+      valueSchema: Array[DataType],
+      numPartitions: Int): Serializer = {
+    val useSqlSerializer2 =
+      !(sortBasedShuffleOn && numPartitions > bypassMergeThreshold) &&
+        child.sqlContext.conf.useSqlSerializer2 &&
+        SparkSqlSerializer2.support(keySchema) &&
+        SparkSqlSerializer2.support(valueSchema)
+
+    val serializer = if (useSqlSerializer2) {
+      logInfo("Using SparkSqlSerializer2.")
+      new SparkSqlSerializer2(keySchema, valueSchema)
+    } else {
+      logInfo("Using SparkSqlSerializer.")
+      new SparkSqlSerializer(new SparkConf(false))
+    }
+
+    serializer
+  }
+
   override def execute(): RDD[Row] = attachTree(this, "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
@@ -70,7 +92,11 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
         }
         val part = new HashPartitioner(numPartitions)
         val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
-        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+
+        val keySchema = expressions.map(_.dataType).toArray
+        val valueSchema = child.output.map(_.dataType).toArray
+        shuffled.setSerializer(serializer(keySchema, valueSchema, numPartitions))
+
         shuffled.map(_._2)
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
@@ -88,7 +114,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {

         val part = new RangePartitioner(numPartitions, rdd, ascending = true)
         val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
-        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+
+        val keySchema = sortingExpressions.map(_.dataType).toArray
+        shuffled.setSerializer(serializer(keySchema, null, numPartitions))
 
         shuffled.map(_._1)
 
@@ -107,7 +135,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
         }
         val partitioner = new HashPartitioner(1)
         val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
-        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+
+        val valueSchema = child.output.map(_.dataType).toArray
+        shuffled.setSerializer(serializer(null, valueSchema, 1))
+
         shuffled.map(_._2)
 
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
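Note on the serializer decision above: SparkSqlSerializer2.support is defined in one of the changed files not shown in this excerpt. Its call sites pin down the contract, though: it must accept a null schema (the range and single-partition cases pass null for the side that carries no data), and it must vet every column type. A sketch of that shape, under the assumption that complex types are what the new serializer cannot handle:

  import org.apache.spark.sql.types._

  // Sketch only; the committed check lives in SparkSqlSerializer2.scala.
  // A null schema means "no data on this side", which trivially qualifies.
  def support(schema: Array[DataType]): Boolean = {
    if (schema == null) return true
    schema.forall {
      case _: ArrayType | _: MapType | _: StructType => false // assumed unsupported
      case _: UserDefinedType[_] => false                     // assumed unsupported
      case _ => true // primitives, strings, and other flat types
    }
  }

The guard against sort-based shuffle with numPartitions above bypassMergeThreshold is plausibly there because ExternalSorter's spill path writes keys and values as separate stream elements rather than whole pairs; the commit itself does not spell the reason out.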
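A quick way to exercise the new path end to end, continuing the session from the SQLConf snippet above (sample data made up):

  import sqlContext.implicits._

  // A groupBy forces a hash-partitioned Exchange; with the flag on and a
  // primitive-only schema, the shuffle should log "Using SparkSqlSerializer2."
  val df = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c"))).toDF("id", "name")
  df.groupBy("id").count().collect()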
