From 6d076789bf981cd41cb99271763a8fa7ca703219 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Fri, 17 Apr 2015 11:57:34 -0700
Subject: [PATCH] Address comments.

---
 .../apache/spark/sql/execution/Exchange.scala    | 25 +++++++++++++------
 .../sql/execution/SparkSqlSerializer2.scala      | 14 +++++++++--
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 0eb5fab263546..64e90be4d460f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -85,17 +85,28 @@ case class Exchange(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
       numPartitions: Int): Serializer = {
+    // In ExternalSorter's spillToMergeableFile function, key-value pairs are written out
+    // through write(key) and then write(value) instead of write((key, value)). Because
+    // SparkSqlSerializer2 assumes that objects passed in are Product2, we cannot safely use
+    // it when ExternalSorter's spillToMergeableFile will be used.
+    // So, we will not use SparkSqlSerializer2 when:
+    //  - sort-based shuffle is enabled and the number of reducers (numPartitions) is greater
+    //    than the bypassMergeThreshold; or
+    //  - newOrdering is defined.
+    val cannotUseSqlSerializer2 =
+      (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) || newOrdering.nonEmpty
+
     val useSqlSerializer2 =
-      !(sortBasedShuffleOn && numPartitions > bypassMergeThreshold) &&
-      child.sqlContext.conf.useSqlSerializer2 &&
-      SparkSqlSerializer2.support(keySchema) &&
-      SparkSqlSerializer2.support(valueSchema)
+      child.sqlContext.conf.useSqlSerializer2 &&    // SparkSqlSerializer2 is enabled.
+      !cannotUseSqlSerializer2 &&                   // Safe to use Serializer2.
+      SparkSqlSerializer2.support(keySchema) &&     // The schema of the key is supported.
+      SparkSqlSerializer2.support(valueSchema)      // The schema of the value is supported.
 
     val serializer = if (useSqlSerializer2) {
-      logInfo("Use SparkSqlSerializer2.")
+      logInfo("Using SparkSqlSerializer2.")
       new SparkSqlSerializer2(keySchema, valueSchema)
     } else {
-      logInfo("Use SparkSqlSerializer.")
+      logInfo("Using SparkSqlSerializer.")
       new SparkSqlSerializer(sparkConf)
     }
 
@@ -160,7 +171,7 @@ case class Exchange(
         } else {
           new ShuffledRDD[Row, Null, Null](rdd, part)
         }
-        val keySchema = sortingExpressions.map(_.dataType).toArray
+        val keySchema = child.output.map(_.dataType).toArray
         shuffled.setSerializer(serializer(keySchema, null, numPartitions))
 
         shuffled.map(_._1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 974aae18fb2c9..cec97de2cd8e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -31,7 +31,17 @@ import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.types._
 
 /**
- * The serialization stream for SparkSqlSerializer2.
+ * The serialization stream for [[SparkSqlSerializer2]]. It assumes that objects passed to
+ * its `writeObject` are [[Product2]]. The serialization functions for the key and value of
+ * a [[Product2]] are constructed based on their schemata.
+ * Compared with general-purpose serializers like Kryo and the Java serializer, this
+ * serialization stream can significantly reduce the size of serialized data and has a lower
+ * allocation cost, which benefits the shuffle operation. Right now, its main limitations are:
+ *   1. It does not support complex types, i.e. Map, Array, and Struct.
+ *   2. It assumes that the objects passed in are [[Product2]]. So, it cannot be used when
+ *      [[org.apache.spark.util.collection.ExternalSorter]]'s merge sort operation is used,
+ *      because the objects passed to the serializer are not of type [[Product2]]. Also see
+ *      the comment of the `serializer` method in [[Exchange]] for more information.
  */
 private[sql] class Serializer2SerializationStream(
     keySchema: Array[DataType],
@@ -61,7 +71,7 @@ private[sql] class Serializer2SerializationStream(
 }
 
 /**
- * The deserialization stream for SparkSqlSerializer2.
+ * The corresponding deserialization stream for [[Serializer2SerializationStream]].
  */
 private[sql] class Serializer2DeserializationStream(
     keySchema: Array[DataType],
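Below the patch, a minimal sketch of the Product2 assumption the comments above describe. This is not Spark code: PairOnlyStream, writeKey, and writeValue are hypothetical stand-ins for Serializer2SerializationStream and its schema-driven write functions. It shows why a stream that casts incoming objects to Product2 works when the shuffle path writes (key, value) pairs, but fails when an ExternalSorter-style caller writes the key and the value as separate objects.

import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical stand-in for a Product2-based serialization stream.
class PairOnlyStream(out: DataOutputStream) {
  // Stand-ins for the schema-driven write functions the patch refers to.
  private def writeKey(key: Any): Unit = out.writeUTF(key.toString)
  private def writeValue(value: Any): Unit = out.writeUTF(value.toString)

  def writeObject[T](t: T): Unit = {
    // The crux: this cast succeeds only when callers pass (key, value) pairs.
    val kv = t.asInstanceOf[Product2[Any, Any]]
    writeKey(kv._1)
    writeValue(kv._2)
  }
}

object Product2AssumptionDemo {
  def main(args: Array[String]): Unit = {
    val stream = new PairOnlyStream(new DataOutputStream(new ByteArrayOutputStream()))
    stream.writeObject(("k", 1)) // Fine: the shuffle path writes pairs.
    stream.writeObject("k")      // ClassCastException: a merge-sort-style caller
                                 // writes the key and the value separately.
  }
}

This is the case the new cannotUseSqlSerializer2 check routes around: whenever spillToMergeableFile may run, Exchange falls back to SparkSqlSerializer instead of relying on the stream to handle bare keys.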