From 80d68a2ed9979fbfd84f99a3bc34beb51ff449cd Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 25 Nov 2014 10:57:31 +0800 Subject: [PATCH 1/2] Implement skewed join --- .../apache/spark/rdd/PairRDDFunctions.scala | 240 ++++++++- .../org/apache/spark/rdd/SkewedJoinRDD.scala | 345 +++++++++++++ .../spark/util/collection/ChunkBuffer.scala | 341 +++++++++++++ .../spark/util/collection/CompactBuffer.scala | 2 +- .../ExternalOrderingAppendOnlyMap.scala | 456 ++++++++++++++++++ .../apache/spark/rdd/SkewedJoinSuite.scala | 127 +++++ .../util/collection/ChunkBufferSuite.scala | 115 +++++ .../ExternalOrderingAppendOnlyMapSuite.scala | 309 ++++++++++++ 8 files changed, 1917 insertions(+), 18 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala create mode 100644 core/src/main/scala/org/apache/spark/util/collection/ChunkBuffer.scala create mode 100644 core/src/main/scala/org/apache/spark/util/collection/ExternalOrderingAppendOnlyMap.scala create mode 100644 core/src/test/scala/org/apache/spark/rdd/SkewedJoinSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/util/collection/ChunkBufferSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/util/collection/ExternalOrderingAppendOnlyMapSuite.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 8c2c959e73bb6..83fb5190e3e32 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -481,9 +481,203 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. 
*/ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { - this.cogroup(other, partitioner).flatMapValues( pair => - for (v <- pair._1; w <- pair._2) yield (v, w) - ) + if(SparkEnv.get.conf.getBoolean("spark.join.skewed.enabled", false)) { + skewedJoin(other, partitioner)(ClassTag.AnyRef.asInstanceOf[ClassTag[W]]) + } else { + this.cogroup(other, partitioner).flatMapValues { pair => + for (v <- pair._1; w <- pair._2) yield (v, w) + } + } + } + + /** + * ::Experimental:: + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a hash join across the cluster. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedJoin[W: ClassTag](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + skewedJoin(other, defaultPartitioner(self, other)) + } + + /** + * ::Experimental:: + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a hash join across the cluster. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedJoin[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + skewedJoin(other, new HashPartitioner(numPartitions)) + } + + /** + * ::Experimental:: + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. + * + * It supports to join skewed data. 
If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedJoin[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + new SkewedJoinRDD[K, V, W, (V, W)](self, other, partitioner, JoinType.inner) + } + + /** + * ::Experimental:: + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * using the existing partitioner/parallelism level. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedLeftOuterJoin[W: ClassTag](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + skewedLeftOuterJoin(other, defaultPartitioner(self, other)) + } + + /** + * ::Experimental:: + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numPartitions` partitions. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedLeftOuterJoin[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): + RDD[(K, (V, Option[W]))] = { + skewedLeftOuterJoin(other, new HashPartitioner(numPartitions)) + } + + /** + * ::Experimental:: + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to + * partition the output RDD. + * + * It supports to join skewed data. 
If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedLeftOuterJoin[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): + RDD[(K, (V, Option[W]))] = { + new SkewedJoinRDD[K, V, W, (V, Option[W])](self, other, partitioner, JoinType.leftOuter) + } + + /** + * ::Experimental:: + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the existing partitioner/parallelism level. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedRightOuterJoin[W: ClassTag](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + skewedRightOuterJoin(other, defaultPartitioner(self, other)) + } + + /** + * ::Experimental:: + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD into the given number of partitions. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedRightOuterJoin[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): + RDD[(K, (Option[V], W))] = { + skewedRightOuterJoin(other, new HashPartitioner(numPartitions)) + } + + /** + * ::Experimental:: + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to + * partition the output RDD. 
+ * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedRightOuterJoin[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): + RDD[(K, (Option[V], W))] = { + new SkewedJoinRDD[K, V, W, (Option[V], W)](self, other, partitioner, JoinType.rightOuter) + } + + /** + * ::Experimental:: + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/ + * parallelism level. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedFullOuterJoin[W: ClassTag](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = { + skewedFullOuterJoin(other, defaultPartitioner(self, other)) + } + + /** + * ::Experimental:: + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. 
+ */ + @Experimental + def skewedFullOuterJoin[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): + RDD[(K, (Option[V], Option[W]))] = { + skewedFullOuterJoin(other, new HashPartitioner(numPartitions)) + } + + /** + * ::Experimental:: + * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or + * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each + * element (k, w) in `other`, the resulting RDD will either contain all pairs + * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements + * in `this` have key k. Uses the given Partitioner to partition the output RDD. + * + * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill + * them to disk. + */ + @Experimental + def skewedFullOuterJoin[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): + RDD[(K, (Option[V], Option[W]))] = { + new SkewedJoinRDD[K, V, W, (Option[V], Option[W])](self, other, partitioner, JoinType.fullOuter) } /** @@ -493,11 +687,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * partition the output RDD. 
*/ def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { - this.cogroup(other, partitioner).flatMapValues { pair => - if (pair._2.isEmpty) { - pair._1.map(v => (v, None)) - } else { - for (v <- pair._1; w <- pair._2) yield (v, Some(w)) + if(SparkEnv.get.conf.getBoolean("spark.join.skewed.enabled", false)) { + skewedLeftOuterJoin(other, partitioner)(ClassTag.AnyRef.asInstanceOf[ClassTag[W]]) + } else { + this.cogroup(other, partitioner).flatMapValues { pair => + if (pair._2.isEmpty) { + pair._1.map(v => (v, None)) + } else { + for (v <- pair._1; w <- pair._2) yield (v, Some(w)) + } } } } @@ -510,11 +708,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { - this.cogroup(other, partitioner).flatMapValues { pair => - if (pair._1.isEmpty) { - pair._2.map(w => (None, w)) - } else { - for (v <- pair._1; w <- pair._2) yield (Some(v), w) + if(SparkEnv.get.conf.getBoolean("spark.join.skewed.enabled", false)) { + skewedRightOuterJoin(other, partitioner)(ClassTag.AnyRef.asInstanceOf[ClassTag[W]]) + } else { + this.cogroup(other, partitioner).flatMapValues { pair => + if (pair._1.isEmpty) { + pair._2.map(w => (None, w)) + } else { + for (v <- pair._1; w <- pair._2) yield (Some(v), w) + } } } } @@ -529,10 +731,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], Option[W]))] = { - this.cogroup(other, partitioner).flatMapValues { - case (vs, Seq()) => vs.map(v => (Some(v), None)) - case (Seq(), ws) => ws.map(w => (None, Some(w))) - case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w)) + if(SparkEnv.get.conf.getBoolean("spark.join.skewed.enabled", false)) { + skewedFullOuterJoin(other, partitioner)(ClassTag.AnyRef.asInstanceOf[ClassTag[W]]) + } else { + this.cogroup(other, partitioner).flatMapValues { + case (vs, Seq()) => vs.map(v => 
(Some(v), None)) + case (Seq(), ws) => ws.map(w => (None, Some(w))) + case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w)) + } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala new file mode 100644 index 0000000000000..ad58f0516d663 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + +import java.io.{ObjectOutputStream, IOException} + +import scala.reflect.ClassTag + +import org.apache.spark._ +import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.ShuffleHandle +import org.apache.spark.util.Utils +import org.apache.spark.util.collection._ + +private[spark] sealed trait JoinType[K, L, R, PAIR <: Product2[_, _]] extends Serializable { + + def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]): Iterator[(K, PAIR)] + +} + +private[spark] object JoinType { + + def inner[K, L, R] = new JoinType[K, L, R, (L, R)] { + + override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) = + i flatMap { + case (key, pair) => { + if (pair._1.size < pair._2.size) { + yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (p1, p2)) + } else { + yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (p1, p2)) + } + } + } + } + + def leftOuter[K, L, R] = new JoinType[K, L, R, (L, Option[R])] { + + override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) = + i flatMap { + case (key, pair) => { + if (pair._2.size == 0) { + for (chunk <- pair._1.iterator; + v <- chunk + ) yield (key, (v, None)): (K, (L, Option[R])) + } + else if (pair._1.size < pair._2.size) { + yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (p1, Some(p2))) + } else { + yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (p1, Some(p2))) + } + } + } + } + + def rightOuter[K, L, R] = new JoinType[K, L, R, (Option[L], R)] { + + override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) = + i flatMap { + case (key, pair) => { + if (pair._1.size == 0) { + for (chunk <- pair._2.iterator; + v <- chunk + ) yield (key, (None, v)): (K, (Option[L], R)) + } + else if (pair._1.size < pair._2.size) { + yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (Some(p1), p2)) + } else { + yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (Some(p1), p2)) + } + } + } + } + + def 
fullOuter[K, L, R] = new JoinType[K, L, R, (Option[L], Option[R])] { + + override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) = + i flatMap { + case (key, pair) => { + if (pair._1.size == 0) { + for (chunk <- pair._2.iterator; + v <- chunk + ) yield (key, (None, Some(v))): (K, (Option[L], Option[R])) + } + else if (pair._2.size == 0) { + for (chunk <- pair._1.iterator; + v <- chunk + ) yield (key, (Some(v), None)): (K, (Option[L], Option[R])) + } + else if (pair._1.size < pair._2.size) { + yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (Some(p1), Some(p2))) + } else { + yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (Some(p1), Some(p2))) + } + } + } + } + + private def yieldPair[K, OUT, IN, PAIR <: Product2[_, _]]( + outer: Iterable[Chunk[OUT]], inner: Iterable[Chunk[IN]], key: K, toPair: (OUT, IN) => PAIR) = + for ( + outerChunk <- outer.iterator; + innerChunk <- inner.iterator; + outerValue <- outerChunk; + innerValue <- innerChunk + ) yield (key, toPair(outerValue, innerValue)) +} + +private[spark] sealed trait JoinSplitDep extends Serializable + +private[spark] case class NarrowJoinSplitDep( + rdd: RDD[_], + splitIndex: Int, + var split: Partition) extends JoinSplitDep { + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { + // Update the reference to parent split at the time of task serialization + split = rdd.partitions(splitIndex) + oos.defaultWriteObject() + } +} + +private[spark] case class ShuffleJoinSplitDep(handle: ShuffleHandle) extends JoinSplitDep + +private[spark] class JoinPartition(idx: Int, val left: JoinSplitDep, val right: JoinSplitDep) + extends Partition with Serializable { + override val index: Int = idx + + override def hashCode(): Int = idx +} + +private[spark] class BufferWrapper private(val isChunkBuffer: Boolean, buffer: Iterable[Any]) + extends Serializable { + + def this(buffer: CompactBuffer[_]) { + this(false, buffer) + } + + def 
this(buffer: ChunkBuffer[_]) { + this(true, buffer) + } + + def getChunkBuffer[T: ClassTag]: ChunkBuffer[T] = buffer.asInstanceOf[ChunkBuffer[T]] + + def getCompactBuffer[T: ClassTag]: CompactBuffer[T] = buffer.asInstanceOf[CompactBuffer[T]] + + def asChunkIterable[T: ClassTag]: Iterable[Chunk[T]] = { + if (isChunkBuffer) { + getChunkBuffer[T] + } + else { + val buffer = getCompactBuffer[T] + if (buffer.isEmpty) { + Iterable[Chunk[T]]() + } + else { + Iterable(new Chunk[T](buffer)) + } + } + } +} + +private[spark] class SkewedJoinRDD[K, L, R, PAIR <: Product2[_, _]]( + left: RDD[(K, L)], right: RDD[(K, R)], part: Partitioner, joinType: JoinType[K, L, R, PAIR]) + (implicit kt: ClassTag[K], lt: ClassTag[L], rt: ClassTag[R], ord: Ordering[K]) + extends RDD[(K, PAIR)](left.context, Nil) with Logging { + + // Ordering is necessary. ExternalOrderingAppendOnlyMap needs it to prefetch a key without + // loading its value to avoid OOM. + require(ord != null, "No implicit Ordering defined for " + kt.runtimeClass) + + private type JoinValue = (Option[L], Option[R]) + private type JoinCombiner = (ChunkBuffer[L], ChunkBuffer[R]) + private type JoinIterableCombiner = (BufferWrapper, BufferWrapper) + + private var serializer: Option[Serializer] = None + + /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */ + def setSerializer(serializer: Serializer): SkewedJoinRDD[K, L, R, PAIR] = { + this.serializer = Option(serializer) + this + } + + override def getDependencies: Seq[Dependency[_]] = { + Seq(left, right).map { rdd: RDD[_ <: Product2[K, _]] => + if (rdd.partitioner == Some(part)) { + logDebug("Adding one-to-one dependency with " + rdd) + new OneToOneDependency(rdd) + } else { + logDebug("Adding shuffle dependency with " + rdd) + new ShuffleDependency[K, Any, JoinCombiner](rdd, part, serializer) + } + } + } + + private def getJoinSplitDep(rdd: RDD[_], index: Int, dep: Dependency[_]): JoinSplitDep = + dep match { + case s: 
ShuffleDependency[_, _, _] => + new ShuffleJoinSplitDep(s.shuffleHandle) + case _ => + new NarrowJoinSplitDep(rdd, index, rdd.partitions(index)) + } + + override def getPartitions: Array[Partition] = { + val array = new Array[Partition](part.numPartitions) + for (i <- 0 until array.size) { + array(i) = new JoinPartition(i, + getJoinSplitDep(left, i, dependencies(0)), + getJoinSplitDep(right, i, dependencies(1))) + } + array + } + + override val partitioner: Some[Partitioner] = Some(part) + + override def compute(s: Partition, context: TaskContext): Iterator[(K, PAIR)] = { + joinType.flatten(internalCompute(s, context).map { + case (key, (i1, i2)) => { + if (i1.isInstanceOf[ChunkBuffer[_]]) { + context.taskMetrics().diskBytesSpilled += + i1.asInstanceOf[ChunkBuffer[Chunk[L]]].diskBytesSpilled + } + if (i2.isInstanceOf[ChunkBuffer[_]]) { + context.taskMetrics().diskBytesSpilled += + i2.asInstanceOf[ChunkBuffer[Chunk[R]]].diskBytesSpilled + } + (key, (i1, i2)) + } + }) + } + + private def internalCompute(s: Partition, context: TaskContext): + Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))] = { + val sparkConf = SparkEnv.get.conf + val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true) + val split = s.asInstanceOf[JoinPartition] + val leftIter = joinSplitDepToIterator[L](split.left, split.index, context) + val rightIter = joinSplitDepToIterator[R](split.right, split.index, context) + + val parameters = new ChunkParameters + if (!externalSorting) { + val map = new AppendOnlyMap[K, JoinCombiner] + val update: (Boolean, JoinCombiner) => JoinCombiner = (hadVal, oldVal) => { + if (hadVal) oldVal else (ChunkBuffer[L](parameters), ChunkBuffer[R](parameters)) + } + val getCombiner: K => JoinCombiner = key => { + map.changeValue(key, update) + } + while (leftIter.hasNext) { + val kv = leftIter.next() + getCombiner(kv._1)._1 += kv._2 + } + while (rightIter.hasNext) { + val kv = rightIter.next() + getCombiner(kv._1)._2 += kv._2 + } + new 
InterruptibleIterator(context, + map.iterator.asInstanceOf[Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]]) + } else { + val map = createExternalMap(parameters) + map.insertAll(leftIter.map(item => (item._1, (Some(item._2), None)))) + map.insertAll(rightIter.map(item => (item._1, (None, Some(item._2))))) + context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled + context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled + new InterruptibleIterator(context, + map.iterator.map { + case (key, combiner) => { + val left = combiner._1.asChunkIterable[L] + val right = combiner._2.asChunkIterable[R] + (key, (left, right)) + } + }) + } + } + + private def joinSplitDepToIterator[V](dep: JoinSplitDep, splitIndex: Int, context: TaskContext): + Iterator[Product2[K, V]] = + dep match { + case NarrowJoinSplitDep(rdd, _, itsSplit) => + rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]] + case ShuffleJoinSplitDep(handle) => + SparkEnv.get.shuffleManager + .getReader(handle, splitIndex, splitIndex + 1, context) + .read() + } + + private def createExternalMap(parameters: ChunkParameters) + : ExternalOrderingAppendOnlyMap[K, JoinValue, JoinIterableCombiner] = { + + val createCombiner: JoinValue => JoinIterableCombiner = + value => { + val left = CompactBuffer[L]() + value._1.foreach(left += _) + val right = CompactBuffer[R]() + value._2.foreach(right += _) + (new BufferWrapper(left), new BufferWrapper(right)) + } + + val mergeValue: (JoinIterableCombiner, JoinValue) => JoinIterableCombiner = + (combiner, value) => { + value._1.foreach(combiner._1.getCompactBuffer[L] += _) + value._2.foreach(combiner._2.getCompactBuffer[R] += _) + combiner + } + + val createExternalCombiner = () => { + val left = new ChunkBuffer[L](parameters) + val right = new ChunkBuffer[R](parameters) + (new BufferWrapper(left), new BufferWrapper(right)) + } + + // Call when reading data from the disks + // The real pattern is: + // ((ChunkBuffer, ChunkBuffer), 
(CompactBuffer, CompactBuffer)) => (ChunkBuffer, ChunkBuffer) + def mergeCombiners(combiner1: JoinIterableCombiner, combiner2: JoinIterableCombiner): + JoinIterableCombiner = { + assert(combiner1._1.isChunkBuffer && combiner1._2.isChunkBuffer, "left must be a ChunkBuffer") + assert(!combiner2._1.isChunkBuffer && !combiner2._2.isChunkBuffer, + "right must be a CompactBuffer") + + combiner1._1.getChunkBuffer[L] ++= combiner2._1.getCompactBuffer[L] + combiner1._2.getChunkBuffer[R] ++= combiner2._2.getCompactBuffer[R] + combiner1 + } + + new ExternalOrderingAppendOnlyMap[K, JoinValue, JoinIterableCombiner]( + createCombiner, mergeValue, createExternalCombiner, mergeCombiners) + } +} diff --git a/core/src/main/scala/org/apache/spark/util/collection/ChunkBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/ChunkBuffer.scala new file mode 100644 index 0000000000000..4308d28f539b6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/ChunkBuffer.scala @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.collection + +import java.io._ + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import com.google.common.io.ByteStreams + +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.serializer.{DeserializationStream, SerializerInstance} +import org.apache.spark.SparkEnv +import org.apache.spark.storage.{BlockId, BlockManager} + +private[spark] class SizeTrackingCompactBuffer[T: ClassTag] + extends CompactBuffer[T] with SizeTracker { + + override def +=(value: T): SizeTrackingCompactBuffer[T] = { + super.+=(value) + afterUpdate() + this + } + + override def growToSize(newSize: Int): Unit = { + super.growToSize(newSize) + resetSamples() + } +} + +/** + * A data set that is not empty. + * + * @param buffer + * @tparam T + */ +private[spark] class Chunk[T: ClassTag](buffer: CompactBuffer[T]) extends Iterable[T] { + require(buffer.nonEmpty, "Chunk must contain some data") + + override def iterator: Iterator[T] = buffer.iterator +} + +private[spark] class ChunkParameters { + val sparkConf = SparkEnv.get.conf + val serializer = SparkEnv.get.serializer + val blockManager = SparkEnv.get.blockManager + val diskBlockManager = blockManager.diskBlockManager + + val serializerBatchSize = sparkConf.getLong("spark.join.spill.batchSize", 10000) + val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 +} + +/** + * An append-only buffer that spills contents to disk when there is insufficient space for it + * to grow. If there is insufficient space, it will flush the current contents in memory to disk + * and further contents will be written to disk directly. + * + * The buffer will prevent further append operations once someone starts to read the contents. 
+ * + * @param parameters + * @tparam T + */ +class ChunkBuffer[T: ClassTag](parameters: ChunkParameters) + extends Iterable[Chunk[T]] with Spillable[CompactBuffer[T]] { + + private var buffer = new SizeTrackingCompactBuffer[T]() + + private var diskBuffer: DiskChunkBuffer[T] = null + + def diskBytesSpilled: Long = if (diskBuffer == null) 0 else diskBuffer.diskBytesSpilled + + def +=(value: T): ChunkBuffer[T] = { + if (diskBuffer != null) { + diskBuffer += value + } + else { + buffer += value + addElementsRead() + if (maybeSpill(buffer, buffer.estimateSize())) { + // It's important to release the reference to free the memory + buffer = null + } + } + this + } + + def ++=(values: Iterable[T]): ChunkBuffer[T] = { + for (v <- values.iterator) { + this += v + } + this + } + + override def iterator: Iterator[Chunk[T]] = { + if (diskBuffer == null) { + if (buffer.isEmpty) { + Iterator() + } + else { + Iterator(new Chunk(buffer)) + } + } + else { + diskBuffer.iterator + } + } + + override protected def spill(buffer: CompactBuffer[T]): Unit = { + diskBuffer = new DiskChunkBuffer[T](parameters) + buffer.foreach(diskBuffer += _) + } + + /** + * @return the size of chunks, or 0 if **no data**. + */ + override def size: Int = if (diskBuffer == null) { + if (buffer.isEmpty) 0 else 1 + } else { + diskBuffer.size + } +} + +/** + * A buffer to append values to the disk file. + * + * @param parameters + * @tparam T + */ +private[spark] class DiskChunkBuffer[T: ClassTag](parameters: ChunkParameters) + extends Iterable[Chunk[T]] { + + // If DiskChunkBuffer is frozen, it cannot be modified any more. 
+ private var isFrozen = false + + // If DiskChunkBuffer becomes corrupt (such as throw some exception), should prevent any further + // operation + private var corrupt = false + + private val batchSizes = ArrayBuffer[Long]() + + private val (blockId, file) = parameters.diskBlockManager.createTempLocalBlock() + + private var curWriteMetrics = new ShuffleWriteMetrics() + + private val blockManager = parameters.blockManager + private val serializer = parameters.serializer + private val serializerBatchSize = parameters.serializerBatchSize + + private var writer = getWriter() + + private val ser = serializer.newInstance() + + private var _diskBytesSpilled = 0L + + def diskBytesSpilled: Long = _diskBytesSpilled + + private def getWriter() = { + blockManager.getDiskWriter(blockId, file, serializer, parameters.fileBufferSize, + curWriteMetrics) + } + + private var objectsWritten = 0 + + def +=(value: T): DiskChunkBuffer[T] = { + if (corrupt) { + throw new IllegalStateException("DiskChunkBuffer has been corrupt") + } + + try { + if (isFrozen) { + throw new IllegalStateException("DiskChunkBuffer has been frozen") + } + + writer.write(value) + objectsWritten += 1 + + if (objectsWritten == serializerBatchSize) { + flush() + curWriteMetrics = new ShuffleWriteMetrics() + writer = getWriter() + } + this + } catch { + case e: Throwable => + corrupt = true + cleanupOnThrowable() + throw e + } + } + + override def iterator: Iterator[Chunk[T]] = { + if (corrupt) { + throw new IllegalStateException("DiskChunkBuffer has been corrupt") + } + + try { + if (!isFrozen) { + frozen() + } + new DiskChunkIterator(file, blockId, batchSizes, serializerBatchSize, blockManager, ser) + } catch { + case e: Throwable => + corrupt = true + cleanupOnThrowable() + throw e + } + } + + private def frozen(): Unit = { + isFrozen = true + if (objectsWritten > 0) { + flush() + } else if (writer != null) { + val w = writer + writer = null + w.revertPartialWritesAndClose() + } + } + + private def flush(): Unit 
= { + val w = writer + writer = null + w.commitAndClose() + _diskBytesSpilled += curWriteMetrics.shuffleBytesWritten + batchSizes.append(curWriteMetrics.shuffleBytesWritten) + objectsWritten = 0 + } + + /** + * Clean up the resources when some exception happens + */ + private def cleanupOnThrowable(): Unit = { + if (writer != null) { + writer.revertPartialWritesAndClose() + writer == null + } + if (file.exists()) { + file.delete() + } + } + + override def size: Int = batchSizes.size +} + +object ChunkBuffer { + def apply[T: ClassTag](parameters: ChunkParameters): ChunkBuffer[T] = { + new ChunkBuffer(parameters) + } +} + +private[spark] class DiskChunkIterator[T: ClassTag](file: File, blockId: BlockId, + batchSizes: ArrayBuffer[Long], serializerBatchSize: Long, blockManager: BlockManager, + ser: SerializerInstance) + extends Iterator[Chunk[T]] { + + private val batchOffsets = batchSizes.scanLeft(0L)(_ + _) // Size will be batchSize.length + 1 + assert(file.length() == batchOffsets.last, + "File length is not equal to the last batch offset:\n" + + s" file length = ${file.length}\n" + + s" last batch offset = ${batchOffsets.last}\n" + + s" all batch offsets = ${batchOffsets.mkString(",")}" + ) + + private var nextChunk: Chunk[T] = null + + private var batchIndex = 0 // Which batch we're in + + override def hasNext: Boolean = { + if (nextChunk == null) { + nextChunk = moveToNextChunk() + } + nextChunk != null + } + + override def next(): Chunk[T] = { + if (hasNext) { + val chunk = nextChunk + nextChunk = null + chunk + } + else { + throw new NoSuchElementException + } + } + + private def moveToNextChunk(): Chunk[T] = { + // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether + // we're still in a valid batch. 
+ if (batchIndex < batchOffsets.length - 1) { + val buffer = CompactBuffer[T]() + var objectsRead = 0 + val fileStream = new FileInputStream(file) + var deserializeStream: DeserializationStream = null + try { + val start = batchOffsets(batchIndex) + fileStream.getChannel.position(start) + batchIndex += 1 + + val end = batchOffsets(batchIndex) + + assert(end >= start, "start = " + start + ", end = " + end + + ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) + + val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) + val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) + deserializeStream = ser.deserializeStream(compressedStream) + while (objectsRead < serializerBatchSize) { + buffer += deserializeStream.readObject() + objectsRead += 1 + } + new Chunk(buffer) + } finally { + if (deserializeStream != null) deserializeStream.close() + fileStream.close() + } + } else { + // No more batches left, so delete the file + // TODO how to delete the file if any exception happens when using the iterator + if (file.exists()) { + file.delete() + } + null + } + } +} diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala index d44e15e3c97ea..4be1c7ea474d7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala @@ -123,7 +123,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { } /** Increase our size to newSize and grow the backing array if needed. 
*/ - private def growToSize(newSize: Int): Unit = { + protected def growToSize(newSize: Int): Unit = { if (newSize < 0) { throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements") } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalOrderingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalOrderingAppendOnlyMap.scala new file mode 100644 index 0000000000000..dd89d14f0a6e2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalOrderingAppendOnlyMap.scala @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.collection + +import java.io._ + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.serializer.{DeserializationStream, Serializer} +import org.apache.spark.storage.{BlockId, BlockManager} +import org.apache.spark.executor.ShuffleWriteMetrics + +/** + * :: DeveloperApi :: + * An append-only map that spills sorted content to disk when there is insufficient space for it + * to grow. + * + * This map takes two passes over the data: + * + * (1) Values are merged into combiners, which are sorted and spilled to disk as necessary + * (2) Combiners are read from disk and merged together + * + * The setting of the spill threshold faces the following trade-off: If the spill threshold is + * too high, the in-memory map may occupy more memory than is available, resulting in OOM. + * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk + * writes. This may lead to a performance regression compared to the normal case of using the + * non-spilling AppendOnlyMap. + * + * Two parameters control the memory threshold: + * + * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing + * these maps as a fraction of the executor's total memory. Since each concurrently running + * task maintains one map, the actual threshold for each map is this quantity divided by the + * number of running tasks. + * + * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of + * this threshold, in case map size estimation is not sufficiently accurate. 
+ */ +@DeveloperApi +class ExternalOrderingAppendOnlyMap[K, V, C]( + createCombiner: V => C, + mergeValue: (C, V) => C, + createExternalCombiner: () => C, // To avoid keeping 2 `CompactBuffer`s in the memory + mergeCombiners: (C, C) => C, + serializer: Serializer = SparkEnv.get.serializer, + blockManager: BlockManager = SparkEnv.get.blockManager)(implicit ord: Ordering[K]) + extends Iterable[(K, C)] + with Serializable + with Logging + with Spillable[SizeTracker] { + + private var currentMap = new SizeTrackingAppendOnlyMap[K, C] + private val spilledMaps = new ArrayBuffer[DiskMapIterator] + private val sparkConf = SparkEnv.get.conf + private val diskBlockManager = blockManager.diskBlockManager + + /** + * Size of object batches when reading/writing from serializers. + * + * Objects are written in batches, with each batch using its own serialization stream. This + * cuts down on the size of reference-tracking maps constructed when deserializing a stream. + * + * NOTE: Setting this too low can cause excessive copying when serializing, since some serializers + * grow internal data structures by growing + copying every time the number of objects doubles. + */ + private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000) + + // Number of bytes spilled in total + private var _diskBytesSpilled = 0L + + private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 + + // Write metrics for current spill + private var curWriteMetrics: ShuffleWriteMetrics = _ + + private val ser = serializer.newInstance() + + /** + * Insert the given key and value into the map. + */ + def insert(key: K, value: V): Unit = { + insertAll(Iterator((key, value))) + } + + /** + * Insert the given iterator of keys and values into the map. + * + * When the underlying map needs to grow, check if the global pool of shuffle memory has + * enough room for this to happen. 
If so, allocate the memory required to grow the map; + * otherwise, spill the in-memory map to disk. + * + * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked. + */ + def insertAll(entries: Iterator[Product2[K, V]]): Unit = { + // An update function for the map that we reuse across entries to avoid allocating + // a new closure each time + var curEntry: Product2[K, V] = null + val update: (Boolean, C) => C = (hadVal, oldVal) => { + if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2) + } + + while (entries.hasNext) { + curEntry = entries.next() + if (maybeSpill(currentMap, currentMap.estimateSize())) { + currentMap = new SizeTrackingAppendOnlyMap[K, C] + } + currentMap.changeValue(curEntry._1, update) + addElementsRead() + } + } + + /** + * Insert the given iterable of keys and values into the map. + * + * When the underlying map needs to grow, check if the global pool of shuffle memory has + * enough room for this to happen. If so, allocate the memory required to grow the map; + * otherwise, spill the in-memory map to disk. + * + * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked. + */ + def insertAll(entries: Iterable[Product2[K, V]]): Unit = { + insertAll(entries.iterator) + } + + /** + * Sort the existing contents of the in-memory map and spill them to a temporary file on disk. 
+ */ + override protected[this] def spill(collection: SizeTracker): Unit = { + val (blockId, file) = diskBlockManager.createTempLocalBlock() + curWriteMetrics = new ShuffleWriteMetrics() + var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize, + curWriteMetrics) + var objectsWritten = 0 + + // List of batch sizes (bytes) in the order they are written to disk + val batchSizes = new ArrayBuffer[Long] + + // Flush the disk writer's contents to disk, and update relevant variables + def flush() = { + val w = writer + writer = null + w.commitAndClose() + _diskBytesSpilled += curWriteMetrics.shuffleBytesWritten + batchSizes.append(curWriteMetrics.shuffleBytesWritten) + objectsWritten = 0 + } + + var success = false + try { + val it = currentMap.destructiveSortedIterator(ord) + while (it.hasNext) { + val kv = it.next() + writer.write(kv._1) + writer.write(kv._2) + objectsWritten += 1 + + if (objectsWritten == serializerBatchSize) { + flush() + curWriteMetrics = new ShuffleWriteMetrics() + writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize, + curWriteMetrics) + } + } + if (objectsWritten > 0) { + flush() + } else if (writer != null) { + val w = writer + writer = null + w.revertPartialWritesAndClose() + } + success = true + } finally { + if (!success) { + // This code path only happens if an exception was thrown above before we set success; + // close our stuff and let the exception be thrown further + if (writer != null) { + writer.revertPartialWritesAndClose() + } + if (file.exists()) { + file.delete() + } + } + } + + spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes)) + } + + def diskBytesSpilled: Long = _diskBytesSpilled + + /** + * Return an iterator that merges the in-memory map with the spilled maps. + * If no spill has occurred, simply return the in-memory map's iterator. 
+ */ + override def iterator: Iterator[(K, C)] = { + if (spilledMaps.isEmpty) { + currentMap.iterator + } else { + new ExternalIterator() + } + } + + /** + * An iterator that sort-merges (K, C) pairs from the in-memory map and the spilled maps + */ + private class ExternalIterator extends Iterator[(K, C)] { + + // A queue that maintains a buffer for each stream we are currently merging + // This queue maintains the invariant that it only contains non-empty buffers + private val mergeHeap = new mutable.PriorityQueue[StreamBuffer] + + // Input streams are derived both from the in-memory map and spilled maps on disk + // The in-memory map is sorted in place, while the spilled maps are already in sorted order + private val sortedMap = currentMap.destructiveSortedIterator(ord). + map(p => (p._1, new MemoryLazyValue(p._2))) + private val inputStreams = (Seq(sortedMap) ++ spilledMaps) + + inputStreams.foreach { it => + val buffer = new StreamBuffer(it) + if (buffer.moveToNext()) { + mergeHeap.enqueue(buffer) + } + } + + /** + * Return true if there exists an input stream that still has unvisited pairs. + */ + override def hasNext: Boolean = mergeHeap.length > 0 + + /** + * Select a key with the minimum hash, then combine all values with the same key from all + * input streams. 
+ */ + override def next(): (K, C) = { + if (mergeHeap.length == 0) { + throw new NoSuchElementException + } + var minCombiner = createExternalCombiner() + + // Select a key from the StreamBuffer that holds the lowest key (per `ord`) + val minBuffer = mergeHeap.dequeue() + val minKey = minBuffer.currentKey + minCombiner = mergeCombiners(minCombiner, minBuffer.currentValue) + + // Release the memory used by `minBuffer.currentValue` + if (minBuffer.moveToNext()) { + mergeHeap.enqueue(minBuffer) + } + + while (mergeHeap.length > 0 && ord.equiv(mergeHeap.head.currentKey, minKey)) { + val newBuffer = mergeHeap.dequeue() + minCombiner = mergeCombiners(minCombiner, newBuffer.currentValue) + if (newBuffer.moveToNext()) { + mergeHeap.enqueue(newBuffer) + } + } + + (minKey, minCombiner) + } + + /** + * A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key. + * Each buffer maintains the key-value pair that currently holds the lowest key + * (according to `ord`) among the remaining entries in its stream. + * Note that because when we spill data out, we only spill one value for each key, there is + * at most one element for each key. + * + * StreamBuffers are ordered by the minimum key currently available in their stream so + * that we can put them into a heap and sort that. 
+ */ + private class StreamBuffer( + val iterator: Iterator[(K, LazyValue[C])]) + extends Comparable[StreamBuffer] { + + private var pair: (K, LazyValue[C]) = null + + def currentKey = pair._1 + + def currentValue = pair._2.value + + override def compareTo(other: StreamBuffer): Int = { + // descending order because mutable.PriorityQueue dequeues the max, not the min + ord.compare(other.pair._1, pair._1) + } + + def moveToNext(): Boolean = { + if (iterator.hasNext) { + pair = iterator.next() + true + } + else { + // Note: It's important to release the reference to free the memory + pair = null + false + } + } + } + } + + /** + * An iterator that returns (K, LazyValue[C]) pairs in sorted order from an on-disk map. + */ + private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long]) + extends Iterator[(K, LazyValue[C])] + { + private val batchOffsets = batchSizes.scanLeft(0L)(_ + _) // Size will be batchSize.length + 1 + assert(file.length() == batchOffsets.last, + "File length is not equal to the last batch offset:\n" + + s" file length = ${file.length}\n" + + s" last batch offset = ${batchOffsets.last}\n" + + s" all batch offsets = ${batchOffsets.mkString(",")}" + ) + + private var batchIndex = 0 // Which batch we're in + private var fileStream: FileInputStream = null + + // An intermediate stream that reads from exactly one batch + // This guards against pre-fetching and other arbitrary behavior of higher level streams + private var deserializeStream = nextBatchStream() + private var nextItem: (K, StreamLazyValue[C]) = null + private var objectsRead = 0 + + /** + * Construct a stream that reads only from the next batch. + */ + private def nextBatchStream(): DeserializationStream = { + // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether + // we're still in a valid batch. 
+ if (batchIndex < batchOffsets.length - 1) { + if (deserializeStream != null) { + deserializeStream.close() + fileStream.close() + deserializeStream = null + fileStream = null + } + + val start = batchOffsets(batchIndex) + fileStream = new FileInputStream(file) + fileStream.getChannel.position(start) + batchIndex += 1 + + val end = batchOffsets(batchIndex) + + assert(end >= start, "start = " + start + ", end = " + end + + ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) + + val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) + val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) + ser.deserializeStream(compressedStream) + } else { + // No more batches left + cleanup() + null + } + } + + /** + * Return the next (K, C) pair from the deserialization stream. + * + * If the current batch is drained, construct a stream for the next batch and read from it. + * If no more pairs are left, return null. + */ + private def readNextItem(): (K, StreamLazyValue[C]) = { + try { + if (objectsRead == serializerBatchSize) { + objectsRead = 0 + deserializeStream = nextBatchStream() + } + if (deserializeStream == null) { + return null + } + val item = (deserializeStream.readObject().asInstanceOf[K], + new StreamLazyValue[C](deserializeStream)) + objectsRead += 1 + item + } catch { + case e: EOFException => + cleanup() + null + } + } + + override def hasNext: Boolean = { + if (nextItem == null) { + nextItem = readNextItem() + } + nextItem != null + } + + override def next(): (K, LazyValue[C]) = { + if (hasNext) { + val item = nextItem + nextItem = null + item + } + else { + throw new NoSuchElementException + } + } + + // TODO: Ensure this gets called even if the iterator isn't drained. 
+ private def cleanup() { + batchIndex = batchOffsets.length // Prevent reading any other batch + val ds = deserializeStream + deserializeStream = null + fileStream = null + if (ds != null) ds.close() + file.delete() + } + } + +} + +private[spark] trait LazyValue[T]{ + def value: T +} + +/** + * A wrapper for a value already held in memory. It's used to make the interface consistent. + * + * @param value + * @tparam T + */ +private[spark] class MemoryLazyValue[T](override val value: T) extends LazyValue[T] + +/** + * A wrapper to read the value lazily. This is used to prefetch a key without reading the value into + * memory. + * + * Note: `stream` must not be read anywhere else until `value` is materialized. + * + * @param stream + * @tparam T + */ +private[spark] class StreamLazyValue[T](stream: DeserializationStream) extends LazyValue[T] { + override lazy val value: T = stream.readObject().asInstanceOf[T] +} diff --git a/core/src/test/scala/org/apache/spark/rdd/SkewedJoinSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SkewedJoinSuite.scala new file mode 100644 index 0000000000000..b93ddeb5282da --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/SkewedJoinSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + +import org.scalatest.FunSuite + +import org.apache.spark.SharedSparkContext + +class SkewedJoinRDDSuite extends FunSuite with SharedSparkContext { + + test("skewedJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.skewedJoin(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("skewedJoin all-to-all") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))) + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))) + val joined = rdd1.skewedJoin(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (1, 'y')), + (1, (2, 'x')), + (1, (2, 'y')), + (1, (3, 'x')), + (1, (3, 'y')) + )) + } + + test("skewedLeftOuterJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.skewedLeftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("skewedRightOuterJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.skewedRightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + test("skewedFullOuterJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.skewedFullOuterJoin(rdd2).collect() + assert(joined.size === 6) + 
assert(joined.toSet === Set( + (1, (Some(1), Some('x'))), + (1, (Some(2), Some('x'))), + (2, (Some(1), Some('y'))), + (2, (Some(1), Some('z'))), + (3, (Some(1), None)), + (4, (None, Some('w'))) + )) + } + + test("skewedJoin with no matches") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) + val joined = rdd1.skewedJoin(rdd2).collect() + assert(joined.size === 0) + } + + test("skewedJoin with many output partitions") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.skewedJoin(rdd2, 10).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + +} + +class SkewedJoinRDDWithoutShuffleSuite extends SkewedJoinRDDSuite { + + override def beforeAll() { + conf.set("spark.shuffle.spill", "false") + super.beforeAll() + } + +} + diff --git a/core/src/test/scala/org/apache/spark/util/collection/ChunkBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ChunkBufferSuite.scala new file mode 100644 index 0000000000000..13faf9b74cb5e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/collection/ChunkBufferSuite.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import org.apache.spark.{SparkContext, SparkConf, LocalSparkContext} +import org.apache.spark.io.CompressionCodec +import org.scalatest.FunSuite + +class ChunkBufferSuite extends FunSuite with LocalSparkContext { + private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS + + private def createSparkConf(loadDefaults: Boolean, codec: Option[String] = None): SparkConf = { + val conf = new SparkConf(loadDefaults) + + conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + conf.set("spark.shuffle.spill.compress", codec.isDefined.toString) + conf.set("spark.shuffle.compress", codec.isDefined.toString) + codec.foreach { c => conf.set("spark.io.compression.codec", c) } + // Ensure that we actually have multiple batches per spill file + conf.set("spark.join.spill.batchSize", "10") + conf + } + + test("DiskChunkBuffer: append and read") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + val parameters = new ChunkParameters + val buffer = new DiskChunkBuffer[Int](parameters) + + (0 until 1000).foreach(buffer += _) + + assert(100 === buffer.size) + assert((0 until 1000) === buffer.iterator.flatMap(chunk => chunk).toList) + + sc.stop() + } + + test("DiskChunkBuffer: append to a frozen DiskChunkBuffer") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + + val parameters = new ChunkParameters + val buffer = new DiskChunkBuffer[Int](parameters) + + (0 until 1000).foreach(buffer 
+= _) + + val iter = buffer.iterator + + val e = intercept[IllegalStateException] { + buffer += 1 + } + assert("DiskChunkBuffer has been frozen" === e.getMessage) + + sc.stop() + } + + test("ChunkBuffer: in-memory append and read") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + + val parameters = new ChunkParameters + val buffer = new ChunkBuffer[Int](parameters) + + (0 until 1000).foreach(buffer += _) + + assert(1 === buffer.size) + assert((0 until 1000) === buffer.iterator.flatMap(chunk => chunk).toList) + + sc.stop() + } + + test("ChunkBuffer: disk append and read") { + val conf = createSparkConf(loadDefaults = false) + conf.set("spark.shuffle.memoryFraction", "0.001") + sc = new SparkContext("local-cluster[1,1,512]", "test", conf) + + val parameters = new ChunkParameters + val buffer = new ChunkBuffer[Long](parameters) + + (0L until 1000000L).foreach(buffer += _) // make sure it will use at least 5MB + + assert(100000 === buffer.size) + assert((0L until 1000000L) === buffer.iterator.flatMap(chunk => chunk).toList) + + sc.stop() + } + + test("ChunkBuffer: empty") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + + val parameters = new ChunkParameters + val buffer = new ChunkBuffer[Int](parameters) + + assert(0 === buffer.size) + assert(Nil === buffer.iterator.flatMap(chunk => chunk).toList) + + sc.stop() + } +} diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalOrderingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalOrderingAppendOnlyMapSuite.scala new file mode 100644 index 0000000000000..4e34154144466 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalOrderingAppendOnlyMapSuite.scala @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark._ +import org.apache.spark.SparkContext._ +import org.apache.spark.io.CompressionCodec + +class ExternalOrderingAppendOnlyMapSuite extends FunSuite with LocalSparkContext { + private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS + private def createCombiner[T](i: T) = ArrayBuffer[T](i) + private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i + private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] = + buf1 ++= buf2 + + private def createExternalMap[T: Ordering] = new ExternalOrderingAppendOnlyMap[T, T, ArrayBuffer[T]]( + createCombiner[T], mergeValue[T], () => ArrayBuffer[T](), mergeCombiners[T]) + + private def createSparkConf(loadDefaults: Boolean, codec: Option[String] = None): SparkConf = { + val conf = new SparkConf(loadDefaults) + // Make the Java serializer write a reset instruction (TC_RESET) after each object to test + // for a bug we had with bytes written past the last object in a batch (SPARK-2792) + conf.set("spark.serializer.objectStreamReset", "1") + conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + 
conf.set("spark.shuffle.spill.compress", codec.isDefined.toString) + conf.set("spark.shuffle.compress", codec.isDefined.toString) + codec.foreach { c => conf.set("spark.io.compression.codec", c) } + // Ensure that we actually have multiple batches per spill file + conf.set("spark.shuffle.spill.batchSize", "10") + conf + } + + test("simple insert") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + val map = createExternalMap[Int] + + // Single insert + map.insert(1, 10) + var it = map.iterator + assert(it.hasNext) + val kv = it.next() + assert(kv._1 === 1 && kv._2 === ArrayBuffer[Int](10)) + assert(!it.hasNext) + + // Multiple insert + map.insert(2, 20) + map.insert(3, 30) + it = map.iterator + assert(it.hasNext) + assert(it.toSet === Set[(Int, ArrayBuffer[Int])]( + (1, ArrayBuffer[Int](10)), + (2, ArrayBuffer[Int](20)), + (3, ArrayBuffer[Int](30)))) + sc.stop() + } + + test("insert with collision") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + val map = createExternalMap[Int] + + map.insertAll(Seq( + (1, 10), + (2, 20), + (3, 30), + (1, 100), + (2, 200), + (1, 1000))) + val it = map.iterator + assert(it.hasNext) + val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet)) + assert(result === Set[(Int, Set[Int])]( + (1, Set[Int](10, 100, 1000)), + (2, Set[Int](20, 200)), + (3, Set[Int](30)))) + sc.stop() + } + + test("ordering") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + + val map1 = createExternalMap[Int] + map1.insert(1, 10) + map1.insert(2, 20) + map1.insert(3, 30) + + val map2 = createExternalMap[Int] + map2.insert(2, 20) + map2.insert(3, 30) + map2.insert(1, 10) + + val map3 = createExternalMap[Int] + map3.insert(3, 30) + map3.insert(1, 10) + map3.insert(2, 20) + + val it1 = map1.iterator + val it2 = map2.iterator + val it3 = map3.iterator + + var kv1 = it1.next() + var kv2 = 
it2.next() + var kv3 = it3.next() + assert(kv1._1 === kv2._1 && kv2._1 === kv3._1) + assert(kv1._2 === kv2._2 && kv2._2 === kv3._2) + + kv1 = it1.next() + kv2 = it2.next() + kv3 = it3.next() + assert(kv1._1 === kv2._1 && kv2._1 === kv3._1) + assert(kv1._2 === kv2._2 && kv2._2 === kv3._2) + + kv1 = it1.next() + kv2 = it2.next() + kv3 = it3.next() + assert(kv1._1 === kv2._1 && kv2._1 === kv3._1) + assert(kv1._2 === kv2._2 && kv2._2 === kv3._2) + sc.stop() + } + + test("null keys and values") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + + val map = createExternalMap[Int] + map.insert(1, 5) + map.insert(2, 6) + map.insert(3, 7) + assert(map.size === 3) + assert(map.iterator.toSet === Set[(Int, Seq[Int])]( + (1, Seq[Int](5)), + (2, Seq[Int](6)), + (3, Seq[Int](7)) + )) + + // Null keys + val nullInt = null.asInstanceOf[Int] + map.insert(nullInt, 8) + assert(map.size === 4) + assert(map.iterator.toSet === Set[(Int, Seq[Int])]( + (1, Seq[Int](5)), + (2, Seq[Int](6)), + (3, Seq[Int](7)), + (nullInt, Seq[Int](8)) + )) + + // Null values + map.insert(4, nullInt) + map.insert(nullInt, nullInt) + assert(map.size === 5) + val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet)) + assert(result === Set[(Int, Set[Int])]( + (1, Set[Int](5)), + (2, Set[Int](6)), + (3, Set[Int](7)), + (4, Set[Int](nullInt)), + (nullInt, Set[Int](nullInt, 8)) + )) + sc.stop() + } + + test("simple aggregator") { + val conf = createSparkConf(loadDefaults = false) + sc = new SparkContext("local", "test", conf) + + // reduceByKey + val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1)) + val result1 = rdd.reduceByKey(_+_).collect() + assert(result1.toSet === Set[(Int, Int)]((0, 5), (1, 5))) + + // groupByKey + val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet + assert(result2.toSet === Set[(Int, Seq[Int])] + ((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1)))) + sc.stop() + } + + 
test("spilling with hash collisions") { + val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.memoryFraction", "0.001") + sc = new SparkContext("local-cluster[1,1,512]", "test", conf) + val map = createExternalMap[String] + + val collisionPairs = Seq( + ("Aa", "BB"), // 2112 + ("to", "v1"), // 3707 + ("variants", "gelato"), // -1249574770 + ("Teheran", "Siblings"), // 231609873 + ("misused", "horsemints"), // 1069518484 + ("isohel", "epistolaries"), // -1179291542 + ("righto", "buzzards"), // -931102253 + ("hierarch", "crinolines"), // -1732884796 + ("inwork", "hypercatalexes"), // -1183663690 + ("wainages", "presentencing"), // 240183619 + ("trichothecenes", "locular"), // 339006536 + ("pomatoes", "eructation") // 568647356 + ) + + collisionPairs.foreach { case (w1, w2) => + // String.hashCode is documented to use a specific algorithm, but check just in case + assert(w1.hashCode === w2.hashCode) + } + + map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i))) + collisionPairs.foreach { case (w1, w2) => + map.insert(w1, w2) + map.insert(w2, w1) + } + + // A map of collision pairs in both directions + val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap + + // Avoid map.size or map.iterator.length because this destructively sorts the underlying map + var count = 0 + + val it = map.iterator + while (it.hasNext) { + val kv = it.next() + val expectedValue = ArrayBuffer[String](collisionPairsMap.getOrElse(kv._1, kv._1)) + assert(kv._2.equals(expectedValue)) + count += 1 + } + assert(count === 100000 + collisionPairs.size * 2) + sc.stop() + } + + test("spilling with many hash collisions") { + val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.memoryFraction", "0.0001") + sc = new SparkContext("local-cluster[1,1,512]", "test", conf) + val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) + + // Insert 10 copies each of lots of objects whose hash codes are 
either 0 or 1. This causes + // problems if the map fails to group together the objects with the same code (SPARK-2043). + for (i <- 1 to 10) { + for (j <- 1 to 10000) { + map.insert(FixedHashObject(j, j % 2), 1) + } + } + + val it = map.iterator + var count = 0 + while (it.hasNext) { + val kv = it.next() + assert(kv._2 === 10) + count += 1 + } + assert(count === 10000) + sc.stop() + } + + test("spilling with hash collisions using the Int.MaxValue key") { + val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.memoryFraction", "0.001") + sc = new SparkContext("local-cluster[1,1,512]", "test", conf) + val map = createExternalMap[Int] + + (1 to 100000).foreach { i => map.insert(i, i) } + map.insert(Int.MaxValue, Int.MaxValue) + + val it = map.iterator + while (it.hasNext) { + // Should not throw NoSuchElementException + it.next() + } + sc.stop() + } + + test("spilling with null keys and values") { + val conf = createSparkConf(loadDefaults = true) + conf.set("spark.shuffle.memoryFraction", "0.001") + sc = new SparkContext("local-cluster[1,1,512]", "test", conf) + val map = createExternalMap[Int] + + map.insertAll((1 to 100000).iterator.map(i => (i, i))) + map.insert(null.asInstanceOf[Int], 1) + map.insert(1, null.asInstanceOf[Int]) + map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int]) + + val it = map.iterator + while (it.hasNext) { + // Should not throw NullPointerException + it.next() + } + sc.stop() + } + +} From af7eb714ab9916a628859682b8cbf9c4c2396029 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 11 Dec 2014 17:29:06 +0800 Subject: [PATCH 2/2] Remove the configuration about turning on skewed join in 'join' methods --- .../apache/spark/rdd/PairRDDFunctions.scala | 50 +++++++------------ 1 file changed, 17 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 83fb5190e3e32..a364fbad21ef8 100644 --- 
a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -481,13 +481,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { - if(SparkEnv.get.conf.getBoolean("spark.join.skewed.enabled", false)) { - skewedJoin(other, partitioner)(ClassTag.AnyRef.asInstanceOf[ClassTag[W]]) - } else { - this.cogroup(other, partitioner).flatMapValues { pair => - for (v <- pair._1; w <- pair._2) yield (v, w) - } - } + this.cogroup(other, partitioner).flatMapValues( pair => + for (v <- pair._1; w <- pair._2) yield (v, w) + ) } /** @@ -687,15 +683,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * partition the output RDD. */ def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { - if(SparkEnv.get.conf.getBoolean("spark.join.skewed.enabled", false)) { - skewedLeftOuterJoin(other, partitioner)(ClassTag.AnyRef.asInstanceOf[ClassTag[W]]) - } else { - this.cogroup(other, partitioner).flatMapValues { pair => - if (pair._2.isEmpty) { - pair._1.map(v => (v, None)) - } else { - for (v <- pair._1; w <- pair._2) yield (v, Some(w)) - } + this.cogroup(other, partitioner).flatMapValues { pair => + if (pair._2.isEmpty) { + pair._1.map(v => (v, None)) + } else { + for (v <- pair._1; w <- pair._2) yield (v, Some(w)) } } } @@ -708,15 +700,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { - if(SparkEnv.get.conf.getBoolean("spark.join.skewed.enabled", false)) { - skewedRightOuterJoin(other, partitioner)(ClassTag.AnyRef.asInstanceOf[ClassTag[W]]) - } else { - this.cogroup(other, partitioner).flatMapValues { pair => - if (pair._1.isEmpty) { - pair._2.map(w => (None, w)) - } else { - for (v <- pair._1; w <- pair._2) 
yield (Some(v), w) - } + this.cogroup(other, partitioner).flatMapValues { pair => + if (pair._1.isEmpty) { + pair._2.map(w => (None, w)) + } else { + for (v <- pair._1; w <- pair._2) yield (Some(v), w) } } } @@ -731,14 +719,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], Option[W]))] = { - if(SparkEnv.get.conf.getBoolean("spark.join.skewed.enabled", false)) { - skewedFullOuterJoin(other, partitioner)(ClassTag.AnyRef.asInstanceOf[ClassTag[W]]) - } else { - this.cogroup(other, partitioner).flatMapValues { - case (vs, Seq()) => vs.map(v => (Some(v), None)) - case (Seq(), ws) => ws.map(w => (None, Some(w))) - case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w)) - } + this.cogroup(other, partitioner).flatMapValues { + case (vs, Seq()) => vs.map(v => (Some(v), None)) + case (Seq(), ws) => ws.map(w => (None, Some(w))) + case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w)) } }