diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index dc4f289ae7f89..052be54d8c3f9 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -115,7 +115,7 @@ public BypassMergeSortShuffleWriter(
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
     this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
-    this.serializer = Serializer.getSerializer(dep.serializer());
+    this.serializer = dep.serializer();
     this.shuffleBlockResolver = shuffleBlockResolver;
   }
 
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 3f4402bd3a652..cd06ce9fb911e 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -116,7 +116,7 @@ public UnsafeShuffleWriter(
     this.mapId = mapId;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.shuffleId = dep.shuffleId();
-    this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
+    this.serializer = dep.serializer().newInstance();
     this.partitioner = dep.partitioner();
     this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
     this.taskContext = taskContext;
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index b65cfdc4df17d..ca52ecafa2cc8 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -59,9 +59,9 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  *
  * @param _rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
- *                   the default serializer, as specified by `spark.serializer` config option, will
- *                   be used.
+ * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
+ *                   explicitly then the default serializer, as specified by `spark.serializer`
+ *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
@@ -70,7 +70,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient private val _rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
-    val serializer: Option[Serializer] = None,
+    val serializer: Serializer = SparkEnv.get.serializer,
     val keyOrdering: Option[Ordering[K]] = None,
     val aggregator: Option[Aggregator[K, V, C]] = None,
     val mapSideCombine: Boolean = false)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index b3b3729625ad5..668a913a20b38 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -35,7 +35,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
 import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
-import org.apache.spark.serializer.{JavaSerializer, Serializer}
+import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.util.{RpcUtils, Utils}
@@ -56,6 +56,7 @@ class SparkEnv (
     private[spark] val rpcEnv: RpcEnv,
     val serializer: Serializer,
     val closureSerializer: Serializer,
+    val serializerManager: SerializerManager,
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
@@ -276,6 +277,8 @@ object SparkEnv extends Logging {
       "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
     logDebug(s"Using serializer: ${serializer.getClass}")
 
+    val serializerManager = new SerializerManager(serializer, conf)
+
     val closureSerializer = new JavaSerializer(conf)
 
     def registerOrLookupEndpoint(
@@ -368,6 +371,7 @@ object SparkEnv extends Logging {
       rpcEnv,
       serializer,
       closureSerializer,
+      serializerManager,
       mapOutputTracker,
       shuffleManager,
       broadcastManager,
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index d9b0824b38ecc..e5ebc6308280b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -86,11 +86,11 @@ class CoGroupedRDD[K: ClassTag](
   private type CoGroupValue = (Any, Int)  // Int is dependency number
   private type CoGroupCombiner = Array[CoGroup]
 
-  private var serializer: Option[Serializer] = None
+  private var serializer: Serializer = SparkEnv.get.serializer
 
   /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
   def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
-    this.serializer = Option(serializer)
+    this.serializer = serializer
     this
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 3ef506e1562bf..800b42505de10 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -44,7 +44,7 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
     part: Partitioner)
   extends RDD[(K, C)](prev.context, Nil) {
 
-  private var serializer: Option[Serializer] = None
+  private var userSpecifiedSerializer: Option[Serializer] = None
 
   private var keyOrdering: Option[Ordering[K]] = None
 
@@ -54,7 +54,7 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
 
   /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
   def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C] = {
-    this.serializer = Option(serializer)
+    this.userSpecifiedSerializer = Option(serializer)
     this
   }
 
@@ -77,6 +77,14 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
   }
 
   override def getDependencies: Seq[Dependency[_]] = {
+    val serializer = userSpecifiedSerializer.getOrElse {
+      val serializerManager = SparkEnv.get.serializerManager
+      if (mapSideCombine) {
+        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
+      } else {
+        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
+      }
+    }
     List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 25ec685eff5ab..a733eaa5d7e53 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -30,7 +30,6 @@ import org.apache.spark.Partitioner
 import org.apache.spark.ShuffleDependency
 import org.apache.spark.SparkEnv
 import org.apache.spark.TaskContext
-import org.apache.spark.serializer.Serializer
 
 /**
  * An optimized version of cogroup for set difference/subtraction.
@@ -54,13 +53,6 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
     part: Partitioner)
   extends RDD[(K, V)](rdd1.context, Nil) {
 
-  private var serializer: Option[Serializer] = None
-
-  /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
-  def setSerializer(serializer: Serializer): SubtractedRDD[K, V, W] = {
-    this.serializer = Option(serializer)
-    this
-  }
 
   override def getDependencies: Seq[Dependency[_]] = {
     def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
@@ -70,7 +62,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
         new OneToOneDependency(rdd)
       } else {
         logDebug("Adding shuffle dependency with " + rdd)
-        new ShuffleDependency[T1, T2, Any](rdd, part, serializer)
+        new ShuffleDependency[T1, T2, Any](rdd, part)
       }
     }
     Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 95bdf0ce2df44..5ead40e89e29f 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -100,18 +100,6 @@ abstract class Serializer {
 }
 
 
-@DeveloperApi
-object Serializer {
-  def getSerializer(serializer: Serializer): Serializer = {
-    if (serializer == null) SparkEnv.get.serializer else serializer
-  }
-
-  def getSerializer(serializer: Option[Serializer]): Serializer = {
-    serializer.getOrElse(SparkEnv.get.serializer)
-  }
-}
-
-
 /**
  * :: DeveloperApi ::
  * An instance of a serializer, for use by one thread at a time.
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
new file mode 100644
index 0000000000000..b9f115463a6eb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+
+/**
+ * Component that selects which [[Serializer]] to use for shuffles.
+ */
+private[spark] class SerializerManager(defaultSerializer: Serializer, conf: SparkConf) {
+
+  private[this] val kryoSerializer = new KryoSerializer(conf)
+
+  private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
+    val primitiveClassTags = Set[ClassTag[_]](
+      ClassTag.Boolean,
+      ClassTag.Byte,
+      ClassTag.Char,
+      ClassTag.Double,
+      ClassTag.Float,
+      ClassTag.Int,
+      ClassTag.Long,
+      ClassTag.Null,
+      ClassTag.Short
+    )
+    val arrayClassTags = primitiveClassTags.map(_.wrap)
+    primitiveClassTags ++ arrayClassTags
+  }
+
+  private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
+
+  private def canUseKryo(ct: ClassTag[_]): Boolean = {
+    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
+  }
+
+  def getSerializer(ct: ClassTag[_]): Serializer = {
+    if (canUseKryo(ct)) {
+      kryoSerializer
+    } else {
+      defaultSerializer
+    }
+  }
+
+  /**
+   * Pick the best serializer for shuffling an RDD of key-value pairs.
+   */
+  def getSerializer(keyClassTag: ClassTag[_], valueClassTag: ClassTag[_]): Serializer = {
+    if (canUseKryo(keyClassTag) && canUseKryo(valueClassTag)) {
+      kryoSerializer
+    } else {
+      defaultSerializer
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index dc182f596335b..69183d9936fb5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -54,8 +54,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       blockManager.wrapForCompression(blockId, inputStream)
     }
 
-    val ser = Serializer.getSerializer(dep.serializer)
-    val serializerInstance = ser.newInstance()
+    val serializerInstance = dep.serializer.newInstance()
 
     // Create a key/value iterator for each stream
     val recordIter = wrappedStreams.flatMap { wrappedStream =>
@@ -100,7 +99,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
         // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
         // the ExternalSorter won't spill to disk.
         val sorter =
-          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = Some(ser))
+          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
         sorter.insertAll(aggregatedIter)
         context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
         context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 7694e950be223..22b31994e79cd 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -21,7 +21,6 @@ import java.io.IOException
 
 import org.apache.spark._
 import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle._
 import org.apache.spark.storage.DiskBlockObjectWriter
 
@@ -44,9 +43,8 @@ private[spark] class HashShuffleWriter[K, V](
   private val writeMetrics = metrics.registerShuffleWriteMetrics()
 
   private val blockManager = SparkEnv.get.blockManager
-  private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
-  private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
-    writeMetrics)
+  private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits,
+    dep.serializer, writeMetrics)
 
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[Product2[K, V]]): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 9b1a279528428..f7744d12c51bd 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark._
-import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle._
 
 /**
@@ -186,10 +185,9 @@ private[spark] object SortShuffleManager extends Logging {
   def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
     val shufId = dependency.shuffleId
     val numPartitions = dependency.partitioner.numPartitions
-    val serializer = Serializer.getSerializer(dependency.serializer)
-    if (!serializer.supportsRelocationOfSerializedObjects) {
+    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
       log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
-        s"${serializer.getClass.getName}, does not support object relocation")
+        s"${dependency.serializer.getClass.getName}, does not support object relocation")
       false
     } else if (dependency.aggregator.isDefined) {
       log.debug(
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 5afd6d6e22c62..4bcdcb0774c9a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -91,7 +91,7 @@ private[spark] class ExternalSorter[K, V, C](
     aggregator: Option[Aggregator[K, V, C]] = None,
     partitioner: Option[Partitioner] = None,
     ordering: Option[Ordering[K]] = None,
-    serializer: Option[Serializer] = None)
+    serializer: Serializer = SparkEnv.get.serializer)
   extends Logging
   with Spillable[WritablePartitionedPairCollection[K, C]] {
 
@@ -107,8 +107,7 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val blockManager = SparkEnv.get.blockManager
   private val diskBlockManager = blockManager.diskBlockManager
-  private val ser = Serializer.getSerializer(serializer)
-  private val serInstance = ser.newInstance()
+  private val serInstance = serializer.newInstance()
 
   // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
   private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index ddea6f5a69b18..47c695ad4e717 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -191,7 +191,7 @@ public Tuple2<TempShuffleBlockId, File> answer(
     });
 
     when(taskContext.taskMetrics()).thenReturn(taskMetrics);
-    when(shuffleDep.serializer()).thenReturn(Option.apply(serializer));
+    when(shuffleDep.serializer()).thenReturn(serializer);
     when(shuffleDep.partitioner()).thenReturn(hashPartitioner);
   }
 
diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index 26a372d6a905d..08f52c92e1812 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -127,7 +127,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
     // Create a mocked shuffle handle to pass into HashShuffleReader.
     val shuffleHandle = {
       val dependency = mock(classOf[ShuffleDependency[Int, Int, Int]])
-      when(dependency.serializer).thenReturn(Some(serializer))
+      when(dependency.serializer).thenReturn(serializer)
       when(dependency.aggregator).thenReturn(None)
       when(dependency.keyOrdering).thenReturn(None)
       new BaseShuffleHandle(shuffleId, numMaps, dependency)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index cf9f9da1e6913..16418f855bbe1 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -66,7 +66,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       dependency = dependency
     )
     when(dependency.partitioner).thenReturn(new HashPartitioner(7))
-    when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf)))
+    when(dependency.serializer).thenReturn(new JavaSerializer(conf))
     when(taskContext.taskMetrics()).thenReturn(taskMetrics)
     when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)
     doAnswer(new Answer[Void] {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
index 8744a072cb3f6..55cebe7c8b6a8 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
@@ -41,7 +41,7 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
 
   private def shuffleDep(
       partitioner: Partitioner,
-      serializer: Option[Serializer],
+      serializer: Serializer,
       keyOrdering: Option[Ordering[Any]],
      aggregator: Option[Aggregator[Any, Any, Any]],
      mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = {
@@ -56,7 +56,7 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
   }
 
   test("supported shuffle dependencies for serialized shuffle") {
-    val kryo = Some(new KryoSerializer(new SparkConf()))
+    val kryo = new KryoSerializer(new SparkConf())
 
     assert(canUseSerializedShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
@@ -88,8 +88,8 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
   }
 
   test("unsupported shuffle dependencies for serialized shuffle") {
-    val kryo = Some(new KryoSerializer(new SparkConf()))
-    val java = Some(new JavaSerializer(new SparkConf()))
+    val kryo = new KryoSerializer(new SparkConf())
+    val java = new JavaSerializer(new SparkConf())
 
     // We only support serializers that support object relocation
     assert(!canUseSerializedShuffle(shuffleDep(
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index a62adf1c2c543..a1a7ac97d924b 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -110,7 +110,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       createCombiner _, mergeValue _, mergeCombiners _)
 
     val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
-      context, Some(agg), None, None, None)
+      context, Some(agg), None, None)
 
     val collisionPairs = Seq(
       ("Aa", "BB"), // 2112
@@ -161,7 +161,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
     val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
-    val sorter = new ExternalSorter[FixedHashObject, Int, Int](context, Some(agg), None, None, None)
+    val sorter = new ExternalSorter[FixedHashObject, Int, Int](context, Some(agg), None, None)
     // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
     // problems if the map fails to group together the objects with the same code (SPARK-2043).
     val toInsert = for (i <- 1 to 10; j <- 1 to size) yield (FixedHashObject(j, j % 2), 1)
@@ -192,7 +192,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners)
 
     val sorter =
-      new ExternalSorter[Int, Int, ArrayBuffer[Int]](context, Some(agg), None, None, None)
+      new ExternalSorter[Int, Int, ArrayBuffer[Int]](context, Some(agg), None, None)
     sorter.insertAll(
       (1 to size).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
     assert(sorter.numSpills > 0, "sorter did not spill")
@@ -219,7 +219,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       createCombiner, mergeValue, mergeCombiners)
 
     val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
-      context, Some(agg), None, None, None)
+      context, Some(agg), None, None)
 
     sorter.insertAll((1 to size).iterator.map(i => (i.toString, i.toString)) ++ Iterator(
       (null.asInstanceOf[String], "1"),
@@ -283,25 +283,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     // Both aggregator and ordering
     val sorter = new ExternalSorter[Int, Int, Int](
-      context, Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
+      context, Some(agg), Some(new HashPartitioner(3)), Some(ord))
     assert(sorter.iterator.toSeq === Seq())
     sorter.stop()
 
     // Only aggregator
     val sorter2 = new ExternalSorter[Int, Int, Int](
-      context, Some(agg), Some(new HashPartitioner(3)), None, None)
+      context, Some(agg), Some(new HashPartitioner(3)), None)
     assert(sorter2.iterator.toSeq === Seq())
     sorter2.stop()
 
     // Only ordering
     val sorter3 = new ExternalSorter[Int, Int, Int](
-      context, None, Some(new HashPartitioner(3)), Some(ord), None)
+      context, None, Some(new HashPartitioner(3)), Some(ord))
     assert(sorter3.iterator.toSeq === Seq())
     sorter3.stop()
 
     // Neither aggregator nor ordering
     val sorter4 = new ExternalSorter[Int, Int, Int](
-      context, None, Some(new HashPartitioner(3)), None, None)
+      context, None, Some(new HashPartitioner(3)), None)
     assert(sorter4.iterator.toSeq === Seq())
     sorter4.stop()
   }
@@ -320,28 +320,28 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
     // Both aggregator and ordering
     val sorter = new ExternalSorter[Int, Int, Int](
-      context, Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
+      context, Some(agg), Some(new HashPartitioner(7)), Some(ord))
     sorter.insertAll(elements.iterator)
     assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
     sorter.stop()
 
     // Only aggregator
     val sorter2 = new ExternalSorter[Int, Int, Int](
-      context, Some(agg), Some(new HashPartitioner(7)), None, None)
+      context, Some(agg), Some(new HashPartitioner(7)), None)
     sorter2.insertAll(elements.iterator)
     assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
     sorter2.stop()
 
     // Only ordering
     val sorter3 = new ExternalSorter[Int, Int, Int](
-      context, None, Some(new HashPartitioner(7)), Some(ord), None)
+      context, None, Some(new HashPartitioner(7)), Some(ord))
     sorter3.insertAll(elements.iterator)
     assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
     sorter3.stop()
 
     // Neither aggregator nor ordering
     val sorter4 = new ExternalSorter[Int, Int, Int](
-      context, None, Some(new HashPartitioner(7)), None, None)
+      context, None, Some(new HashPartitioner(7)), None)
     sorter4.insertAll(elements.iterator)
     assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
     sorter4.stop()
@@ -358,7 +358,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val elements = Iterator((1, 1), (5, 5)) ++ (0 until size).iterator.map(x => (2, 2))
 
     val sorter = new ExternalSorter[Int, Int, Int](
-      context, None, Some(new HashPartitioner(7)), Some(ord), None)
+      context, None, Some(new HashPartitioner(7)), Some(ord))
     sorter.insertAll(elements)
     assert(sorter.numSpills > 0, "sorter did not spill")
     val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
@@ -442,7 +442,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val expectedSize = if (withFailures) size - 1 else size
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
     val sorter = new ExternalSorter[Int, Int, Int](
-      context, None, Some(new HashPartitioner(3)), Some(ord), None)
+      context, None, Some(new HashPartitioner(3)), Some(ord))
     if (withFailures) {
       intercept[SparkException] {
         sorter.insertAll((0 until size).iterator.map { i =>
@@ -512,7 +512,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val ord = if (withOrdering) Some(implicitly[Ordering[Int]]) else None
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
     val sorter =
-      new ExternalSorter[Int, Int, Int](context, agg, Some(new HashPartitioner(3)), ord, None)
+      new ExternalSorter[Int, Int, Int](context, agg, Some(new HashPartitioner(3)), ord)
     sorter.insertAll((0 until size).iterator.map { i => (i / 4, i) })
     if (withSpilling) {
       assert(sorter.numSpills > 0, "sorter did not spill")
@@ -551,7 +551,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
 
     val sorter1 = new ExternalSorter[String, String, String](
-      context, None, None, Some(wrongOrdering), None)
+      context, None, None, Some(wrongOrdering))
     val thrown = intercept[IllegalArgumentException] {
       sorter1.insertAll(testData.iterator.map(i => (i, i)))
       assert(sorter1.numSpills > 0, "sorter did not spill")
@@ -573,7 +573,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       createCombiner, mergeValue, mergeCombiners)
 
     val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]](
-      context, Some(agg), None, None, None)
+      context, Some(agg), None, None)
     sorter2.insertAll(testData.iterator.map(i => (i, i)))
     assert(sorter2.numSpills > 0, "sorter did not spill")
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6d392829237b3..5e8d13b5b8d68 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -531,6 +531,11 @@ object MimaExcludes {
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListener.onOtherEvent"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.CreatableRelationProvider.createRelation"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.InsertableRelation.insert")
+    ) ++ Seq(
+      // [SPARK-13926] Automatically use Kryo serializer when shuffling RDDs with simple types
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ShuffleDependency.this"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ShuffleDependency.serializer"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.serializer.Serializer$")
     ) ++ Seq(
       // SPARK-13927: add row/column iterator to local matrices
       ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Matrix.rowIter"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index 4eb4d9adbddc8..7e35db7dd8a79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -261,7 +261,7 @@ object ShuffleExchange {
       new ShuffleDependency[Int, InternalRow, InternalRow](
         rddWithPartitionIds,
         new PartitionIdPassthrough(part.numPartitions),
-        Some(serializer))
+        serializer)
 
     dependency
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 50bdcd6c2cd8d..1f3779373b5d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -118,7 +118,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
         taskContext,
         partitioner = Some(new HashPartitioner(10)),
-        serializer = Some(new UnsafeRowSerializer(numFields = 1)))
+        serializer = new UnsafeRowSerializer(numFields = 1))
 
       // Ensure we spilled something and have to merge them later
       assert(sorter.numSpills === 0)
@@ -153,7 +153,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       new ShuffleDependency[Int, InternalRow, InternalRow](
         rowsRDD,
         new PartitionIdPassthrough(2),
-        Some(new UnsafeRowSerializer(2)))
+        new UnsafeRowSerializer(2))
     val shuffled = new ShuffledRowRDD(dependency)
     shuffled.count()
   }
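
The usage sketch below is illustrative only and is not part of the patch. It shows how the new SerializerManager introduced above resolves a serializer from key/value class tags: Kryo when both sides are primitives, primitive arrays, or strings, and the configured default serializer otherwise. The example object name is made up, and it is placed in the org.apache.spark.serializer package only because SerializerManager is private[spark].

package org.apache.spark.serializer

import scala.reflect.ClassTag

import org.apache.spark.SparkConf

// Hypothetical example object (not in the patch); it lives under
// org.apache.spark.serializer only because SerializerManager is private[spark].
object SerializerSelectionExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // JavaSerializer stands in for the configured spark.serializer default.
    val manager = new SerializerManager(new JavaSerializer(conf), conf)

    // Primitive key and value types: Kryo is chosen automatically.
    val forPrimitives =
      manager.getSerializer(implicitly[ClassTag[Int]], implicitly[ClassTag[Long]])

    // A non-primitive value type: falls back to the default serializer.
    val forObjects =
      manager.getSerializer(implicitly[ClassTag[String]], implicitly[ClassTag[Seq[Int]]])

    println(forPrimitives.getClass.getSimpleName) // KryoSerializer
    println(forObjects.getClass.getSimpleName)    // JavaSerializer
  }
}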