
Commit

Address comments.
yhuai committed Apr 17, 2015
1 parent 4273b8c commit 6d07678
Showing 2 changed files with 30 additions and 9 deletions.

@@ -85,17 +85,28 @@ case class Exchange(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
       numPartitions: Int): Serializer = {
+    // In ExternalSorter's spillToMergeableFile function, key-value pairs are written out
+    // through write(key) and then write(value) instead of write((key, value)). Because
+    // SparkSqlSerializer2 assumes that the objects passed in are Product2, we cannot safely
+    // use it when ExternalSorter's spillToMergeableFile will be used.
+    // So, we will not use SparkSqlSerializer2 when
+    //  - Sort-based shuffle is enabled and the number of reducers (numPartitions) is greater
+    //    than the bypassMergeThreshold; or
+    //  - newOrdering is defined.
+    val cannotUseSqlSerializer2 =
+      (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) || newOrdering.nonEmpty
+
     val useSqlSerializer2 =
-      !(sortBasedShuffleOn && numPartitions > bypassMergeThreshold) &&
-      child.sqlContext.conf.useSqlSerializer2 &&
-      SparkSqlSerializer2.support(keySchema) &&
-      SparkSqlSerializer2.support(valueSchema)
+      child.sqlContext.conf.useSqlSerializer2 &&   // SparkSqlSerializer2 is enabled.
+      !cannotUseSqlSerializer2 &&                  // Safe to use Serializer2.
+      SparkSqlSerializer2.support(keySchema) &&    // The schema of key is supported.
+      SparkSqlSerializer2.support(valueSchema)     // The schema of value is supported.
 
     val serializer = if (useSqlSerializer2) {
-      logInfo("Use SparkSqlSerializer2.")
+      logInfo("Using SparkSqlSerializer2.")
       new SparkSqlSerializer2(keySchema, valueSchema)
     } else {
-      logInfo("Use SparkSqlSerializer.")
+      logInfo("Using SparkSqlSerializer.")
       new SparkSqlSerializer(sparkConf)
     }

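For context on the guard added above: SparkSqlSerializer2 assumes that every object handed to its writeObject is a Product2 (a key-value pair), while ExternalSorter's spill-to-mergeable-file path writes the key and the value through two separate calls. The following is a minimal, hypothetical Scala sketch of that calling-convention mismatch; the class names and the fixed (Int, String) schema are illustrative stand-ins, not the real Spark internals.

import java.io.{ByteArrayOutputStream, DataOutputStream, OutputStream}

// Hypothetical stand-in for a Product2-only stream in the spirit of
// Serializer2SerializationStream (fixed (Int, String) schema, no real Spark types).
class PairOnlyStream(out: OutputStream) {
  private val data = new DataOutputStream(out)

  // Every object is assumed to be a key-value pair; anything else fails at runtime.
  def writeObject[T](t: T): Unit = {
    val kv = t.asInstanceOf[Product2[Int, String]]
    data.writeInt(kv._1)   // write the key according to its known schema
    data.writeUTF(kv._2)   // then the value according to its known schema
  }
}

object SerializerMismatchDemo extends App {
  val stream = new PairOnlyStream(new ByteArrayOutputStream())

  // Hash-based shuffle path: each record arrives as one Product2 -- fine.
  stream.writeObject((1, "a"))

  // Sort-based merge path (spillToMergeableFile): the key and the value are written
  // through two separate calls, so the stream would see a bare Int and then a bare
  // String, and the Product2 cast would throw a ClassCastException.
  // stream.writeObject(1)
  // stream.writeObject("a")
}

A pair-oriented stream like this has no way to reassemble a key and a value that arrive as separate writeObject calls, which is why the check above falls back to SparkSqlSerializer whenever the merge-sort path may be taken.
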
@@ -160,7 +171,7 @@ case class Exchange(
         } else {
           new ShuffledRDD[Row, Null, Null](rdd, part)
         }
-        val keySchema = sortingExpressions.map(_.dataType).toArray
+        val keySchema = child.output.map(_.dataType).toArray
         shuffled.setSerializer(serializer(keySchema, null, numPartitions))
 
         shuffled.map(_._1)

@@ -31,7 +31,17 @@ import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.types._
 
 /**
- * The serialization stream for SparkSqlSerializer2.
+ * The serialization stream for [[SparkSqlSerializer2]]. It assumes that the objects passed into
+ * its `writeObject` are [[Product2]]. The serialization functions for the key and value of the
+ * [[Product2]] are constructed based on their schemata.
+ * The benefit of this serialization stream is that, compared with general-purpose serializers like
+ * Kryo and the Java serializer, it can significantly reduce the size of serialized data and has a
+ * lower allocation cost, which benefits the shuffle operation. Right now, its main limitations are:
+ * 1. It does not support complex types, i.e. Map, Array, and Struct.
+ * 2. It assumes that the objects passed in are [[Product2]]. So, it cannot be used when
+ *    [[org.apache.spark.util.collection.ExternalSorter]]'s merge sort operation is used, because
+ *    the objects passed to the serializer are not of type [[Product2]]. Also see the comment of
+ *    the `serializer` method in [[Exchange]] for more information.
  */
 private[sql] class Serializer2SerializationStream(
     keySchema: Array[DataType],
@@ -61,7 +71,7 @@ private[sql] class Serializer2SerializationStream(
 }
 
 /**
- * The deserialization stream for SparkSqlSerializer2.
+ * The corresponding deserialization stream for [[Serializer2SerializationStream]].
  */
 private[sql] class Serializer2DeserializationStream(
     keySchema: Array[DataType],
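
To make the scaladoc above concrete: because the field types are known from the schema, a stream in this style can emit just the raw field bytes in schema order, with no per-record class metadata, which is where the size and allocation savings over Kryo or Java serialization come from. Below is a simplified, hypothetical analogue (fixed Int/String/Double field types, no null handling); it is a sketch of the idea, not the actual SparkSqlSerializer2 code.

import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical miniature of a schema-driven writer: the field types are known up
// front, so only the raw values are written, in schema order.
sealed trait FieldType
case object IntField extends FieldType
case object StringField extends FieldType
case object DoubleField extends FieldType

class SchemaDrivenWriter(schema: Array[FieldType], out: DataOutputStream) {
  // One write function per field, built once from the schema (mirroring how the
  // serialization functions are constructed from the key/value schemata above).
  private val writers: Array[Any => Unit] = schema.map {
    case IntField    => (v: Any) => out.writeInt(v.asInstanceOf[Int])
    case StringField => (v: Any) => out.writeUTF(v.asInstanceOf[String])
    case DoubleField => (v: Any) => out.writeDouble(v.asInstanceOf[Double])
  }

  def writeRow(row: Seq[Any]): Unit = {
    var i = 0
    while (i < writers.length) {  // no object graph, no class names, just field bytes
      writers(i)(row(i))
      i += 1
    }
  }
}

object SchemaDrivenWriterDemo extends App {
  val bytes = new ByteArrayOutputStream()
  val writer = new SchemaDrivenWriter(
    Array[FieldType](IntField, StringField, DoubleField), new DataOutputStream(bytes))

  writer.writeRow(Seq(1, "spark", 3.14))
  writer.writeRow(Seq(2, "sql", 2.72))

  // Only raw field bytes end up in the buffer (4 + (2 + 5) + 8 for the first row).
  println(s"serialized ${bytes.size()} bytes for two rows")
}

A general-purpose serializer would additionally record class identities and, for Java serialization, object graph structure for every record, which is the overhead the schema-driven approach avoids.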
