From d0024efb43501f2d0a02c74b7beb1dcc0970c834 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 30 Jun 2015 10:14:47 -0400 Subject: [PATCH 01/14] [SPARK-8464] As a starting point, refactored ExternalSorter into two class instances, one which uses Aggregator and one which does not. Next step is to extract out common code --- .../util/collection/ExternalSorterAgg.scala | 704 ++++++++++++++++++ .../util/collection/ExternalSorterNoAgg.scala | 703 +++++++++++++++++ 2 files changed, 1407 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala create mode 100644 core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala new file mode 100644 index 0000000000000..6948f58fe7ab5 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala @@ -0,0 +1,704 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable + +import com.google.common.annotations.VisibleForTesting +import com.google.common.io.ByteStreams + +import org.apache.spark._ +import org.apache.spark.serializer._ +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.shuffle.sort.{SortShuffleFileWriter, SortShuffleWriter} +import org.apache.spark.storage.{BlockId, BlockObjectWriter} + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end. + * + * Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied + * to its use in sort-based shuffle (for example, its block compression is controlled by + * `spark.shuffle.compress`). We may need to revisit this if ExternalSorter is used in other + * non-shuffle contexts where we might want to use different configuration settings. 
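+ *
+ * As a rough usage sketch (identifiers here are hypothetical; the real call site is
+ * SortShuffleWriter), a map task that needs map-side combine would do:
+ *
+ * {{{
+ *   val sorter = new ExternalSorterAgg[K, V, C](
+ *     aggregator, Some(new HashPartitioner(numPartitions)), None, Some(serializer))
+ *   sorter.insertAll(records)                                               // step 2 below
+ *   val lengths = sorter.writePartitionedFile(blockId, context, outputFile) // step 3 below
+ *   sorter.stop()                                                           // delete spill files
+ * }}}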
+ * + * @param aggregator optional Aggregator with combine functions to use for merging data + * @param partitioner optional Partitioner; if given, sort by partition ID and then key + * @param ordering optional Ordering to sort keys within each partition; should be a total ordering + * @param serializer serializer to use when spilling to disk + * + * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really + * want the output keys to be sorted. In a map task without map-side combine for example, you + * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do + * want to do combining, having an Ordering is more efficient than not having it. + * + * Users interact with this class in the following way: + * + * 1. Instantiate an ExternalSorter. + * + * 2. Call insertAll() with a set of records. + * + * 3. Request an iterator() back to traverse sorted/aggregated records. + * - or - + * Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs + * that can be used in Spark's sort shuffle. + * + * At a high level, this class works internally as follows: + * + * - We repeatedly fill up buffers of in-memory data, using either a PartitionedAppendOnlyMap if + * we want to combine by key, or a PartitionedSerializedPairBuffer or PartitionedPairBuffer if we + * don't. Inside these buffers, we sort elements by partition ID and then possibly also by key. + * To avoid calling the partitioner multiple times with each key, we store the partition ID + * alongside each record. + * + * - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first + * by partition ID and possibly second by key or by hash code of the key, if we want to do + * aggregation. For each file, we track how many objects were in each partition in memory, so we + * don't have to write out the partition ID for every element. + * + * - When the user requests an iterator or file output, the spilled files are merged, along with + * any remaining in-memory data, using the same sort order defined above (unless both sorting + * and aggregation are disabled). If we need to aggregate by key, we either use a total ordering + * from the ordering parameter, or read the keys with the same hash code and compare them with + * each other for equality to merge values. + * + * - Users are expected to call stop() at the end to delete all the intermediate files. + */ +private[spark] class ExternalSorterAgg[K, V, C]( + aggregator: Aggregator[K, V, C], + partitioner: Option[Partitioner] = None, + ordering: Option[Ordering[K]] = None, + serializer: Option[Serializer] = None) + extends Logging + with Spillable[WritablePartitionedPairCollection[K, C]] + with SortShuffleFileWriter[K, V] { + + private val conf = SparkEnv.get.conf + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + private def getPartition(key: K): Int = { + if (shouldPartition) partitioner.get.getPartition(key) else 0 + } + + // Since SPARK-7855, bypassMergeSort optimization is no longer performed as part of this class. + // As a sanity check, make sure that we're not handling a shuffle which should use that path. 
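+ // (The bypass path is only taken for shuffles with no aggregation, no key ordering, and a
+ // partition count no larger than spark.shuffle.sort.bypassMergeThreshold, so with an
+ // Aggregator always present here this check is effectively a defensive assertion.)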
+ if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, Some(aggregator), ordering)) { + throw new IllegalArgumentException("ExternalSorter should not be used to handle " + + " a sort that the BypassMergeSortShuffleWriter should handle") + } + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer) + private val serInstance = ser.newInstance() + + private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) + + // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided + private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 + + // Size of object batches when reading/writing from serializers. + // + // Objects are written in batches, with each batch using its own serialization stream. This + // cuts down on the size of reference-tracking maps constructed when deserializing a stream. + // + // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers + // grow internal data structures by growing + copying every time the number of objects doubles. + private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) + + private val useSerializedPairBuffer = + ordering.isEmpty && + conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && + ser.supportsRelocationOfSerializedObjects + private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB + private def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = { + if (useSerializedPairBuffer) { + new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) + } else { + new PartitionedPairBuffer[K, C] + } + } + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + private var map = new PartitionedAppendOnlyMap[K, C] + private var buffer = newBuffer() + + // Total spilling statistics + private var _diskBytesSpilled = 0L + def diskBytesSpilled: Long = _diskBytesSpilled + + + // A comparator for keys K that orders them within a partition to allow aggregation or sorting. + // Can be a partial ordering by hash code if a total ordering is not provided through by the + // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some + // non-equal keys also have this, so we need to do a later pass to find truly equal keys). + // Note that we ignore this if no aggregator and no ordering are given. + private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] { + override def compare(a: K, b: K): Int = { + val h1 = if (a == null) 0 else a.hashCode() + val h2 = if (b == null) 0 else b.hashCode() + if (h1 < h2) -1 else if (h1 == h2) 0 else 1 + } + }) + + private def comparator: Option[Comparator[K]] = { + Some(keyComparator) + } + + // Information about a spilled file. Includes sizes in bytes of "batches" written by the + // serializer as we periodically reset its stream, as well as number of elements in each + // partition, used to efficiently keep track of partitions when merging. 
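+ // (SpillReader turns serializerBatchSizes into absolute file offsets with a scanLeft, which
+ // lets it position the underlying FileInputStream at the start of each serialized batch.)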
+ private[this] case class SpilledFile( + file: File, + blockId: BlockId, + serializerBatchSizes: Array[Long], + elementsPerPartition: Array[Long]) + + private val spills = new ArrayBuffer[SpilledFile] + + override def insertAll(records: Iterator[Product2[K, V]]): Unit = { + // TODO: stop combining if we find that the reduction factor isn't high + // Combine values in-memory first using our AppendOnlyMap + val mergeValue = aggregator.mergeValue + val createCombiner = aggregator.createCombiner + var kv: Product2[K, V] = null + val update = (hadValue: Boolean, oldValue: C) => { + if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) + } + while (records.hasNext) { + addElementsRead() + kv = records.next() + map.changeValue((getPartition(kv._1), kv._1), update) + maybeSpillCollection(usingMap = true) + } + } + + /** + * Spill the current in-memory collection to disk if needed. + * + * @param usingMap whether we're using a map or buffer as our current in-memory collection + */ + private def maybeSpillCollection(usingMap: Boolean): Unit = { + if (!spillingEnabled) { + return + } + + if (usingMap) { + if (maybeSpill(map, map.estimateSize())) { + map = new PartitionedAppendOnlyMap[K, C] + } + } else { + if (maybeSpill(buffer, buffer.estimateSize())) { + buffer = newBuffer() + } + } + } + + /** + * Spill our in-memory collection to a sorted file that we can merge later. + * We add this file into `spilledFiles` to find it later. + * + * @param collection whichever collection we're using (map or buffer) + */ + override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { + // Because these files may be read during shuffle, their compression must be controlled by + // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use + // createTempShuffleBlock here; see SPARK-3426 for more context. + val (blockId, file) = diskBlockManager.createTempShuffleBlock() + + // These variables are reset after each flush + var objectsWritten: Long = 0 + var spillMetrics: ShuffleWriteMetrics = null + var writer: BlockObjectWriter = null + def openWriter(): Unit = { + assert (writer == null && spillMetrics == null) + spillMetrics = new ShuffleWriteMetrics + writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) + } + openWriter() + + // List of batch sizes (bytes) in the order they are written to disk + val batchSizes = new ArrayBuffer[Long] + + // How many elements we have in each partition + val elementsPerPartition = new Array[Long](numPartitions) + + // Flush the disk writer's contents to disk, and update relevant variables. + // The writer is closed at the end of this process, and cannot be reused. 
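+ // Each flush closes out one serializer batch: the bytes written since the previous flush are
+ // appended to batchSizes, which SpillReader later uses to locate batch boundaries in the file.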
+ def flush(): Unit = { + val w = writer + writer = null + w.commitAndClose() + _diskBytesSpilled += spillMetrics.shuffleBytesWritten + batchSizes.append(spillMetrics.shuffleBytesWritten) + spillMetrics = null + objectsWritten = 0 + } + + var success = false + try { + val it = collection.destructiveSortedWritablePartitionedIterator(comparator) + while (it.hasNext) { + val partitionId = it.nextPartition() + it.writeNext(writer) + elementsPerPartition(partitionId) += 1 + objectsWritten += 1 + + if (objectsWritten == serializerBatchSize) { + flush() + openWriter() + } + } + if (objectsWritten > 0) { + flush() + } else if (writer != null) { + val w = writer + writer = null + w.revertPartialWritesAndClose() + } + success = true + } finally { + if (!success) { + // This code path only happens if an exception was thrown above before we set success; + // close our stuff and let the exception be thrown further + if (writer != null) { + writer.revertPartialWritesAndClose() + } + if (file.exists()) { + file.delete() + } + } + } + + spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)) + } + + /** + * Merge a sequence of sorted files, giving an iterator over partitions and then over elements + * inside each partition. This can be used to either write out a new file or return data to + * the user. + * + * Returns an iterator over all the data written to this object, grouped by partition. For each + * partition we then have an iterator over its contents, and these are expected to be accessed + * in order (you can't "skip ahead" to one partition without reading the previous one). + * Guaranteed to return a key-value pair for each partition, in order of partition ID. + */ + private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) + : Iterator[(Int, Iterator[Product2[K, C]])] = { + val readers = spills.map(new SpillReader(_)) + val inMemBuffered = inMemory.buffered + (0 until numPartitions).iterator.map { p => + val inMemIterator = new IteratorForPartition(p, inMemBuffered) + val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator) + // Perform partial aggregation across partitions + (p, mergeWithAggregation( + iterators, aggregator.mergeCombiners, keyComparator, ordering.isDefined)) + } + } + + /** + * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys. + */ + private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K]) + : Iterator[Product2[K, C]] = + { + val bufferedIters = iterators.filter(_.hasNext).map(_.buffered) + type Iter = BufferedIterator[Product2[K, C]] + val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] { + // Use the reverse of comparator.compare because PriorityQueue dequeues the max + override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1) + }) + heap.enqueue(bufferedIters: _*) // Will contain only the iterators with hasNext = true + new Iterator[Product2[K, C]] { + override def hasNext: Boolean = !heap.isEmpty + + override def next(): Product2[K, C] = { + if (!hasNext) { + throw new NoSuchElementException + } + val firstBuf = heap.dequeue() + val firstPair = firstBuf.next() + if (firstBuf.hasNext) { + heap.enqueue(firstBuf) + } + firstPair + } + } + } + + /** + * Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each + * iterator is sorted by key with a given comparator. If the comparator is not a total ordering + * (e.g. 
when we sort objects by hash code and different keys may compare as equal although + * they're not), we still merge them by doing equality tests for all keys that compare as equal. + */ + private def mergeWithAggregation( + iterators: Seq[Iterator[Product2[K, C]]], + mergeCombiners: (C, C) => C, + comparator: Comparator[K], + totalOrder: Boolean) + : Iterator[Product2[K, C]] = + { + if (!totalOrder) { + // We only have a partial ordering, e.g. comparing the keys by hash code, which means that + // multiple distinct keys might be treated as equal by the ordering. To deal with this, we + // need to read all keys considered equal by the ordering at once and compare them. + new Iterator[Iterator[Product2[K, C]]] { + val sorted = mergeSort(iterators, comparator).buffered + + // Buffers reused across elements to decrease memory allocation + val keys = new ArrayBuffer[K] + val combiners = new ArrayBuffer[C] + + override def hasNext: Boolean = sorted.hasNext + + override def next(): Iterator[Product2[K, C]] = { + if (!hasNext) { + throw new NoSuchElementException + } + keys.clear() + combiners.clear() + val firstPair = sorted.next() + keys += firstPair._1 + combiners += firstPair._2 + val key = firstPair._1 + while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) { + val pair = sorted.next() + var i = 0 + var foundKey = false + while (i < keys.size && !foundKey) { + if (keys(i) == pair._1) { + combiners(i) = mergeCombiners(combiners(i), pair._2) + foundKey = true + } + i += 1 + } + if (!foundKey) { + keys += pair._1 + combiners += pair._2 + } + } + + // Note that we return an iterator of elements since we could've had many keys marked + // equal by the partial order; we flatten this below to get a flat iterator of (K, C). + keys.iterator.zip(combiners.iterator) + } + }.flatMap(i => i) + } else { + // We have a total ordering, so the objects with the same key are sequential. + new Iterator[Product2[K, C]] { + val sorted = mergeSort(iterators, comparator).buffered + + override def hasNext: Boolean = sorted.hasNext + + override def next(): Product2[K, C] = { + if (!hasNext) { + throw new NoSuchElementException + } + val elem = sorted.next() + val k = elem._1 + var c = elem._2 + while (sorted.hasNext && sorted.head._1 == k) { + val pair = sorted.next() + c = mergeCombiners(c, pair._2) + } + (k, c) + } + } + } + } + + /** + * An internal class for reading a spilled file partition by partition. Expects all the + * partitions to be requested in order. + */ + private[this] class SpillReader(spill: SpilledFile) { + // Serializer batch offsets; size will be batchSize.length + 1 + val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _) + + // Track which partition and which batch stream we're in. These will be the indices of + // the next element we will read. We'll also store the last partition read so that + // readNextPartition() can figure out what partition that was from. 
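+ // (partitionId, indexInPartition) advance against spill.elementsPerPartition, while
+ // (batchId, indexInBatch) advance against serializer batches of serializerBatchSize elements.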
+ var partitionId = 0 + var indexInPartition = 0L + var batchId = 0 + var indexInBatch = 0 + var lastPartitionId = 0 + + skipToNextPartition() + + // Intermediate file and deserializer streams that read from exactly one batch + // This guards against pre-fetching and other arbitrary behavior of higher level streams + var fileStream: FileInputStream = null + var deserializeStream = nextBatchStream() // Also sets fileStream + + var nextItem: (K, C) = null + var finished = false + + /** Construct a stream that only reads from the next batch */ + def nextBatchStream(): DeserializationStream = { + // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether + // we're still in a valid batch. + if (batchId < batchOffsets.length - 1) { + if (deserializeStream != null) { + deserializeStream.close() + fileStream.close() + deserializeStream = null + fileStream = null + } + + val start = batchOffsets(batchId) + fileStream = new FileInputStream(spill.file) + fileStream.getChannel.position(start) + batchId += 1 + + val end = batchOffsets(batchId) + + assert(end >= start, "start = " + start + ", end = " + end + + ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) + + val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) + val compressedStream = blockManager.wrapForCompression(spill.blockId, bufferedStream) + serInstance.deserializeStream(compressedStream) + } else { + // No more batches left + cleanup() + null + } + } + + /** + * Update partitionId if we have reached the end of our current partition, possibly skipping + * empty partitions on the way. + */ + private def skipToNextPartition() { + while (partitionId < numPartitions && + indexInPartition == spill.elementsPerPartition(partitionId)) { + partitionId += 1 + indexInPartition = 0L + } + } + + /** + * Return the next (K, C) pair from the deserialization stream and update partitionId, + * indexInPartition, indexInBatch and such to match its location. + * + * If the current batch is drained, construct a stream for the next batch and read from it. + * If no more pairs are left, return null. 
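+ * The partition of the returned pair is captured in lastPartitionId before the read position
+ * is advanced, which is what readNextPartition() uses to attribute the pair to a partition.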
+ */ + private def readNextItem(): (K, C) = { + if (finished || deserializeStream == null) { + return null + } + val k = deserializeStream.readKey().asInstanceOf[K] + val c = deserializeStream.readValue().asInstanceOf[C] + lastPartitionId = partitionId + // Start reading the next batch if we're done with this one + indexInBatch += 1 + if (indexInBatch == serializerBatchSize) { + indexInBatch = 0 + deserializeStream = nextBatchStream() + } + // Update the partition location of the element we're reading + indexInPartition += 1 + skipToNextPartition() + // If we've finished reading the last partition, remember that we're done + if (partitionId == numPartitions) { + finished = true + if (deserializeStream != null) { + deserializeStream.close() + } + } + (k, c) + } + + var nextPartitionToRead = 0 + + def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] { + val myPartition = nextPartitionToRead + nextPartitionToRead += 1 + + override def hasNext: Boolean = { + if (nextItem == null) { + nextItem = readNextItem() + if (nextItem == null) { + return false + } + } + assert(lastPartitionId >= myPartition) + // Check that we're still in the right partition; note that readNextItem will have returned + // null at EOF above so we would've returned false there + lastPartitionId == myPartition + } + + override def next(): Product2[K, C] = { + if (!hasNext) { + throw new NoSuchElementException + } + val item = nextItem + nextItem = null + item + } + } + + // Clean up our open streams and put us in a state where we can't read any more data + def cleanup() { + batchId = batchOffsets.length // Prevent reading any other batch + val ds = deserializeStream + deserializeStream = null + fileStream = null + ds.close() + // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop(). + // This should also be fixed in ExternalAppendOnlyMap. + } + } + + /** + * Return an iterator over all the data written to this object, grouped by partition and + * aggregated by the requested aggregator. For each partition we then have an iterator over its + * contents, and these are expected to be accessed in order (you can't "skip ahead" to one + * partition without reading the previous one). Guaranteed to return a key-value pair for each + * partition, in order of partition ID. + * + * For now, we just merge all the spilled files in once pass, but this can be modified to + * support hierarchical merging. + */ + @VisibleForTesting + def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { + val collection: WritablePartitionedPairCollection[K, C] = map + if (spills.isEmpty) { + // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps + // we don't even need to sort by anything other than partition ID + if (!ordering.isDefined) { + // The user hasn't requested sorted keys, so only sort by partition ID, not key + groupByPartition(collection.partitionedDestructiveSortedIterator(None)) + } else { + // We do need to sort by both partition ID and key + groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator))) + } + } else { + // Merge spilled and in-memory data + merge(spills, collection.partitionedDestructiveSortedIterator(comparator)) + } + } + + /** + * Return an iterator over all the data written to this object, aggregated by our aggregator. 
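+ * This simply flattens partitionedIterator, discarding the partition IDs.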
+ */ + def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2) + + /** + * Write all the data added into this ExternalSorter into a file in the disk store. This is + * called by the SortShuffleWriter. + * + * @param blockId block ID to write to. The index file will be blockId.name + ".index". + * @param context a TaskContext for a running Spark task, for us to update shuffle metrics. + * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) + */ + override def writePartitionedFile( + blockId: BlockId, + context: TaskContext, + outputFile: File): Array[Long] = { + + // Track location of each range in the output file + val lengths = new Array[Long](numPartitions) + + if (spills.isEmpty) { + // Case where we only have in-memory data + val collection = map + val it = collection.destructiveSortedWritablePartitionedIterator(comparator) + while (it.hasNext) { + val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, + context.taskMetrics.shuffleWriteMetrics.get) + val partitionId = it.nextPartition() + while (it.hasNext && it.nextPartition() == partitionId) { + it.writeNext(writer) + } + writer.commitAndClose() + val segment = writer.fileSegment() + lengths(partitionId) = segment.length + } + } else { + // We must perform merge-sort; get an iterator by partition and write everything directly. + for ((id, elements) <- this.partitionedIterator) { + if (elements.hasNext) { + val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, + context.taskMetrics.shuffleWriteMetrics.get) + for (elem <- elements) { + writer.write(elem._1, elem._2) + } + writer.commitAndClose() + val segment = writer.fileSegment() + lengths(id) = segment.length + } + } + } + + context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) + + lengths + } + + def stop(): Unit = { + spills.foreach(s => s.file.delete()) + spills.clear() + } + + /** + * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*, + * group together the pairs for each partition into a sub-iterator. + * + * @param data an iterator of elements, assumed to already be sorted by partition ID + */ + private def groupByPartition(data: Iterator[((Int, K), C)]) + : Iterator[(Int, Iterator[Product2[K, C]])] = + { + val buffered = data.buffered + (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered))) + } + + /** + * An iterator that reads only the elements for a given partition ID from an underlying buffered + * stream, assuming this partition is the next one to be read. Used to make it easier to return + * partitioned iterators from our in-memory collection. 
+ */ + private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) + extends Iterator[Product2[K, C]] + { + override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId + + override def next(): Product2[K, C] = { + if (!hasNext) { + throw new NoSuchElementException + } + val elem = data.next() + (elem._1._2, elem._2) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala new file mode 100644 index 0000000000000..d14d570f3d83f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala @@ -0,0 +1,703 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.io._ +import java.util.Comparator + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable + +import com.google.common.annotations.VisibleForTesting +import com.google.common.io.ByteStreams + +import org.apache.spark._ +import org.apache.spark.serializer._ +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.shuffle.sort.{SortShuffleFileWriter, SortShuffleWriter} +import org.apache.spark.storage.{BlockId, BlockObjectWriter} + +/** + * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner + * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then + * optionally sorts keys within each partition using a custom Comparator. Can output a single + * partitioned file with a different byte range for each partition, suitable for shuffle fetches. + * + * If combining is disabled, the type C must equal V -- we'll cast the objects at the end. + * + * Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied + * to its use in sort-based shuffle (for example, its block compression is controlled by + * `spark.shuffle.compress`). We may need to revisit this if ExternalSorter is used in other + * non-shuffle contexts where we might want to use different configuration settings. + * + * @param partitioner optional Partitioner; if given, sort by partition ID and then key + * @param ordering optional Ordering to sort keys within each partition; should be a total ordering + * @param serializer serializer to use when spilling to disk + * + * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really + * want the output keys to be sorted. In a map task without map-side combine for example, you + * probably want to pass None as the ordering to avoid extra sorting. 
On the other hand, if you do + * want to do combining, having an Ordering is more efficient than not having it. + * + * Users interact with this class in the following way: + * + * 1. Instantiate an ExternalSorter. + * + * 2. Call insertAll() with a set of records. + * + * 3. Request an iterator() back to traverse sorted/aggregated records. + * - or - + * Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs + * that can be used in Spark's sort shuffle. + * + * At a high level, this class works internally as follows: + * + * - We repeatedly fill up buffers of in-memory data, using either a PartitionedAppendOnlyMap if + * we want to combine by key, or a PartitionedSerializedPairBuffer or PartitionedPairBuffer if we + * don't. Inside these buffers, we sort elements by partition ID and then possibly also by key. + * To avoid calling the partitioner multiple times with each key, we store the partition ID + * alongside each record. + * + * - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first + * by partition ID and possibly second by key or by hash code of the key, if we want to do + * aggregation. For each file, we track how many objects were in each partition in memory, so we + * don't have to write out the partition ID for every element. + * + * - When the user requests an iterator or file output, the spilled files are merged, along with + * any remaining in-memory data, using the same sort order defined above (unless both sorting + * and aggregation are disabled). If we need to aggregate by key, we either use a total ordering + * from the ordering parameter, or read the keys with the same hash code and compare them with + * each other for equality to merge values. + * + * - Users are expected to call stop() at the end to delete all the intermediate files. + */ +private[spark] class ExternalSorterNoAgg[K, V, C]( + partitioner: Option[Partitioner] = None, + ordering: Option[Ordering[K]] = None, + serializer: Option[Serializer] = None) + extends Logging + with Spillable[WritablePartitionedPairCollection[K, C]] + with SortShuffleFileWriter[K, V] { + + private val conf = SparkEnv.get.conf + + private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + private val shouldPartition = numPartitions > 1 + private def getPartition(key: K): Int = { + if (shouldPartition) partitioner.get.getPartition(key) else 0 + } + + // Since SPARK-7855, bypassMergeSort optimization is no longer performed as part of this class. + // As a sanity check, make sure that we're not handling a shuffle which should use that path. + if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, None, ordering)) { + throw new IllegalArgumentException("ExternalSorter should not be used to handle " + + " a sort that the BypassMergeSortShuffleWriter should handle") + } + + private val blockManager = SparkEnv.get.blockManager + private val diskBlockManager = blockManager.diskBlockManager + private val ser = Serializer.getSerializer(serializer) + private val serInstance = ser.newInstance() + + private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) + + // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided + private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 + + // Size of object batches when reading/writing from serializers. + // + // Objects are written in batches, with each batch using its own serialization stream. 
This + // cuts down on the size of reference-tracking maps constructed when deserializing a stream. + // + // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers + // grow internal data structures by growing + copying every time the number of objects doubles. + private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) + + private val useSerializedPairBuffer = + ordering.isEmpty && + conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && + ser.supportsRelocationOfSerializedObjects + private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB + private def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = { + if (useSerializedPairBuffer) { + new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) + } else { + new PartitionedPairBuffer[K, C] + } + } + // Data structures to store in-memory objects before we spill. Depending on whether we have an + // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we + // store them in an array buffer. + private var map = new PartitionedAppendOnlyMap[K, C] + private var buffer = newBuffer() + + // Total spilling statistics + private var _diskBytesSpilled = 0L + def diskBytesSpilled: Long = _diskBytesSpilled + + + // A comparator for keys K that orders them within a partition to allow aggregation or sorting. + // Can be a partial ordering by hash code if a total ordering is not provided through by the + // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some + // non-equal keys also have this, so we need to do a later pass to find truly equal keys). + // Note that we ignore this if no aggregator and no ordering are given. + private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] { + override def compare(a: K, b: K): Int = { + val h1 = if (a == null) 0 else a.hashCode() + val h2 = if (b == null) 0 else b.hashCode() + if (h1 < h2) -1 else if (h1 == h2) 0 else 1 + } + }) + + private def comparator: Option[Comparator[K]] = { + if (ordering.isDefined) { + Some(keyComparator) + } else { + None + } + } + + // Information about a spilled file. Includes sizes in bytes of "batches" written by the + // serializer as we periodically reset its stream, as well as number of elements in each + // partition, used to efficiently keep track of partitions when merging. + private[this] case class SpilledFile( + file: File, + blockId: BlockId, + serializerBatchSizes: Array[Long], + elementsPerPartition: Array[Long]) + + private val spills = new ArrayBuffer[SpilledFile] + + override def insertAll(records: Iterator[Product2[K, V]]): Unit = { + // Stick values into our buffer + while (records.hasNext) { + addElementsRead() + val kv = records.next() + buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) + maybeSpillCollection(usingMap = false) + } + } + + /** + * Spill the current in-memory collection to disk if needed. + * + * @param usingMap whether we're using a map or buffer as our current in-memory collection + */ + private def maybeSpillCollection(usingMap: Boolean): Unit = { + if (!spillingEnabled) { + return + } + + if (usingMap) { + if (maybeSpill(map, map.estimateSize())) { + map = new PartitionedAppendOnlyMap[K, C] + } + } else { + if (maybeSpill(buffer, buffer.estimateSize())) { + buffer = newBuffer() + } + } + } + + /** + * Spill our in-memory collection to a sorted file that we can merge later. 
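+ * The file is sorted by partition ID, and within each partition by key when an ordering was
+ * supplied (otherwise elements within a partition keep no particular order).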
+ * We add this file into `spilledFiles` to find it later. + * + * @param collection whichever collection we're using (map or buffer) + */ + override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { + // Because these files may be read during shuffle, their compression must be controlled by + // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use + // createTempShuffleBlock here; see SPARK-3426 for more context. + val (blockId, file) = diskBlockManager.createTempShuffleBlock() + + // These variables are reset after each flush + var objectsWritten: Long = 0 + var spillMetrics: ShuffleWriteMetrics = null + var writer: BlockObjectWriter = null + def openWriter(): Unit = { + assert (writer == null && spillMetrics == null) + spillMetrics = new ShuffleWriteMetrics + writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) + } + openWriter() + + // List of batch sizes (bytes) in the order they are written to disk + val batchSizes = new ArrayBuffer[Long] + + // How many elements we have in each partition + val elementsPerPartition = new Array[Long](numPartitions) + + // Flush the disk writer's contents to disk, and update relevant variables. + // The writer is closed at the end of this process, and cannot be reused. + def flush(): Unit = { + val w = writer + writer = null + w.commitAndClose() + _diskBytesSpilled += spillMetrics.shuffleBytesWritten + batchSizes.append(spillMetrics.shuffleBytesWritten) + spillMetrics = null + objectsWritten = 0 + } + + var success = false + try { + val it = collection.destructiveSortedWritablePartitionedIterator(comparator) + while (it.hasNext) { + val partitionId = it.nextPartition() + it.writeNext(writer) + elementsPerPartition(partitionId) += 1 + objectsWritten += 1 + + if (objectsWritten == serializerBatchSize) { + flush() + openWriter() + } + } + if (objectsWritten > 0) { + flush() + } else if (writer != null) { + val w = writer + writer = null + w.revertPartialWritesAndClose() + } + success = true + } finally { + if (!success) { + // This code path only happens if an exception was thrown above before we set success; + // close our stuff and let the exception be thrown further + if (writer != null) { + writer.revertPartialWritesAndClose() + } + if (file.exists()) { + file.delete() + } + } + } + + spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)) + } + + /** + * Merge a sequence of sorted files, giving an iterator over partitions and then over elements + * inside each partition. This can be used to either write out a new file or return data to + * the user. + * + * Returns an iterator over all the data written to this object, grouped by partition. For each + * partition we then have an iterator over its contents, and these are expected to be accessed + * in order (you can't "skip ahead" to one partition without reading the previous one). + * Guaranteed to return a key-value pair for each partition, in order of partition ID. + */ + private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) + : Iterator[(Int, Iterator[Product2[K, C]])] = { + val readers = spills.map(new SpillReader(_)) + val inMemBuffered = inMemory.buffered + (0 until numPartitions).iterator.map { p => + val inMemIterator = new IteratorForPartition(p, inMemBuffered) + val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator) + if (ordering.isDefined) { + // No aggregator given, but we have an ordering (e.g. 
used by reduce tasks in sortByKey); + // sort the elements without trying to merge them + (p, mergeSort(iterators, ordering.get)) + } else { + (p, iterators.iterator.flatten) + } + } + } + + /** + * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys. + */ + private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K]) + : Iterator[Product2[K, C]] = + { + val bufferedIters = iterators.filter(_.hasNext).map(_.buffered) + type Iter = BufferedIterator[Product2[K, C]] + val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] { + // Use the reverse of comparator.compare because PriorityQueue dequeues the max + override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1) + }) + heap.enqueue(bufferedIters: _*) // Will contain only the iterators with hasNext = true + new Iterator[Product2[K, C]] { + override def hasNext: Boolean = !heap.isEmpty + + override def next(): Product2[K, C] = { + if (!hasNext) { + throw new NoSuchElementException + } + val firstBuf = heap.dequeue() + val firstPair = firstBuf.next() + if (firstBuf.hasNext) { + heap.enqueue(firstBuf) + } + firstPair + } + } + } + + /** + * Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each + * iterator is sorted by key with a given comparator. If the comparator is not a total ordering + * (e.g. when we sort objects by hash code and different keys may compare as equal although + * they're not), we still merge them by doing equality tests for all keys that compare as equal. + */ + private def mergeWithAggregation( + iterators: Seq[Iterator[Product2[K, C]]], + mergeCombiners: (C, C) => C, + comparator: Comparator[K], + totalOrder: Boolean) + : Iterator[Product2[K, C]] = + { + if (!totalOrder) { + // We only have a partial ordering, e.g. comparing the keys by hash code, which means that + // multiple distinct keys might be treated as equal by the ordering. To deal with this, we + // need to read all keys considered equal by the ordering at once and compare them. + new Iterator[Iterator[Product2[K, C]]] { + val sorted = mergeSort(iterators, comparator).buffered + + // Buffers reused across elements to decrease memory allocation + val keys = new ArrayBuffer[K] + val combiners = new ArrayBuffer[C] + + override def hasNext: Boolean = sorted.hasNext + + override def next(): Iterator[Product2[K, C]] = { + if (!hasNext) { + throw new NoSuchElementException + } + keys.clear() + combiners.clear() + val firstPair = sorted.next() + keys += firstPair._1 + combiners += firstPair._2 + val key = firstPair._1 + while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) { + val pair = sorted.next() + var i = 0 + var foundKey = false + while (i < keys.size && !foundKey) { + if (keys(i) == pair._1) { + combiners(i) = mergeCombiners(combiners(i), pair._2) + foundKey = true + } + i += 1 + } + if (!foundKey) { + keys += pair._1 + combiners += pair._2 + } + } + + // Note that we return an iterator of elements since we could've had many keys marked + // equal by the partial order; we flatten this below to get a flat iterator of (K, C). + keys.iterator.zip(combiners.iterator) + } + }.flatMap(i => i) + } else { + // We have a total ordering, so the objects with the same key are sequential. 
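+ // A single forward pass can therefore fold each run of equal keys into one combined value.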
+ new Iterator[Product2[K, C]] { + val sorted = mergeSort(iterators, comparator).buffered + + override def hasNext: Boolean = sorted.hasNext + + override def next(): Product2[K, C] = { + if (!hasNext) { + throw new NoSuchElementException + } + val elem = sorted.next() + val k = elem._1 + var c = elem._2 + while (sorted.hasNext && sorted.head._1 == k) { + val pair = sorted.next() + c = mergeCombiners(c, pair._2) + } + (k, c) + } + } + } + } + + /** + * An internal class for reading a spilled file partition by partition. Expects all the + * partitions to be requested in order. + */ + private[this] class SpillReader(spill: SpilledFile) { + // Serializer batch offsets; size will be batchSize.length + 1 + val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _) + + // Track which partition and which batch stream we're in. These will be the indices of + // the next element we will read. We'll also store the last partition read so that + // readNextPartition() can figure out what partition that was from. + var partitionId = 0 + var indexInPartition = 0L + var batchId = 0 + var indexInBatch = 0 + var lastPartitionId = 0 + + skipToNextPartition() + + // Intermediate file and deserializer streams that read from exactly one batch + // This guards against pre-fetching and other arbitrary behavior of higher level streams + var fileStream: FileInputStream = null + var deserializeStream = nextBatchStream() // Also sets fileStream + + var nextItem: (K, C) = null + var finished = false + + /** Construct a stream that only reads from the next batch */ + def nextBatchStream(): DeserializationStream = { + // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether + // we're still in a valid batch. + if (batchId < batchOffsets.length - 1) { + if (deserializeStream != null) { + deserializeStream.close() + fileStream.close() + deserializeStream = null + fileStream = null + } + + val start = batchOffsets(batchId) + fileStream = new FileInputStream(spill.file) + fileStream.getChannel.position(start) + batchId += 1 + + val end = batchOffsets(batchId) + + assert(end >= start, "start = " + start + ", end = " + end + + ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) + + val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) + val compressedStream = blockManager.wrapForCompression(spill.blockId, bufferedStream) + serInstance.deserializeStream(compressedStream) + } else { + // No more batches left + cleanup() + null + } + } + + /** + * Update partitionId if we have reached the end of our current partition, possibly skipping + * empty partitions on the way. + */ + private def skipToNextPartition() { + while (partitionId < numPartitions && + indexInPartition == spill.elementsPerPartition(partitionId)) { + partitionId += 1 + indexInPartition = 0L + } + } + + /** + * Return the next (K, C) pair from the deserialization stream and update partitionId, + * indexInPartition, indexInBatch and such to match its location. + * + * If the current batch is drained, construct a stream for the next batch and read from it. + * If no more pairs are left, return null. 
+ */ + private def readNextItem(): (K, C) = { + if (finished || deserializeStream == null) { + return null + } + val k = deserializeStream.readKey().asInstanceOf[K] + val c = deserializeStream.readValue().asInstanceOf[C] + lastPartitionId = partitionId + // Start reading the next batch if we're done with this one + indexInBatch += 1 + if (indexInBatch == serializerBatchSize) { + indexInBatch = 0 + deserializeStream = nextBatchStream() + } + // Update the partition location of the element we're reading + indexInPartition += 1 + skipToNextPartition() + // If we've finished reading the last partition, remember that we're done + if (partitionId == numPartitions) { + finished = true + if (deserializeStream != null) { + deserializeStream.close() + } + } + (k, c) + } + + var nextPartitionToRead = 0 + + def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] { + val myPartition = nextPartitionToRead + nextPartitionToRead += 1 + + override def hasNext: Boolean = { + if (nextItem == null) { + nextItem = readNextItem() + if (nextItem == null) { + return false + } + } + assert(lastPartitionId >= myPartition) + // Check that we're still in the right partition; note that readNextItem will have returned + // null at EOF above so we would've returned false there + lastPartitionId == myPartition + } + + override def next(): Product2[K, C] = { + if (!hasNext) { + throw new NoSuchElementException + } + val item = nextItem + nextItem = null + item + } + } + + // Clean up our open streams and put us in a state where we can't read any more data + def cleanup() { + batchId = batchOffsets.length // Prevent reading any other batch + val ds = deserializeStream + deserializeStream = null + fileStream = null + ds.close() + // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop(). + // This should also be fixed in ExternalAppendOnlyMap. + } + } + + /** + * Return an iterator over all the data written to this object, grouped by partition and + * aggregated by the requested aggregator. For each partition we then have an iterator over its + * contents, and these are expected to be accessed in order (you can't "skip ahead" to one + * partition without reading the previous one). Guaranteed to return a key-value pair for each + * partition, in order of partition ID. + * + * For now, we just merge all the spilled files in once pass, but this can be modified to + * support hierarchical merging. + */ + @VisibleForTesting + def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { + val collection: WritablePartitionedPairCollection[K, C] = buffer + if (spills.isEmpty) { + // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps + // we don't even need to sort by anything other than partition ID + if (!ordering.isDefined) { + // The user hasn't requested sorted keys, so only sort by partition ID, not key + groupByPartition(collection.partitionedDestructiveSortedIterator(None)) + } else { + // We do need to sort by both partition ID and key + groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator))) + } + } else { + // Merge spilled and in-memory data + merge(spills, collection.partitionedDestructiveSortedIterator(comparator)) + } + } + + /** + * Return an iterator over all the data written to this object, aggregated by our aggregator. 
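+ * Note that this non-aggregating variant never applies a combiner: records come back as they
+ * were inserted, grouped by partition and sorted by key only when an ordering was supplied.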
+ */ + def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2) + + /** + * Write all the data added into this ExternalSorter into a file in the disk store. This is + * called by the SortShuffleWriter. + * + * @param blockId block ID to write to. The index file will be blockId.name + ".index". + * @param context a TaskContext for a running Spark task, for us to update shuffle metrics. + * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) + */ + override def writePartitionedFile( + blockId: BlockId, + context: TaskContext, + outputFile: File): Array[Long] = { + + // Track location of each range in the output file + val lengths = new Array[Long](numPartitions) + + if (spills.isEmpty) { + // Case where we only have in-memory data + val collection = buffer + val it = collection.destructiveSortedWritablePartitionedIterator(comparator) + while (it.hasNext) { + val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, + context.taskMetrics.shuffleWriteMetrics.get) + val partitionId = it.nextPartition() + while (it.hasNext && it.nextPartition() == partitionId) { + it.writeNext(writer) + } + writer.commitAndClose() + val segment = writer.fileSegment() + lengths(partitionId) = segment.length + } + } else { + // We must perform merge-sort; get an iterator by partition and write everything directly. + for ((id, elements) <- this.partitionedIterator) { + if (elements.hasNext) { + val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, + context.taskMetrics.shuffleWriteMetrics.get) + for (elem <- elements) { + writer.write(elem._1, elem._2) + } + writer.commitAndClose() + val segment = writer.fileSegment() + lengths(id) = segment.length + } + } + } + + context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) + + lengths + } + + def stop(): Unit = { + spills.foreach(s => s.file.delete()) + spills.clear() + } + + /** + * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*, + * group together the pairs for each partition into a sub-iterator. + * + * @param data an iterator of elements, assumed to already be sorted by partition ID + */ + private def groupByPartition(data: Iterator[((Int, K), C)]) + : Iterator[(Int, Iterator[Product2[K, C]])] = + { + val buffered = data.buffered + (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered))) + } + + /** + * An iterator that reads only the elements for a given partition ID from an underlying buffered + * stream, assuming this partition is the next one to be read. Used to make it easier to return + * partitioned iterators from our in-memory collection. + */ + private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) + extends Iterator[Product2[K, C]] + { + override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId + + override def next(): Product2[K, C] = { + if (!hasNext) { + throw new NoSuchElementException + } + val elem = data.next() + (elem._1._2, elem._2) + } + } +} From 4a04b2a17cdc8683e9380c287d13f6af0e5ed6a2 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 30 Jun 2015 12:18:15 -0400 Subject: [PATCH 02/14] Created a top level interface for ExternalSorter and created two subclasses that implement this interface to abstract away the aggregator usage. 
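
The new base class keeps the shared spill/merge machinery and leaves two members abstract for
the subclasses to fill in -- roughly the following shape (member bodies elided; see the diff
below for the actual code):

    private[spark] abstract class ExternalSorter[K, V, C](
        partitioner: Option[Partitioner] = None,
        ordering: Option[Ordering[K]] = None,
        serializer: Option[Serializer] = None)
      extends Logging
      with Spillable[WritablePartitionedPairCollection[K, C]]
      with SortShuffleFileWriter[K, V] {

      // ExternalSorterAgg combines incoming records in a PartitionedAppendOnlyMap;
      // ExternalSorterNoAgg appends them unchanged to a PartitionedPairBuffer.
      def insertAll(records: Iterator[Product2[K, V]]): Unit

      // Present whenever aggregation or an ordering requires sorting keys within a partition.
      protected def comparator: Option[Comparator[K]]
    }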
--- .../util/collection/ExternalSorter.scala | 184 +++--- .../util/collection/ExternalSorterAgg.scala | 547 +---------------- .../util/collection/ExternalSorterNoAgg.scala | 556 +----------------- 3 files changed, 89 insertions(+), 1198 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 757dec66c203b..31cced75611bd 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -20,18 +20,17 @@ package org.apache.spark.util.collection import java.io._ import java.util.Comparator -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable - import com.google.common.annotations.VisibleForTesting import com.google.common.io.ByteStreams - import org.apache.spark._ -import org.apache.spark.serializer._ import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.shuffle.sort.{SortShuffleFileWriter, SortShuffleWriter} +import org.apache.spark.serializer._ +import org.apache.spark.shuffle.sort.SortShuffleFileWriter import org.apache.spark.storage.{BlockId, BlockObjectWriter} +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + /** * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then @@ -45,7 +44,6 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * `spark.shuffle.compress`). We may need to revisit this if ExternalSorter is used in other * non-shuffle contexts where we might want to use different configuration settings. * - * @param aggregator optional Aggregator with combine functions to use for merging data * @param partitioner optional Partitioner; if given, sort by partition ID and then key * @param ordering optional Ordering to sort keys within each partition; should be a total ordering * @param serializer serializer to use when spilling to disk @@ -87,39 +85,31 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * * - Users are expected to call stop() at the end to delete all the intermediate files. */ -private[spark] class ExternalSorter[K, V, C]( - aggregator: Option[Aggregator[K, V, C]] = None, - partitioner: Option[Partitioner] = None, - ordering: Option[Ordering[K]] = None, - serializer: Option[Serializer] = None) +private[spark] abstract class ExternalSorter[K, V, C]( + partitioner: Option[Partitioner] = None, + ordering: Option[Ordering[K]] = None, + serializer: Option[Serializer] = None) extends Logging with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { - private val conf = SparkEnv.get.conf + protected val conf = SparkEnv.get.conf - private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) - private val shouldPartition = numPartitions > 1 - private def getPartition(key: K): Int = { + protected val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) + protected val shouldPartition = numPartitions > 1 + protected def getPartition(key: K): Int = { if (shouldPartition) partitioner.get.getPartition(key) else 0 } - // Since SPARK-7855, bypassMergeSort optimization is no longer performed as part of this class. - // As a sanity check, make sure that we're not handling a shuffle which should use that path. 
- if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, aggregator, ordering)) { - throw new IllegalArgumentException("ExternalSorter should not be used to handle " - + " a sort that the BypassMergeSortShuffleWriter should handle") - } - - private val blockManager = SparkEnv.get.blockManager - private val diskBlockManager = blockManager.diskBlockManager - private val ser = Serializer.getSerializer(serializer) - private val serInstance = ser.newInstance() + protected val blockManager = SparkEnv.get.blockManager + protected val diskBlockManager = blockManager.diskBlockManager + protected val ser = Serializer.getSerializer(serializer) + protected val serInstance = ser.newInstance() - private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) + protected val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided - private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 + protected val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 // Size of object batches when reading/writing from serializers. // @@ -128,14 +118,14 @@ private[spark] class ExternalSorter[K, V, C]( // // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers // grow internal data structures by growing + copying every time the number of objects doubles. - private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) + protected val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) - private val useSerializedPairBuffer = + protected val useSerializedPairBuffer = ordering.isEmpty && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && ser.supportsRelocationOfSerializedObjects - private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB - private def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = { + protected val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB + protected def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = { if (useSerializedPairBuffer) { new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) } else { @@ -145,20 +135,19 @@ private[spark] class ExternalSorter[K, V, C]( // Data structures to store in-memory objects before we spill. Depending on whether we have an // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we // store them in an array buffer. - private var map = new PartitionedAppendOnlyMap[K, C] - private var buffer = newBuffer() + protected var map = new PartitionedAppendOnlyMap[K, C] + protected var buffer = newBuffer() // Total spilling statistics - private var _diskBytesSpilled = 0L + protected var _diskBytesSpilled = 0L def diskBytesSpilled: Long = _diskBytesSpilled - // A comparator for keys K that orders them within a partition to allow aggregation or sorting. // Can be a partial ordering by hash code if a total ordering is not provided through by the // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some // non-equal keys also have this, so we need to do a later pass to find truly equal keys). // Note that we ignore this if no aggregator and no ordering are given. 
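// Aside (illustrative snippet, not part of the patch): the hash-code fallback below is only a
// partial ordering because distinct keys can collide. For example, "Aa" and "BB" both hash to
// 2112, so they compare as equal even though they are different keys; that is why keys that
// compare as 0 still get a real equality check later, in mergeWithAggregation().
val hashCmp = new java.util.Comparator[String] {
  override def compare(a: String, b: String): Int = Integer.compare(a.hashCode, b.hashCode)
}
assert(hashCmp.compare("Aa", "BB") == 0 && "Aa" != "BB")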
- private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] { + protected val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] { override def compare(a: K, b: K): Int = { val h1 = if (a == null) 0 else a.hashCode() val h2 = if (b == null) 0 else b.hashCode() @@ -166,60 +155,29 @@ private[spark] class ExternalSorter[K, V, C]( } }) - private def comparator: Option[Comparator[K]] = { - if (ordering.isDefined || aggregator.isDefined) { - Some(keyComparator) - } else { - None - } - } + // Interface for comparator object to abstract away aggregator dependence + protected def comparator: Option[Comparator[K]] // Information about a spilled file. Includes sizes in bytes of "batches" written by the // serializer as we periodically reset its stream, as well as number of elements in each // partition, used to efficiently keep track of partitions when merging. - private[this] case class SpilledFile( + protected[this] case class SpilledFile( file: File, blockId: BlockId, serializerBatchSizes: Array[Long], elementsPerPartition: Array[Long]) - private val spills = new ArrayBuffer[SpilledFile] + protected val spills = new ArrayBuffer[SpilledFile] - override def insertAll(records: Iterator[Product2[K, V]]): Unit = { - // TODO: stop combining if we find that the reduction factor isn't high - val shouldCombine = aggregator.isDefined - - if (shouldCombine) { - // Combine values in-memory first using our AppendOnlyMap - val mergeValue = aggregator.get.mergeValue - val createCombiner = aggregator.get.createCombiner - var kv: Product2[K, V] = null - val update = (hadValue: Boolean, oldValue: C) => { - if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) - } - while (records.hasNext) { - addElementsRead() - kv = records.next() - map.changeValue((getPartition(kv._1), kv._1), update) - maybeSpillCollection(usingMap = true) - } - } else { - // Stick values into our buffer - while (records.hasNext) { - addElementsRead() - val kv = records.next() - buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) - maybeSpillCollection(usingMap = false) - } - } - } + // Interface for insertAll to abstract away aggregator dependence + def insertAll(records: Iterator[Product2[K, V]]): Unit /** * Spill the current in-memory collection to disk if needed. * * @param usingMap whether we're using a map or buffer as our current in-memory collection */ - private def maybeSpillCollection(usingMap: Boolean): Unit = { + protected def maybeSpillCollection(usingMap: Boolean): Unit = { if (!spillingEnabled) { return } @@ -323,33 +281,17 @@ private[spark] class ExternalSorter[K, V, C]( * partition we then have an iterator over its contents, and these are expected to be accessed * in order (you can't "skip ahead" to one partition without reading the previous one). * Guaranteed to return a key-value pair for each partition, in order of partition ID. 
+ * + * This interface abstracts away aggregator dependence */ - private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) - : Iterator[(Int, Iterator[Product2[K, C]])] = { - val readers = spills.map(new SpillReader(_)) - val inMemBuffered = inMemory.buffered - (0 until numPartitions).iterator.map { p => - val inMemIterator = new IteratorForPartition(p, inMemBuffered) - val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator) - if (aggregator.isDefined) { - // Perform partial aggregation across partitions - (p, mergeWithAggregation( - iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined)) - } else if (ordering.isDefined) { - // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey); - // sort the elements without trying to merge them - (p, mergeSort(iterators, ordering.get)) - } else { - (p, iterators.iterator.flatten) - } - } - } + protected def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) + : Iterator[(Int, Iterator[Product2[K, C]])] /** * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys. */ - private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K]) - : Iterator[Product2[K, C]] = + protected def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K]) + : Iterator[Product2[K, C]] = { val bufferedIters = iterators.filter(_.hasNext).map(_.buffered) type Iter = BufferedIterator[Product2[K, C]] @@ -381,12 +323,12 @@ private[spark] class ExternalSorter[K, V, C]( * (e.g. when we sort objects by hash code and different keys may compare as equal although * they're not), we still merge them by doing equality tests for all keys that compare as equal. */ - private def mergeWithAggregation( - iterators: Seq[Iterator[Product2[K, C]]], - mergeCombiners: (C, C) => C, - comparator: Comparator[K], - totalOrder: Boolean) - : Iterator[Product2[K, C]] = + protected def mergeWithAggregation( + iterators: Seq[Iterator[Product2[K, C]]], + mergeCombiners: (C, C) => C, + comparator: Comparator[K], + totalOrder: Boolean) + : Iterator[Product2[K, C]] = { if (!totalOrder) { // We only have a partial ordering, e.g. comparing the keys by hash code, which means that @@ -461,7 +403,7 @@ private[spark] class ExternalSorter[K, V, C]( * An internal class for reading a spilled file partition by partition. Expects all the * partitions to be requested in order. */ - private[this] class SpillReader(spill: SpilledFile) { + protected[this] class SpillReader(spill: SpilledFile) { // Serializer batch offsets; size will be batchSize.length + 1 val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _) @@ -522,7 +464,7 @@ private[spark] class ExternalSorter[K, V, C]( */ private def skipToNextPartition() { while (partitionId < numPartitions && - indexInPartition == spill.elementsPerPartition(partitionId)) { + indexInPartition == spill.elementsPerPartition(partitionId)) { partitionId += 1 indexInPartition = 0L } @@ -611,11 +553,14 @@ private[spark] class ExternalSorter[K, V, C]( * * For now, we just merge all the spilled files in once pass, but this can be modified to * support hierarchical merging. + * + * This interface abstracts away aggregator dependence. 
*/ @VisibleForTesting - def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { - val usingMap = aggregator.isDefined - val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer + def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] + + protected def partitionedIterator(collection: WritablePartitionedPairCollection[K, C]): + Iterator[(Int, Iterator[Product2[K, C]])] = { if (spills.isEmpty) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID @@ -644,18 +589,25 @@ private[spark] class ExternalSorter[K, V, C]( * @param blockId block ID to write to. The index file will be blockId.name + ".index". * @param context a TaskContext for a running Spark task, for us to update shuffle metrics. * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) + * + * This interface abstracts away aggregator dependence. */ override def writePartitionedFile( - blockId: BlockId, - context: TaskContext, - outputFile: File): Array[Long] = { + blockId: BlockId, + context: TaskContext, + outputFile: File): Array[Long] + + protected def writePartitionedFile( + blockId: BlockId, + context: TaskContext, + outputFile: File, + collection: WritablePartitionedPairCollection[K, C]): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) if (spills.isEmpty) { // Case where we only have in-memory data - val collection = if (aggregator.isDefined) map else buffer val it = collection.destructiveSortedWritablePartitionedIterator(comparator) while (it.hasNext) { val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, @@ -701,8 +653,8 @@ private[spark] class ExternalSorter[K, V, C]( * * @param data an iterator of elements, assumed to already be sorted by partition ID */ - private def groupByPartition(data: Iterator[((Int, K), C)]) - : Iterator[(Int, Iterator[Product2[K, C]])] = + protected def groupByPartition(data: Iterator[((Int, K), C)]) + : Iterator[(Int, Iterator[Product2[K, C]])] = { val buffered = data.buffered (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered))) @@ -713,7 +665,7 @@ private[spark] class ExternalSorter[K, V, C]( * stream, assuming this partition is the next one to be read. Used to make it easier to return * partitioned iterators from our in-memory collection. 
*/ - private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) + protected[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) extends Iterator[Product2[K, C]] { override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala index 6948f58fe7ab5..b0ef94ee15626 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala @@ -20,17 +20,11 @@ package org.apache.spark.util.collection import java.io._ import java.util.Comparator -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable - import com.google.common.annotations.VisibleForTesting -import com.google.common.io.ByteStreams - import org.apache.spark._ import org.apache.spark.serializer._ -import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.shuffle.sort.{SortShuffleFileWriter, SortShuffleWriter} -import org.apache.spark.storage.{BlockId, BlockObjectWriter} +import org.apache.spark.storage.BlockId /** * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner @@ -92,18 +86,10 @@ private[spark] class ExternalSorterAgg[K, V, C]( partitioner: Option[Partitioner] = None, ordering: Option[Ordering[K]] = None, serializer: Option[Serializer] = None) - extends Logging + extends ExternalSorterParent[K,V,C] with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { - private val conf = SparkEnv.get.conf - - private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) - private val shouldPartition = numPartitions > 1 - private def getPartition(key: K): Int = { - if (shouldPartition) partitioner.get.getPartition(key) else 0 - } - // Since SPARK-7855, bypassMergeSort optimization is no longer performed as part of this class. // As a sanity check, make sure that we're not handling a shuffle which should use that path. if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, Some(aggregator), ordering)) { @@ -111,76 +97,10 @@ private[spark] class ExternalSorterAgg[K, V, C]( + " a sort that the BypassMergeSortShuffleWriter should handle") } - private val blockManager = SparkEnv.get.blockManager - private val diskBlockManager = blockManager.diskBlockManager - private val ser = Serializer.getSerializer(serializer) - private val serInstance = ser.newInstance() - - private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) - - // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided - private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 - - // Size of object batches when reading/writing from serializers. - // - // Objects are written in batches, with each batch using its own serialization stream. This - // cuts down on the size of reference-tracking maps constructed when deserializing a stream. - // - // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers - // grow internal data structures by growing + copying every time the number of objects doubles. 
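// Aside (sketch with a hypothetical helper, not patch code): spilling writes records in batches
// of `serializerBatchSize`, closing the current serialization stream at each batch boundary so
// that every batch can later be read back independently of the others.
def writeInBatches[T](records: Iterator[T], batchSize: Long)(write: T => Unit)(flush: () => Unit): Unit = {
  var written = 0L
  records.foreach { rec =>
    write(rec)
    written += 1
    if (written == batchSize) {
      flush()
      written = 0L
    }
  }
  if (written > 0) flush() // flush the final, partially filled batch
}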
- private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) - - private val useSerializedPairBuffer = - ordering.isEmpty && - conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && - ser.supportsRelocationOfSerializedObjects - private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB - private def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = { - if (useSerializedPairBuffer) { - new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) - } else { - new PartitionedPairBuffer[K, C] - } - } - // Data structures to store in-memory objects before we spill. Depending on whether we have an - // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we - // store them in an array buffer. - private var map = new PartitionedAppendOnlyMap[K, C] - private var buffer = newBuffer() - - // Total spilling statistics - private var _diskBytesSpilled = 0L - def diskBytesSpilled: Long = _diskBytesSpilled - - - // A comparator for keys K that orders them within a partition to allow aggregation or sorting. - // Can be a partial ordering by hash code if a total ordering is not provided through by the - // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some - // non-equal keys also have this, so we need to do a later pass to find truly equal keys). - // Note that we ignore this if no aggregator and no ordering are given. - private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] { - override def compare(a: K, b: K): Int = { - val h1 = if (a == null) 0 else a.hashCode() - val h2 = if (b == null) 0 else b.hashCode() - if (h1 < h2) -1 else if (h1 == h2) 0 else 1 - } - }) - - private def comparator: Option[Comparator[K]] = { + override def comparator: Option[Comparator[K]] = { Some(keyComparator) } - // Information about a spilled file. Includes sizes in bytes of "batches" written by the - // serializer as we periodically reset its stream, as well as number of elements in each - // partition, used to efficiently keep track of partitions when merging. - private[this] case class SpilledFile( - file: File, - blockId: BlockId, - serializerBatchSizes: Array[Long], - elementsPerPartition: Array[Long]) - - private val spills = new ArrayBuffer[SpilledFile] - override def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high // Combine values in-memory first using our AppendOnlyMap @@ -198,106 +118,6 @@ private[spark] class ExternalSorterAgg[K, V, C]( } } - /** - * Spill the current in-memory collection to disk if needed. - * - * @param usingMap whether we're using a map or buffer as our current in-memory collection - */ - private def maybeSpillCollection(usingMap: Boolean): Unit = { - if (!spillingEnabled) { - return - } - - if (usingMap) { - if (maybeSpill(map, map.estimateSize())) { - map = new PartitionedAppendOnlyMap[K, C] - } - } else { - if (maybeSpill(buffer, buffer.estimateSize())) { - buffer = newBuffer() - } - } - } - - /** - * Spill our in-memory collection to a sorted file that we can merge later. - * We add this file into `spilledFiles` to find it later. 
- * - * @param collection whichever collection we're using (map or buffer) - */ - override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { - // Because these files may be read during shuffle, their compression must be controlled by - // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use - // createTempShuffleBlock here; see SPARK-3426 for more context. - val (blockId, file) = diskBlockManager.createTempShuffleBlock() - - // These variables are reset after each flush - var objectsWritten: Long = 0 - var spillMetrics: ShuffleWriteMetrics = null - var writer: BlockObjectWriter = null - def openWriter(): Unit = { - assert (writer == null && spillMetrics == null) - spillMetrics = new ShuffleWriteMetrics - writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) - } - openWriter() - - // List of batch sizes (bytes) in the order they are written to disk - val batchSizes = new ArrayBuffer[Long] - - // How many elements we have in each partition - val elementsPerPartition = new Array[Long](numPartitions) - - // Flush the disk writer's contents to disk, and update relevant variables. - // The writer is closed at the end of this process, and cannot be reused. - def flush(): Unit = { - val w = writer - writer = null - w.commitAndClose() - _diskBytesSpilled += spillMetrics.shuffleBytesWritten - batchSizes.append(spillMetrics.shuffleBytesWritten) - spillMetrics = null - objectsWritten = 0 - } - - var success = false - try { - val it = collection.destructiveSortedWritablePartitionedIterator(comparator) - while (it.hasNext) { - val partitionId = it.nextPartition() - it.writeNext(writer) - elementsPerPartition(partitionId) += 1 - objectsWritten += 1 - - if (objectsWritten == serializerBatchSize) { - flush() - openWriter() - } - } - if (objectsWritten > 0) { - flush() - } else if (writer != null) { - val w = writer - writer = null - w.revertPartialWritesAndClose() - } - success = true - } finally { - if (!success) { - // This code path only happens if an exception was thrown above before we set success; - // close our stuff and let the exception be thrown further - if (writer != null) { - writer.revertPartialWritesAndClose() - } - if (file.exists()) { - file.delete() - } - } - } - - spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)) - } - /** * Merge a sequence of sorted files, giving an iterator over partitions and then over elements * inside each partition. This can be used to either write out a new file or return data to @@ -308,8 +128,8 @@ private[spark] class ExternalSorterAgg[K, V, C]( * in order (you can't "skip ahead" to one partition without reading the previous one). * Guaranteed to return a key-value pair for each partition, in order of partition ID. */ - private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) - : Iterator[(Int, Iterator[Product2[K, C]])] = { + override def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]): + Iterator[(Int, Iterator[Product2[K, C]])] = { val readers = spills.map(new SpillReader(_)) val inMemBuffered = inMemory.buffered (0 until numPartitions).iterator.map { p => @@ -321,263 +141,6 @@ private[spark] class ExternalSorterAgg[K, V, C]( } } - /** - * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys. 
- */ - private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K]) - : Iterator[Product2[K, C]] = - { - val bufferedIters = iterators.filter(_.hasNext).map(_.buffered) - type Iter = BufferedIterator[Product2[K, C]] - val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] { - // Use the reverse of comparator.compare because PriorityQueue dequeues the max - override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1) - }) - heap.enqueue(bufferedIters: _*) // Will contain only the iterators with hasNext = true - new Iterator[Product2[K, C]] { - override def hasNext: Boolean = !heap.isEmpty - - override def next(): Product2[K, C] = { - if (!hasNext) { - throw new NoSuchElementException - } - val firstBuf = heap.dequeue() - val firstPair = firstBuf.next() - if (firstBuf.hasNext) { - heap.enqueue(firstBuf) - } - firstPair - } - } - } - - /** - * Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each - * iterator is sorted by key with a given comparator. If the comparator is not a total ordering - * (e.g. when we sort objects by hash code and different keys may compare as equal although - * they're not), we still merge them by doing equality tests for all keys that compare as equal. - */ - private def mergeWithAggregation( - iterators: Seq[Iterator[Product2[K, C]]], - mergeCombiners: (C, C) => C, - comparator: Comparator[K], - totalOrder: Boolean) - : Iterator[Product2[K, C]] = - { - if (!totalOrder) { - // We only have a partial ordering, e.g. comparing the keys by hash code, which means that - // multiple distinct keys might be treated as equal by the ordering. To deal with this, we - // need to read all keys considered equal by the ordering at once and compare them. - new Iterator[Iterator[Product2[K, C]]] { - val sorted = mergeSort(iterators, comparator).buffered - - // Buffers reused across elements to decrease memory allocation - val keys = new ArrayBuffer[K] - val combiners = new ArrayBuffer[C] - - override def hasNext: Boolean = sorted.hasNext - - override def next(): Iterator[Product2[K, C]] = { - if (!hasNext) { - throw new NoSuchElementException - } - keys.clear() - combiners.clear() - val firstPair = sorted.next() - keys += firstPair._1 - combiners += firstPair._2 - val key = firstPair._1 - while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) { - val pair = sorted.next() - var i = 0 - var foundKey = false - while (i < keys.size && !foundKey) { - if (keys(i) == pair._1) { - combiners(i) = mergeCombiners(combiners(i), pair._2) - foundKey = true - } - i += 1 - } - if (!foundKey) { - keys += pair._1 - combiners += pair._2 - } - } - - // Note that we return an iterator of elements since we could've had many keys marked - // equal by the partial order; we flatten this below to get a flat iterator of (K, C). - keys.iterator.zip(combiners.iterator) - } - }.flatMap(i => i) - } else { - // We have a total ordering, so the objects with the same key are sequential. 
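// Aside (standalone sketch with simplified element types, not the patch code): both branches of
// mergeWithAggregation() consume the single comparator-sorted stream produced by mergeSort().
// The k-way merge itself is a priority queue of buffered iterators ordered by their current head:
import scala.collection.mutable

def kWayMerge[T](iterators: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  type Buf = BufferedIterator[T]
  // Reverse the ordering because PriorityQueue dequeues its maximum element first.
  val heap = new mutable.PriorityQueue[Buf]()(Ordering.by[Buf, T](_.head)(ord.reverse))
  heap.enqueue(iterators.filter(_.hasNext).map(_.buffered): _*)
  new Iterator[T] {
    override def hasNext: Boolean = heap.nonEmpty
    override def next(): T = {
      val buf = heap.dequeue()
      val elem = buf.next()
      if (buf.hasNext) heap.enqueue(buf) // put the iterator back if it still has elements
      elem
    }
  }
}
// e.g. kWayMerge(Seq(Iterator(1, 4), Iterator(2, 3))).toList == List(1, 2, 3, 4)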
- new Iterator[Product2[K, C]] { - val sorted = mergeSort(iterators, comparator).buffered - - override def hasNext: Boolean = sorted.hasNext - - override def next(): Product2[K, C] = { - if (!hasNext) { - throw new NoSuchElementException - } - val elem = sorted.next() - val k = elem._1 - var c = elem._2 - while (sorted.hasNext && sorted.head._1 == k) { - val pair = sorted.next() - c = mergeCombiners(c, pair._2) - } - (k, c) - } - } - } - } - - /** - * An internal class for reading a spilled file partition by partition. Expects all the - * partitions to be requested in order. - */ - private[this] class SpillReader(spill: SpilledFile) { - // Serializer batch offsets; size will be batchSize.length + 1 - val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _) - - // Track which partition and which batch stream we're in. These will be the indices of - // the next element we will read. We'll also store the last partition read so that - // readNextPartition() can figure out what partition that was from. - var partitionId = 0 - var indexInPartition = 0L - var batchId = 0 - var indexInBatch = 0 - var lastPartitionId = 0 - - skipToNextPartition() - - // Intermediate file and deserializer streams that read from exactly one batch - // This guards against pre-fetching and other arbitrary behavior of higher level streams - var fileStream: FileInputStream = null - var deserializeStream = nextBatchStream() // Also sets fileStream - - var nextItem: (K, C) = null - var finished = false - - /** Construct a stream that only reads from the next batch */ - def nextBatchStream(): DeserializationStream = { - // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether - // we're still in a valid batch. - if (batchId < batchOffsets.length - 1) { - if (deserializeStream != null) { - deserializeStream.close() - fileStream.close() - deserializeStream = null - fileStream = null - } - - val start = batchOffsets(batchId) - fileStream = new FileInputStream(spill.file) - fileStream.getChannel.position(start) - batchId += 1 - - val end = batchOffsets(batchId) - - assert(end >= start, "start = " + start + ", end = " + end + - ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) - - val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) - val compressedStream = blockManager.wrapForCompression(spill.blockId, bufferedStream) - serInstance.deserializeStream(compressedStream) - } else { - // No more batches left - cleanup() - null - } - } - - /** - * Update partitionId if we have reached the end of our current partition, possibly skipping - * empty partitions on the way. - */ - private def skipToNextPartition() { - while (partitionId < numPartitions && - indexInPartition == spill.elementsPerPartition(partitionId)) { - partitionId += 1 - indexInPartition = 0L - } - } - - /** - * Return the next (K, C) pair from the deserialization stream and update partitionId, - * indexInPartition, indexInBatch and such to match its location. - * - * If the current batch is drained, construct a stream for the next batch and read from it. - * If no more pairs are left, return null. 
- */ - private def readNextItem(): (K, C) = { - if (finished || deserializeStream == null) { - return null - } - val k = deserializeStream.readKey().asInstanceOf[K] - val c = deserializeStream.readValue().asInstanceOf[C] - lastPartitionId = partitionId - // Start reading the next batch if we're done with this one - indexInBatch += 1 - if (indexInBatch == serializerBatchSize) { - indexInBatch = 0 - deserializeStream = nextBatchStream() - } - // Update the partition location of the element we're reading - indexInPartition += 1 - skipToNextPartition() - // If we've finished reading the last partition, remember that we're done - if (partitionId == numPartitions) { - finished = true - if (deserializeStream != null) { - deserializeStream.close() - } - } - (k, c) - } - - var nextPartitionToRead = 0 - - def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] { - val myPartition = nextPartitionToRead - nextPartitionToRead += 1 - - override def hasNext: Boolean = { - if (nextItem == null) { - nextItem = readNextItem() - if (nextItem == null) { - return false - } - } - assert(lastPartitionId >= myPartition) - // Check that we're still in the right partition; note that readNextItem will have returned - // null at EOF above so we would've returned false there - lastPartitionId == myPartition - } - - override def next(): Product2[K, C] = { - if (!hasNext) { - throw new NoSuchElementException - } - val item = nextItem - nextItem = null - item - } - } - - // Clean up our open streams and put us in a state where we can't read any more data - def cleanup() { - batchId = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - deserializeStream = null - fileStream = null - ds.close() - // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop(). - // This should also be fixed in ExternalAppendOnlyMap. - } - } - /** * Return an iterator over all the data written to this object, grouped by partition and * aggregated by the requested aggregator. For each partition we then have an iterator over its @@ -589,29 +152,10 @@ private[spark] class ExternalSorterAgg[K, V, C]( * support hierarchical merging. */ @VisibleForTesting - def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { - val collection: WritablePartitionedPairCollection[K, C] = map - if (spills.isEmpty) { - // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps - // we don't even need to sort by anything other than partition ID - if (!ordering.isDefined) { - // The user hasn't requested sorted keys, so only sort by partition ID, not key - groupByPartition(collection.partitionedDestructiveSortedIterator(None)) - } else { - // We do need to sort by both partition ID and key - groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator))) - } - } else { - // Merge spilled and in-memory data - merge(spills, collection.partitionedDestructiveSortedIterator(comparator)) - } + override def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { + partitionedIterator(map) } - /** - * Return an iterator over all the data written to this object, aggregated by our aggregator. - */ - def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2) - /** * Write all the data added into this ExternalSorter into a file in the disk store. This is * called by the SortShuffleWriter. 
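// Aside (illustrative numbers, not patch code): the per-partition lengths returned by
// writePartitionedFile() are enough to locate each partition's byte range inside the single
// output file, since the offsets are just a running sum of the lengths:
val lengths = Array(10L, 0L, 25L, 5L)      // bytes written per partition (values made up)
val offsets = lengths.scanLeft(0L)(_ + _)  // Array(0, 10, 10, 35, 40)
// partition i occupies bytes [offsets(i), offsets(i) + lengths(i)) of the output file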
@@ -624,81 +168,6 @@ private[spark] class ExternalSorterAgg[K, V, C]( blockId: BlockId, context: TaskContext, outputFile: File): Array[Long] = { - - // Track location of each range in the output file - val lengths = new Array[Long](numPartitions) - - if (spills.isEmpty) { - // Case where we only have in-memory data - val collection = map - val it = collection.destructiveSortedWritablePartitionedIterator(comparator) - while (it.hasNext) { - val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, - context.taskMetrics.shuffleWriteMetrics.get) - val partitionId = it.nextPartition() - while (it.hasNext && it.nextPartition() == partitionId) { - it.writeNext(writer) - } - writer.commitAndClose() - val segment = writer.fileSegment() - lengths(partitionId) = segment.length - } - } else { - // We must perform merge-sort; get an iterator by partition and write everything directly. - for ((id, elements) <- this.partitionedIterator) { - if (elements.hasNext) { - val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, - context.taskMetrics.shuffleWriteMetrics.get) - for (elem <- elements) { - writer.write(elem._1, elem._2) - } - writer.commitAndClose() - val segment = writer.fileSegment() - lengths(id) = segment.length - } - } - } - - context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) - context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) - - lengths - } - - def stop(): Unit = { - spills.foreach(s => s.file.delete()) - spills.clear() - } - - /** - * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*, - * group together the pairs for each partition into a sub-iterator. - * - * @param data an iterator of elements, assumed to already be sorted by partition ID - */ - private def groupByPartition(data: Iterator[((Int, K), C)]) - : Iterator[(Int, Iterator[Product2[K, C]])] = - { - val buffered = data.buffered - (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered))) - } - - /** - * An iterator that reads only the elements for a given partition ID from an underlying buffered - * stream, assuming this partition is the next one to be read. Used to make it easier to return - * partitioned iterators from our in-memory collection. 
- */ - private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) - extends Iterator[Product2[K, C]] - { - override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId - - override def next(): Product2[K, C] = { - if (!hasNext) { - throw new NoSuchElementException - } - val elem = data.next() - (elem._1._2, elem._2) - } + writePartitionedFile(blockId, context, outputFile, map) } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala index d14d570f3d83f..fa4b178aea661 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala @@ -20,17 +20,11 @@ package org.apache.spark.util.collection import java.io._ import java.util.Comparator -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable - import com.google.common.annotations.VisibleForTesting -import com.google.common.io.ByteStreams - import org.apache.spark._ import org.apache.spark.serializer._ -import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.shuffle.sort.{SortShuffleFileWriter, SortShuffleWriter} -import org.apache.spark.storage.{BlockId, BlockObjectWriter} +import org.apache.spark.shuffle.sort.{SortShuffleWriter, SortShuffleFileWriter} +import org.apache.spark.storage.BlockId /** * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner @@ -90,18 +84,10 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( partitioner: Option[Partitioner] = None, ordering: Option[Ordering[K]] = None, serializer: Option[Serializer] = None) - extends Logging + extends ExternalSorterParent[K, V, C](partitioner, ordering, serializer) with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { - private val conf = SparkEnv.get.conf - - private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) - private val shouldPartition = numPartitions > 1 - private def getPartition(key: K): Int = { - if (shouldPartition) partitioner.get.getPartition(key) else 0 - } - // Since SPARK-7855, bypassMergeSort optimization is no longer performed as part of this class. // As a sanity check, make sure that we're not handling a shuffle which should use that path. if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, None, ordering)) { @@ -109,62 +95,7 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( + " a sort that the BypassMergeSortShuffleWriter should handle") } - private val blockManager = SparkEnv.get.blockManager - private val diskBlockManager = blockManager.diskBlockManager - private val ser = Serializer.getSerializer(serializer) - private val serInstance = ser.newInstance() - - private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) - - // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided - private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 - - // Size of object batches when reading/writing from serializers. - // - // Objects are written in batches, with each batch using its own serialization stream. This - // cuts down on the size of reference-tracking maps constructed when deserializing a stream. 
- // - // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers - // grow internal data structures by growing + copying every time the number of objects doubles. - private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) - - private val useSerializedPairBuffer = - ordering.isEmpty && - conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && - ser.supportsRelocationOfSerializedObjects - private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB - private def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = { - if (useSerializedPairBuffer) { - new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) - } else { - new PartitionedPairBuffer[K, C] - } - } - // Data structures to store in-memory objects before we spill. Depending on whether we have an - // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we - // store them in an array buffer. - private var map = new PartitionedAppendOnlyMap[K, C] - private var buffer = newBuffer() - - // Total spilling statistics - private var _diskBytesSpilled = 0L - def diskBytesSpilled: Long = _diskBytesSpilled - - - // A comparator for keys K that orders them within a partition to allow aggregation or sorting. - // Can be a partial ordering by hash code if a total ordering is not provided through by the - // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some - // non-equal keys also have this, so we need to do a later pass to find truly equal keys). - // Note that we ignore this if no aggregator and no ordering are given. - private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] { - override def compare(a: K, b: K): Int = { - val h1 = if (a == null) 0 else a.hashCode() - val h2 = if (b == null) 0 else b.hashCode() - if (h1 < h2) -1 else if (h1 == h2) 0 else 1 - } - }) - - private def comparator: Option[Comparator[K]] = { + override def comparator: Option[Comparator[K]] = { if (ordering.isDefined) { Some(keyComparator) } else { @@ -172,17 +103,6 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( } } - // Information about a spilled file. Includes sizes in bytes of "batches" written by the - // serializer as we periodically reset its stream, as well as number of elements in each - // partition, used to efficiently keep track of partitions when merging. - private[this] case class SpilledFile( - file: File, - blockId: BlockId, - serializerBatchSizes: Array[Long], - elementsPerPartition: Array[Long]) - - private val spills = new ArrayBuffer[SpilledFile] - override def insertAll(records: Iterator[Product2[K, V]]): Unit = { // Stick values into our buffer while (records.hasNext) { @@ -193,106 +113,6 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( } } - /** - * Spill the current in-memory collection to disk if needed. - * - * @param usingMap whether we're using a map or buffer as our current in-memory collection - */ - private def maybeSpillCollection(usingMap: Boolean): Unit = { - if (!spillingEnabled) { - return - } - - if (usingMap) { - if (maybeSpill(map, map.estimateSize())) { - map = new PartitionedAppendOnlyMap[K, C] - } - } else { - if (maybeSpill(buffer, buffer.estimateSize())) { - buffer = newBuffer() - } - } - } - - /** - * Spill our in-memory collection to a sorted file that we can merge later. - * We add this file into `spilledFiles` to find it later. 
- * - * @param collection whichever collection we're using (map or buffer) - */ - override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { - // Because these files may be read during shuffle, their compression must be controlled by - // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use - // createTempShuffleBlock here; see SPARK-3426 for more context. - val (blockId, file) = diskBlockManager.createTempShuffleBlock() - - // These variables are reset after each flush - var objectsWritten: Long = 0 - var spillMetrics: ShuffleWriteMetrics = null - var writer: BlockObjectWriter = null - def openWriter(): Unit = { - assert (writer == null && spillMetrics == null) - spillMetrics = new ShuffleWriteMetrics - writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) - } - openWriter() - - // List of batch sizes (bytes) in the order they are written to disk - val batchSizes = new ArrayBuffer[Long] - - // How many elements we have in each partition - val elementsPerPartition = new Array[Long](numPartitions) - - // Flush the disk writer's contents to disk, and update relevant variables. - // The writer is closed at the end of this process, and cannot be reused. - def flush(): Unit = { - val w = writer - writer = null - w.commitAndClose() - _diskBytesSpilled += spillMetrics.shuffleBytesWritten - batchSizes.append(spillMetrics.shuffleBytesWritten) - spillMetrics = null - objectsWritten = 0 - } - - var success = false - try { - val it = collection.destructiveSortedWritablePartitionedIterator(comparator) - while (it.hasNext) { - val partitionId = it.nextPartition() - it.writeNext(writer) - elementsPerPartition(partitionId) += 1 - objectsWritten += 1 - - if (objectsWritten == serializerBatchSize) { - flush() - openWriter() - } - } - if (objectsWritten > 0) { - flush() - } else if (writer != null) { - val w = writer - writer = null - w.revertPartialWritesAndClose() - } - success = true - } finally { - if (!success) { - // This code path only happens if an exception was thrown above before we set success; - // close our stuff and let the exception be thrown further - if (writer != null) { - writer.revertPartialWritesAndClose() - } - if (file.exists()) { - file.delete() - } - } - } - - spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)) - } - /** * Merge a sequence of sorted files, giving an iterator over partitions and then over elements * inside each partition. This can be used to either write out a new file or return data to @@ -303,8 +123,8 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( * in order (you can't "skip ahead" to one partition without reading the previous one). * Guaranteed to return a key-value pair for each partition, in order of partition ID. */ - private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) - : Iterator[(Int, Iterator[Product2[K, C]])] = { + override def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]): + Iterator[(Int, Iterator[Product2[K, C]])] = { val readers = spills.map(new SpillReader(_)) val inMemBuffered = inMemory.buffered (0 until numPartitions).iterator.map { p => @@ -321,296 +141,20 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( } /** - * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys. 
- */ - private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K]) - : Iterator[Product2[K, C]] = - { - val bufferedIters = iterators.filter(_.hasNext).map(_.buffered) - type Iter = BufferedIterator[Product2[K, C]] - val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] { - // Use the reverse of comparator.compare because PriorityQueue dequeues the max - override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1) - }) - heap.enqueue(bufferedIters: _*) // Will contain only the iterators with hasNext = true - new Iterator[Product2[K, C]] { - override def hasNext: Boolean = !heap.isEmpty - - override def next(): Product2[K, C] = { - if (!hasNext) { - throw new NoSuchElementException - } - val firstBuf = heap.dequeue() - val firstPair = firstBuf.next() - if (firstBuf.hasNext) { - heap.enqueue(firstBuf) - } - firstPair - } - } - } - - /** - * Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each - * iterator is sorted by key with a given comparator. If the comparator is not a total ordering - * (e.g. when we sort objects by hash code and different keys may compare as equal although - * they're not), we still merge them by doing equality tests for all keys that compare as equal. - */ - private def mergeWithAggregation( - iterators: Seq[Iterator[Product2[K, C]]], - mergeCombiners: (C, C) => C, - comparator: Comparator[K], - totalOrder: Boolean) - : Iterator[Product2[K, C]] = - { - if (!totalOrder) { - // We only have a partial ordering, e.g. comparing the keys by hash code, which means that - // multiple distinct keys might be treated as equal by the ordering. To deal with this, we - // need to read all keys considered equal by the ordering at once and compare them. - new Iterator[Iterator[Product2[K, C]]] { - val sorted = mergeSort(iterators, comparator).buffered - - // Buffers reused across elements to decrease memory allocation - val keys = new ArrayBuffer[K] - val combiners = new ArrayBuffer[C] - - override def hasNext: Boolean = sorted.hasNext - - override def next(): Iterator[Product2[K, C]] = { - if (!hasNext) { - throw new NoSuchElementException - } - keys.clear() - combiners.clear() - val firstPair = sorted.next() - keys += firstPair._1 - combiners += firstPair._2 - val key = firstPair._1 - while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) { - val pair = sorted.next() - var i = 0 - var foundKey = false - while (i < keys.size && !foundKey) { - if (keys(i) == pair._1) { - combiners(i) = mergeCombiners(combiners(i), pair._2) - foundKey = true - } - i += 1 - } - if (!foundKey) { - keys += pair._1 - combiners += pair._2 - } - } - - // Note that we return an iterator of elements since we could've had many keys marked - // equal by the partial order; we flatten this below to get a flat iterator of (K, C). - keys.iterator.zip(combiners.iterator) - } - }.flatMap(i => i) - } else { - // We have a total ordering, so the objects with the same key are sequential. 
- new Iterator[Product2[K, C]] { - val sorted = mergeSort(iterators, comparator).buffered - - override def hasNext: Boolean = sorted.hasNext - - override def next(): Product2[K, C] = { - if (!hasNext) { - throw new NoSuchElementException - } - val elem = sorted.next() - val k = elem._1 - var c = elem._2 - while (sorted.hasNext && sorted.head._1 == k) { - val pair = sorted.next() - c = mergeCombiners(c, pair._2) - } - (k, c) - } - } - } - } - - /** - * An internal class for reading a spilled file partition by partition. Expects all the - * partitions to be requested in order. - */ - private[this] class SpillReader(spill: SpilledFile) { - // Serializer batch offsets; size will be batchSize.length + 1 - val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _) - - // Track which partition and which batch stream we're in. These will be the indices of - // the next element we will read. We'll also store the last partition read so that - // readNextPartition() can figure out what partition that was from. - var partitionId = 0 - var indexInPartition = 0L - var batchId = 0 - var indexInBatch = 0 - var lastPartitionId = 0 - - skipToNextPartition() - - // Intermediate file and deserializer streams that read from exactly one batch - // This guards against pre-fetching and other arbitrary behavior of higher level streams - var fileStream: FileInputStream = null - var deserializeStream = nextBatchStream() // Also sets fileStream - - var nextItem: (K, C) = null - var finished = false - - /** Construct a stream that only reads from the next batch */ - def nextBatchStream(): DeserializationStream = { - // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether - // we're still in a valid batch. - if (batchId < batchOffsets.length - 1) { - if (deserializeStream != null) { - deserializeStream.close() - fileStream.close() - deserializeStream = null - fileStream = null - } - - val start = batchOffsets(batchId) - fileStream = new FileInputStream(spill.file) - fileStream.getChannel.position(start) - batchId += 1 - - val end = batchOffsets(batchId) - - assert(end >= start, "start = " + start + ", end = " + end + - ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) - - val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) - val compressedStream = blockManager.wrapForCompression(spill.blockId, bufferedStream) - serInstance.deserializeStream(compressedStream) - } else { - // No more batches left - cleanup() - null - } - } - - /** - * Update partitionId if we have reached the end of our current partition, possibly skipping - * empty partitions on the way. - */ - private def skipToNextPartition() { - while (partitionId < numPartitions && - indexInPartition == spill.elementsPerPartition(partitionId)) { - partitionId += 1 - indexInPartition = 0L - } - } - - /** - * Return the next (K, C) pair from the deserialization stream and update partitionId, - * indexInPartition, indexInBatch and such to match its location. - * - * If the current batch is drained, construct a stream for the next batch and read from it. - * If no more pairs are left, return null. 
- */ - private def readNextItem(): (K, C) = { - if (finished || deserializeStream == null) { - return null - } - val k = deserializeStream.readKey().asInstanceOf[K] - val c = deserializeStream.readValue().asInstanceOf[C] - lastPartitionId = partitionId - // Start reading the next batch if we're done with this one - indexInBatch += 1 - if (indexInBatch == serializerBatchSize) { - indexInBatch = 0 - deserializeStream = nextBatchStream() - } - // Update the partition location of the element we're reading - indexInPartition += 1 - skipToNextPartition() - // If we've finished reading the last partition, remember that we're done - if (partitionId == numPartitions) { - finished = true - if (deserializeStream != null) { - deserializeStream.close() - } - } - (k, c) - } - - var nextPartitionToRead = 0 - - def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] { - val myPartition = nextPartitionToRead - nextPartitionToRead += 1 - - override def hasNext: Boolean = { - if (nextItem == null) { - nextItem = readNextItem() - if (nextItem == null) { - return false - } - } - assert(lastPartitionId >= myPartition) - // Check that we're still in the right partition; note that readNextItem will have returned - // null at EOF above so we would've returned false there - lastPartitionId == myPartition - } - - override def next(): Product2[K, C] = { - if (!hasNext) { - throw new NoSuchElementException - } - val item = nextItem - nextItem = null - item - } - } - - // Clean up our open streams and put us in a state where we can't read any more data - def cleanup() { - batchId = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - deserializeStream = null - fileStream = null - ds.close() - // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop(). - // This should also be fixed in ExternalAppendOnlyMap. - } - } - - /** - * Return an iterator over all the data written to this object, grouped by partition and - * aggregated by the requested aggregator. For each partition we then have an iterator over its - * contents, and these are expected to be accessed in order (you can't "skip ahead" to one - * partition without reading the previous one). Guaranteed to return a key-value pair for each - * partition, in order of partition ID. + * Return an iterator over all the data written to this object, grouped by partition. + * For each partition we then have an iterator over its contents, and these are expected to be + * accessed in order (you can't "skip ahead" to one partition without reading the previous one). + * Guaranteed to return a key-value pair for each partition, in order of partition ID. * * For now, we just merge all the spilled files in once pass, but this can be modified to * support hierarchical merging. 
*/ @VisibleForTesting - def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { + override def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { val collection: WritablePartitionedPairCollection[K, C] = buffer - if (spills.isEmpty) { - // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps - // we don't even need to sort by anything other than partition ID - if (!ordering.isDefined) { - // The user hasn't requested sorted keys, so only sort by partition ID, not key - groupByPartition(collection.partitionedDestructiveSortedIterator(None)) - } else { - // We do need to sort by both partition ID and key - groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator))) - } - } else { - // Merge spilled and in-memory data - merge(spills, collection.partitionedDestructiveSortedIterator(comparator)) - } + partitionedIterator(collection) } - /** - * Return an iterator over all the data written to this object, aggregated by our aggregator. - */ - def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2) - /** * Write all the data added into this ExternalSorter into a file in the disk store. This is * called by the SortShuffleWriter. @@ -624,80 +168,6 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( context: TaskContext, outputFile: File): Array[Long] = { - // Track location of each range in the output file - val lengths = new Array[Long](numPartitions) - - if (spills.isEmpty) { - // Case where we only have in-memory data - val collection = buffer - val it = collection.destructiveSortedWritablePartitionedIterator(comparator) - while (it.hasNext) { - val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, - context.taskMetrics.shuffleWriteMetrics.get) - val partitionId = it.nextPartition() - while (it.hasNext && it.nextPartition() == partitionId) { - it.writeNext(writer) - } - writer.commitAndClose() - val segment = writer.fileSegment() - lengths(partitionId) = segment.length - } - } else { - // We must perform merge-sort; get an iterator by partition and write everything directly. - for ((id, elements) <- this.partitionedIterator) { - if (elements.hasNext) { - val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, - context.taskMetrics.shuffleWriteMetrics.get) - for (elem <- elements) { - writer.write(elem._1, elem._2) - } - writer.commitAndClose() - val segment = writer.fileSegment() - lengths(id) = segment.length - } - } - } - - context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) - context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) - - lengths - } - - def stop(): Unit = { - spills.foreach(s => s.file.delete()) - spills.clear() - } - - /** - * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*, - * group together the pairs for each partition into a sub-iterator. - * - * @param data an iterator of elements, assumed to already be sorted by partition ID - */ - private def groupByPartition(data: Iterator[((Int, K), C)]) - : Iterator[(Int, Iterator[Product2[K, C]])] = - { - val buffered = data.buffered - (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered))) - } - - /** - * An iterator that reads only the elements for a given partition ID from an underlying buffered - * stream, assuming this partition is the next one to be read. Used to make it easier to return - * partitioned iterators from our in-memory collection. 
- */ - private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) - extends Iterator[Product2[K, C]] - { - override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId - - override def next(): Product2[K, C] = { - if (!hasNext) { - throw new NoSuchElementException - } - val elem = data.next() - (elem._1._2, elem._2) - } + writePartitionedFile(blockId, context, outputFile, buffer) } } From b74ca3083d1fc2fe0c8c36cebab86539ca565ce1 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 30 Jun 2015 12:22:17 -0400 Subject: [PATCH 03/14] Compilation fixes --- .../org/apache/spark/util/collection/ExternalSorter.scala | 6 +++--- .../apache/spark/util/collection/ExternalSorterAgg.scala | 2 +- .../apache/spark/util/collection/ExternalSorterNoAgg.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 31cced75611bd..932586b426348 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -20,6 +20,9 @@ package org.apache.spark.util.collection import java.io._ import java.util.Comparator +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + import com.google.common.annotations.VisibleForTesting import com.google.common.io.ByteStreams import org.apache.spark._ @@ -28,9 +31,6 @@ import org.apache.spark.serializer._ import org.apache.spark.shuffle.sort.SortShuffleFileWriter import org.apache.spark.storage.{BlockId, BlockObjectWriter} -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - /** * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner * pairs of type (K, C). 
Uses a Partitioner to first group the keys into partitions, and then diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala index b0ef94ee15626..2878b168953e9 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala @@ -86,7 +86,7 @@ private[spark] class ExternalSorterAgg[K, V, C]( partitioner: Option[Partitioner] = None, ordering: Option[Ordering[K]] = None, serializer: Option[Serializer] = None) - extends ExternalSorterParent[K,V,C] + extends ExternalSorter[K,V,C] with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala index fa4b178aea661..236b634d88ae2 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala @@ -84,7 +84,7 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( partitioner: Option[Partitioner] = None, ordering: Option[Ordering[K]] = None, serializer: Option[Serializer] = None) - extends ExternalSorterParent[K, V, C](partitioner, ordering, serializer) + extends ExternalSorter[K, V, C](partitioner, ordering, serializer) with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { From 864d60380190d861ec1be6671f614b5e0981bf2c Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 30 Jun 2015 12:29:19 -0400 Subject: [PATCH 04/14] Spacing --- .../apache/spark/util/collection/ExternalSorter.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 932586b426348..2e530d7ac3e3b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -285,13 +285,13 @@ private[spark] abstract class ExternalSorter[K, V, C]( * This interface abstracts away aggregator dependence */ protected def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) - : Iterator[(Int, Iterator[Product2[K, C]])] + : Iterator[(Int, Iterator[Product2[K, C]])] /** * Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys. */ protected def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K]) - : Iterator[Product2[K, C]] = + : Iterator[Product2[K, C]] = { val bufferedIters = iterators.filter(_.hasNext).map(_.buffered) type Iter = BufferedIterator[Product2[K, C]] @@ -327,8 +327,7 @@ private[spark] abstract class ExternalSorter[K, V, C]( iterators: Seq[Iterator[Product2[K, C]]], mergeCombiners: (C, C) => C, comparator: Comparator[K], - totalOrder: Boolean) - : Iterator[Product2[K, C]] = + totalOrder: Boolean): Iterator[Product2[K, C]] = { if (!totalOrder) { // We only have a partial ordering, e.g. 
comparing the keys by hash code, which means that @@ -464,7 +463,7 @@ private[spark] abstract class ExternalSorter[K, V, C]( */ private def skipToNextPartition() { while (partitionId < numPartitions && - indexInPartition == spill.elementsPerPartition(partitionId)) { + indexInPartition == spill.elementsPerPartition(partitionId)) { partitionId += 1 indexInPartition = 0L } @@ -654,7 +653,7 @@ private[spark] abstract class ExternalSorter[K, V, C]( * @param data an iterator of elements, assumed to already be sorted by partition ID */ protected def groupByPartition(data: Iterator[((Int, K), C)]) - : Iterator[(Int, Iterator[Product2[K, C]])] = + : Iterator[(Int, Iterator[Product2[K, C]])] = { val buffered = data.buffered (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered))) From 8b3aca5abae42590b7796594caeaaae5caa80d28 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 30 Jun 2015 12:36:26 -0400 Subject: [PATCH 05/14] Style fixes --- .../util/collection/ExternalSorter.scala | 36 +++++++++---------- .../util/collection/ExternalSorterAgg.scala | 16 ++++----- .../util/collection/ExternalSorterNoAgg.scala | 12 +++---- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 2e530d7ac3e3b..f12518c762a0c 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -86,9 +86,9 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * - Users are expected to call stop() at the end to delete all the intermediate files. */ private[spark] abstract class ExternalSorter[K, V, C]( - partitioner: Option[Partitioner] = None, - ordering: Option[Ordering[K]] = None, - serializer: Option[Serializer] = None) + partitioner: Option[Partitioner] = None, + ordering: Option[Ordering[K]] = None, + serializer: Option[Serializer] = None) extends Logging with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { @@ -324,10 +324,10 @@ private[spark] abstract class ExternalSorter[K, V, C]( * they're not), we still merge them by doing equality tests for all keys that compare as equal. */ protected def mergeWithAggregation( - iterators: Seq[Iterator[Product2[K, C]]], - mergeCombiners: (C, C) => C, - comparator: Comparator[K], - totalOrder: Boolean): Iterator[Product2[K, C]] = + iterators: Seq[Iterator[Product2[K, C]]], + mergeCombiners: (C, C) => C, + comparator: Comparator[K], + totalOrder: Boolean): Iterator[Product2[K, C]] = { if (!totalOrder) { // We only have a partial ordering, e.g. comparing the keys by hash code, which means that @@ -558,8 +558,8 @@ private[spark] abstract class ExternalSorter[K, V, C]( @VisibleForTesting def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] - protected def partitionedIterator(collection: WritablePartitionedPairCollection[K, C]): - Iterator[(Int, Iterator[Product2[K, C]])] = { + protected def partitionedIterator(collection: WritablePartitionedPairCollection[K, C]) + : Iterator[(Int, Iterator[Product2[K, C]])] = { if (spills.isEmpty) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID @@ -592,15 +592,15 @@ private[spark] abstract class ExternalSorter[K, V, C]( * This interface abstracts away aggregator dependence. 
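An orientation note on the shape this refactor settles into: the abstract base keeps a public, argument-free entry point plus a protected overload that takes the in-memory collection, and each subclass satisfies the entry point by passing its own collection (the aggregating sorter's map, the non-aggregating sorter's buffer). A stripped-down, hypothetical sketch of that template-method structure, shown for writePartitionedFile with simplified stand-in types rather than Spark's BlockId, TaskContext and WritablePartitionedPairCollection:

```scala
import java.io.File

// Hypothetical stand-in for the abstract ExternalSorter base class.
abstract class SorterBase[K, C](numPartitions: Int) {
  // Entry point the shuffle writer calls; each subclass implements it by
  // handing over whichever in-memory collection it owns.
  def writePartitionedFile(outputFile: File): Array[Long]

  // Shared implementation, parameterized on the collection's contents.
  protected def writePartitionedFile(
      outputFile: File,
      collection: Iterator[((Int, K), C)]): Array[Long] = {
    val lengths = new Array[Long](numPartitions)
    // Stand-in for the real per-partition disk writes and byte counts.
    collection.foreach { case ((p, _), _) => lengths(p) += 1 }
    lengths
  }
}

// Mirrors the non-aggregating subclass: no aggregator, backed by a simple buffer.
class BufferBackedSorter[K, C](buffer: Seq[((Int, K), C)], numPartitions: Int)
  extends SorterBase[K, C](numPartitions) {
  override def writePartitionedFile(outputFile: File): Array[Long] =
    writePartitionedFile(outputFile, buffer.iterator)
}
```

The same delegation is what the one-line bodies in ExternalSorterAgg and ExternalSorterNoAgg reduce to: identical calls that differ only in whether `map` or `buffer` is passed to the shared overload.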
*/ override def writePartitionedFile( - blockId: BlockId, - context: TaskContext, - outputFile: File): Array[Long] + blockId: BlockId, + context: TaskContext, + outputFile: File): Array[Long] protected def writePartitionedFile( - blockId: BlockId, - context: TaskContext, - outputFile: File, - collection: WritablePartitionedPairCollection[K, C]): Array[Long] = { + blockId: BlockId, + context: TaskContext, + outputFile: File, + collection: WritablePartitionedPairCollection[K, C]): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) @@ -664,8 +664,8 @@ private[spark] abstract class ExternalSorter[K, V, C]( * stream, assuming this partition is the next one to be read. Used to make it easier to return * partitioned iterators from our in-memory collection. */ - protected[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) - extends Iterator[Product2[K, C]] + protected[this] class IteratorForPartition(partitionId: Int, + data: BufferedIterator[((Int, K), C)]) extends Iterator[Product2[K, C]] { override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala index 2878b168953e9..7a914e1b47db7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala @@ -82,11 +82,11 @@ import org.apache.spark.storage.BlockId * - Users are expected to call stop() at the end to delete all the intermediate files. */ private[spark] class ExternalSorterAgg[K, V, C]( - aggregator: Aggregator[K, V, C], - partitioner: Option[Partitioner] = None, - ordering: Option[Ordering[K]] = None, - serializer: Option[Serializer] = None) - extends ExternalSorter[K,V,C] + aggregator: Aggregator[K, V, C], + partitioner: Option[Partitioner] = None, + ordering: Option[Ordering[K]] = None, + serializer: Option[Serializer] = None) + extends ExternalSorter[K, V, C] with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { @@ -165,9 +165,9 @@ private[spark] class ExternalSorterAgg[K, V, C]( * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) */ override def writePartitionedFile( - blockId: BlockId, - context: TaskContext, - outputFile: File): Array[Long] = { + blockId: BlockId, + context: TaskContext, + outputFile: File): Array[Long] = { writePartitionedFile(blockId, context, outputFile, map) } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala index 236b634d88ae2..ea6e5a261d127 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala @@ -81,9 +81,9 @@ import org.apache.spark.storage.BlockId * - Users are expected to call stop() at the end to delete all the intermediate files. 
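For readers following along, the usage steps in the scaladoc above combine with the new constructor roughly as follows. This is only a sketch of a caller inside Spark (the sorter classes are private[spark] and pull their configuration from a live SparkEnv, as in ExternalSorterSuite); the object and method names are invented, and the member calls are inferred from how the test suite in this series exercises the class:

```scala
package org.apache.spark.util.collection

import org.apache.spark.HashPartitioner

// Invented wrapper, kept inside the Spark package because the sorter classes
// are private[spark]; requires a live SparkEnv (e.g. a local SparkContext).
object ExternalSorterNoAggUsageSketch {
  // 1. instantiate  2. insertAll  3. iterate (or writePartitionedFile)  4. stop
  def sortedByPartitionThenKey(records: Iterator[(Int, String)]): Vector[(Int, String)] = {
    val ord = implicitly[Ordering[Int]]
    val sorter = new ExternalSorterNoAgg[Int, String, String](
      Some(new HashPartitioner(4)), Some(ord), None)
    try {
      sorter.insertAll(records)                        // may spill to disk under memory pressure
      sorter.iterator.map(p => (p._1, p._2)).toVector  // grouped by partition, keys sorted by ord
    } finally {
      sorter.stop()                                    // delete intermediate spill files
    }
  }
}
```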
*/ private[spark] class ExternalSorterNoAgg[K, V, C]( - partitioner: Option[Partitioner] = None, - ordering: Option[Ordering[K]] = None, - serializer: Option[Serializer] = None) + partitioner: Option[Partitioner] = None, + ordering: Option[Ordering[K]] = None, + serializer: Option[Serializer] = None) extends ExternalSorter[K, V, C](partitioner, ordering, serializer) with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { @@ -164,9 +164,9 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) */ override def writePartitionedFile( - blockId: BlockId, - context: TaskContext, - outputFile: File): Array[Long] = { + blockId: BlockId, + context: TaskContext, + outputFile: File): Array[Long] = { writePartitionedFile(blockId, context, outputFile, buffer) } From 72b87e50113b7cbf13730a69d606e9d2ad90e3d9 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 30 Jun 2015 13:12:07 -0400 Subject: [PATCH 06/14] Updated creation of ExternalSorter instances to use aggregator//no-aggregator interfaces --- .../shuffle/hash/HashShuffleReader.scala | 6 +- .../shuffle/sort/SortShuffleWriter.scala | 11 ++- .../util/collection/ExternalSorterSuite.scala | 81 +++++++++---------- .../spark/sql/execution/basicOperators.scala | 7 +- 4 files changed, 51 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index d5c9880659dd3..815f5790ace01 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -17,12 +17,12 @@ package org.apache.spark.shuffle.hash -import org.apache.spark.{InterruptibleIterator, MapOutputTracker, SparkEnv, TaskContext} import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader} import org.apache.spark.storage.BlockManager import org.apache.spark.util.CompletionIterator -import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark.util.collection.ExternalSorterNoAgg +import org.apache.spark.{InterruptibleIterator, MapOutputTracker, SparkEnv, TaskContext} private[spark] class HashShuffleReader[K, C]( handle: BaseShuffleHandle[K, _, C], @@ -93,7 +93,7 @@ private[spark] class HashShuffleReader[K, C]( case Some(keyOrd: Ordering[K]) => // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, // the ExternalSorter won't spill to disk. 
- val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) + val sorter = new ExternalSorterNoAgg[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) sorter.insertAll(aggregatedIter) context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 5865e7640c1cf..fd494734d836f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -21,9 +21,9 @@ import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle} +import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter} import org.apache.spark.storage.ShuffleBlockId -import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark.util.collection.{ExternalSorterAgg, ExternalSorterNoAgg} private[spark] class SortShuffleWriter[K, V, C]( shuffleBlockResolver: IndexShuffleBlockResolver, @@ -52,8 +52,8 @@ private[spark] class SortShuffleWriter[K, V, C]( override def write(records: Iterator[Product2[K, V]]): Unit = { sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") - new ExternalSorter[K, V, C]( - dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) + new ExternalSorterAgg[K, V, C]( + dep.aggregator.get, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else if (SortShuffleWriter.shouldBypassMergeSort( SparkEnv.get.conf, dep.partitioner.numPartitions, aggregator = None, keyOrdering = None)) { // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't @@ -67,8 +67,7 @@ private[spark] class SortShuffleWriter[K, V, C]( // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. 
- new ExternalSorter[K, V, V]( - aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) + new ExternalSorterNoAgg[K, V, V](Some(dep.partitioner), ordering = None, dep.serializer) } sorter.insertAll(records) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 9cefa612f5491..9a8ea507b4e27 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -58,26 +58,26 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val ord = implicitly[Ordering[Int]] // Both aggregator and ordering - val sorter = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(3)), Some(ord), None) + val sorter = new ExternalSorterAgg[Int, Int, Int]( + agg, Some(new HashPartitioner(3)), Some(ord), None) assert(sorter.iterator.toSeq === Seq()) sorter.stop() // Only aggregator - val sorter2 = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(3)), None, None) + val sorter2 = new ExternalSorterAgg[Int, Int, Int]( + agg, Some(new HashPartitioner(3)), None, None) assert(sorter2.iterator.toSeq === Seq()) sorter2.stop() // Only ordering - val sorter3 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) + val sorter3 = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(3)), Some(ord), None) assert(sorter3.iterator.toSeq === Seq()) sorter3.stop() // Neither aggregator nor ordering - val sorter4 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), None, None) + val sorter4 = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(3)), None, None) assert(sorter4.iterator.toSeq === Seq()) sorter4.stop() } @@ -103,29 +103,28 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { (5, Set((5, 5))), (6, Set())) // Both aggregator and ordering - val sorter = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(7)), Some(ord), None) + val sorter = new ExternalSorterAgg[Int, Int, Int]( + agg, Some(new HashPartitioner(7)), Some(ord), None) sorter.insertAll(elements.iterator) assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter.stop() // Only aggregator - val sorter2 = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(7)), None, None) + val sorter2 = new ExternalSorterAgg[Int, Int, Int]( + agg, Some(new HashPartitioner(7)), None, None) sorter2.insertAll(elements.iterator) assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter2.stop() // Only ordering - val sorter3 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(7)), Some(ord), None) + val sorter3 = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(7)), Some(ord), None) sorter3.insertAll(elements.iterator) assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter3.stop() // Neither aggregator nor ordering - val sorter4 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(7)), None, None) + val sorter4 = new ExternalSorterNoAgg[Int, Int, Int](Some(new HashPartitioner(7)), None, None) sorter4.insertAll(elements.iterator) assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter4.stop() @@ -148,8 +147,8 @@ class 
ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val ord = implicitly[Ordering[Int]] val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2)) - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(7)), Some(ord), None) + val sorter = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(7)), Some(ord), None) sorter.insertAll(elements) assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList)) @@ -330,15 +329,15 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val ord = implicitly[Ordering[Int]] - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) + val sorter = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(3)), Some(ord), None) sorter.insertAll((0 until 120000).iterator.map(i => (i, i))) assert(diskBlockManager.getAllFiles().length > 0) sorter.stop() assert(diskBlockManager.getAllBlocks().length === 0) - val sorter2 = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) + val sorter2 = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(3)), Some(ord), None) sorter2.insertAll((0 until 120000).iterator.map(i => (i, i))) assert(diskBlockManager.getAllFiles().length > 0) assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet) @@ -355,8 +354,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val ord = implicitly[Ordering[Int]] - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) + val sorter = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(3)), Some(ord), None) intercept[SparkException] { sorter.insertAll((0 until 120000).iterator.map(i => { if (i == 119990) { @@ -420,7 +419,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) - val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None) + val sorter = new ExternalSorterNoAgg[Int, Int, Int](Some(new HashPartitioner(3)), None, None) sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet val expected = (0 until 3).map(p => { @@ -443,7 +442,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) - val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None) + val sorter = new ExternalSorterAgg(agg, Some(new HashPartitioner(3)), None, None) sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet val expected = (0 until 3).map(p => { @@ -466,7 +465,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) - val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None) + val sorter = new ExternalSorterAgg(agg, Some(new HashPartitioner(3)), None, None) sorter.insertAll((0 until 
100000).iterator.map(i => (i / 2, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet val expected = (0 until 3).map(p => { @@ -491,7 +490,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) val ord = implicitly[Ordering[Int]] - val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None) + val sorter = new ExternalSorterAgg(agg, Some(new HashPartitioner(3)), Some(ord), None) // avoid combine before spill sorter.insertAll((0 until 50000).iterator.map(i => (i , 2 * i))) @@ -517,8 +516,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) val ord = implicitly[Ordering[Int]] - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) + val sorter = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(3)), Some(ord), None) sorter.insertAll((0 until 100).iterator.map(i => (i, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq val expected = (0 until 3).map(p => { @@ -541,8 +540,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) val ord = implicitly[Ordering[Int]] - val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(3)), Some(ord), None) + val sorter = new ExternalSorterNoAgg[Int, Int, Int]( + Some(new HashPartitioner(3)), Some(ord), None) sorter.insertAll((0 until 100000).iterator.map(i => (i, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq val expected = (0 until 3).map(p => { @@ -564,8 +563,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val agg = new Aggregator[String, String, ArrayBuffer[String]]( createCombiner _, mergeValue _, mergeCombiners _) - val sorter = new ExternalSorter[String, String, ArrayBuffer[String]]( - Some(agg), None, None, None) + val sorter = new ExternalSorterAgg[String, String, ArrayBuffer[String]]( + agg, None, None, None) val collisionPairs = Seq( ("Aa", "BB"), // 2112 @@ -614,7 +613,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext("local-cluster[1,1,512]", "test", conf) val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) - val sorter = new ExternalSorter[FixedHashObject, Int, Int](Some(agg), None, None, None) + val sorter = new ExternalSorterAgg[FixedHashObject, Int, Int](agg, None, None, None) // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes // problems if the map fails to group together the objects with the same code (SPARK-2043). 
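The collision-oriented tests above translate almost mechanically to the new constructor shape, in which the Aggregator is a bare argument rather than an Option. A hedged sketch along the lines of the suite, again assuming a live SparkEnv or local SparkContext and using invented wrapper names:

```scala
package org.apache.spark.util.collection

import org.apache.spark.{Aggregator, HashPartitioner}

// Invented wrapper, inside the Spark package because ExternalSorterAgg is private[spark].
object ExternalSorterAggCollisionSketch {
  def sumWithCollidingKeys(): Map[String, Int] = {
    // Sum the values per key, using the same combine functions as the suite.
    val agg = new Aggregator[String, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    // The Aggregator is now a bare constructor argument, no longer wrapped in Some(...).
    val sorter = new ExternalSorterAgg[String, Int, Int](
      agg, Some(new HashPartitioner(2)), None, None)
    try {
      // "Aa" and "BB" share hash code 2112, so merging has to compare keys for
      // equality rather than trust the hash-based ordering alone (cf. SPARK-2043 above).
      sorter.insertAll(Iterator(("Aa", 1), ("BB", 1), ("Aa", 1)))
      sorter.iterator.map(p => (p._1, p._2)).toMap  // expected: Map("Aa" -> 2, "BB" -> 1)
    } finally {
      sorter.stop()
    }
  }
}
```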
@@ -643,7 +642,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) - val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None) + val sorter = new ExternalSorterAgg[Int, Int, ArrayBuffer[Int]](agg, None, None, None) sorter.insertAll( (1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue))) @@ -668,8 +667,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val agg = new Aggregator[String, String, ArrayBuffer[String]]( createCombiner, mergeValue, mergeCombiners) - val sorter = new ExternalSorter[String, String, ArrayBuffer[String]]( - Some(agg), None, None, None) + val sorter = new ExternalSorterAgg[String, String, ArrayBuffer[String]]( + agg, None, None, None) sorter.insertAll((1 to 100000).iterator.map(i => (i.toString, i.toString)) ++ Iterator( (null.asInstanceOf[String], "1"), @@ -709,8 +708,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val testData = Array.tabulate(100000) { _ => rand.nextInt().toString } - val sorter1 = new ExternalSorter[String, String, String]( - None, None, Some(wrongOrdering), None) + val sorter1 = new ExternalSorterNoAgg[String, String, String](None, Some(wrongOrdering), None) val thrown = intercept[IllegalArgumentException] { sorter1.insertAll(testData.iterator.map(i => (i, i))) sorter1.iterator @@ -730,8 +728,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val agg = new Aggregator[String, String, ArrayBuffer[String]]( createCombiner, mergeValue, mergeCombiners) - val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]]( - Some(agg), None, None, None) + val sorter2 = new ExternalSorterAgg[String, String, ArrayBuffer[String]](agg, None, None, None) sorter2.insertAll(testData.iterator.map(i => (i, i))) // To validate the hash ordering of key diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 647c4ab5cb651..068c885296e35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -20,12 +20,12 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.shuffle.sort.SortShuffleManager +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark.util.collection.ExternalSorterNoAgg import org.apache.spark.util.{CompletionIterator, MutablePair} import org.apache.spark.{HashPartitioner, SparkEnv} @@ -232,7 +232,8 @@ case class ExternalSort( protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") { child.execute().mapPartitions( { iterator => val ordering = newOrdering(sortOrder, child.output) - val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering)) + val sorter = + new ExternalSorterNoAgg[InternalRow, Null, InternalRow](ordering = Some(ordering)) sorter.insertAll(iterator.map(r => (r.copy, null))) val baseIterator 
= sorter.iterator.map(_._1) // TODO(marmbrus): The complex type signature below thwarts inference for no reason. From 083b2b6682ab237a49238ecfb919fa8c8fe2663c Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 30 Jun 2015 13:24:24 -0400 Subject: [PATCH 07/14] Style --- .../org/apache/spark/shuffle/hash/HashShuffleReader.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 815f5790ace01..4a4b9652ad9bc 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -93,7 +93,8 @@ private[spark] class HashShuffleReader[K, C]( case Some(keyOrd: Ordering[K]) => // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, // the ExternalSorter won't spill to disk. - val sorter = new ExternalSorterNoAgg[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) + val sorter = + new ExternalSorterNoAgg[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) sorter.insertAll(aggregatedIter) context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled) From 36db0dce3a17c460bd6e7dca2f9edc35fbe3f2ab Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 30 Jun 2015 14:19:30 -0400 Subject: [PATCH 08/14] Added MIMA exclude --- project/MimaExcludes.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 6f86a505b3ae4..610ac37d292cf 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -74,7 +74,11 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.sql.parquet.ParquetTypeInfo"), ProblemFilters.exclude[MissingClassProblem]( - "org.apache.spark.sql.parquet.ParquetTypeInfo$") + "org.apache.spark.sql.parquet.ParquetTypeInfo$"), + // SPARK-8464 Separating aggregator and non-aggregator paths in ExternalSorter + ProblemFilters.exclude[AbstractClassProblem] + ("org.apache.spark.util.collection.ExternalSorter") + ) case v if v.startsWith("1.4") => Seq( From 1a8dba50c049bde6101b7604935683cc012a2fb5 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 1 Jul 2015 07:37:04 -0400 Subject: [PATCH 09/14] Fixed error in constructor --- .../org/apache/spark/util/collection/ExternalSorter.scala | 2 +- .../org/apache/spark/util/collection/ExternalSorterAgg.scala | 2 +- .../apache/spark/util/collection/ExternalSorterNoAgg.scala | 4 +--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index f12518c762a0c..ba2ae521ef889 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -591,7 +591,7 @@ private[spark] abstract class ExternalSorter[K, V, C]( * * This interface abstracts away aggregator dependence. 
*/ - override def writePartitionedFile( + def writePartitionedFile( blockId: BlockId, context: TaskContext, outputFile: File): Array[Long] diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala index 7a914e1b47db7..b55ae636c2662 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterAgg.scala @@ -86,7 +86,7 @@ private[spark] class ExternalSorterAgg[K, V, C]( partitioner: Option[Partitioner] = None, ordering: Option[Ordering[K]] = None, serializer: Option[Serializer] = None) - extends ExternalSorter[K, V, C] + extends ExternalSorter[K, V, C](partitioner, ordering, serializer) with Spillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala index ea6e5a261d127..ae0a7aab47491 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorterNoAgg.scala @@ -151,8 +151,7 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( */ @VisibleForTesting override def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { - val collection: WritablePartitionedPairCollection[K, C] = buffer - partitionedIterator(collection) + partitionedIterator(buffer) } /** @@ -167,7 +166,6 @@ private[spark] class ExternalSorterNoAgg[K, V, C]( blockId: BlockId, context: TaskContext, outputFile: File): Array[Long] = { - writePartitionedFile(blockId, context, outputFile, buffer) } } From 61a4c8ec2ecce73ed0bb2d89c294075dd4221cde Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 8 Jul 2015 14:54:30 -0700 Subject: [PATCH 10/14] Fixes to MimaExcludes --- project/MimaExcludes.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 44306630ca4d7..e06feae4a2a67 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -63,7 +63,7 @@ object MimaExcludes { // NanoTime and CatalystTimestampConverter is only used inside catalyst, // not needed anymore ProblemFilters.exclude[MissingClassProblem]( - "org.apache.spark.sql.parquet.timestamp.NanoTime"), + "org.apache.spark.sql.parquet.timestamp.NanoTime"),` ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.sql.parquet.timestamp.NanoTime$"), ProblemFilters.exclude[MissingClassProblem]( From 1c154c4e6d64a00f3e20ec0c930e3cfc80f4c584 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 8 Jul 2015 15:44:08 -0700 Subject: [PATCH 11/14] style fix --- project/MimaExcludes.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index e06feae4a2a67..44306630ca4d7 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -63,7 +63,7 @@ object MimaExcludes { // NanoTime and CatalystTimestampConverter is only used inside catalyst, // not needed anymore ProblemFilters.exclude[MissingClassProblem]( - "org.apache.spark.sql.parquet.timestamp.NanoTime"),` + "org.apache.spark.sql.parquet.timestamp.NanoTime"), ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.sql.parquet.timestamp.NanoTime$"), ProblemFilters.exclude[MissingClassProblem]( From 
5e5d5e7121e54af7a7c06701553be778efb12362 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 8 Jul 2015 16:16:28 -0700 Subject: [PATCH 12/14] Fixing merge conflict --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index aec6b46f97336..89f52b18f7f06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.Row import org.apache.spark.util.collection.ExternalSorterNoAgg import org.apache.spark.util.{CompletionIterator, MutablePair} import org.apache.spark.{HashPartitioner, SparkEnv} From d2fe6b045c4b89a555d98fc289f6a380f8bd2b88 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Fri, 31 Jul 2015 09:11:00 -0700 Subject: [PATCH 13/14] Import ordering and spacing/ --- .../spark/sql/execution/basicOperators.scala | 7 +------ .../org/apache/spark/sql/execution/sort.scala | 19 +++++++------------ 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index d153c275c9014..2101b34c91b3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -22,14 +22,10 @@ import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.Row -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.ExternalSorterNoAgg -import org.apache.spark.util.collection.unsafe.sort.PrefixComparator -import org.apache.spark.util.{CompletionIterator, MutablePair} +import org.apache.spark.util.MutablePair import org.apache.spark.{HashPartitioner, SparkEnv} /** @@ -234,7 +230,6 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan) } } - /** * :: DeveloperApi :: * Returns a table with the elements from left that are not in right using diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala index 81f38a41defcf..144b873d01bbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala @@ -17,11 +17,10 @@ package org.apache.spark.sql.execution -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, 
UnspecifiedDistribution} import org.apache.spark.sql.types.StructType import org.apache.spark.util.CompletionIterator @@ -33,16 +32,14 @@ import org.apache.spark.util.collection.ExternalSorterNoAgg /** -* :: DeveloperApi :: * Performs a sort on-heap. * @param global when true performs a global sort of all partitions by shuffling the data first * if necessary. */ -@DeveloperApi case class Sort( - sortOrder: Seq[SortOrder], - global: Boolean, - child: SparkPlan) + sortOrder: Seq[SortOrder], + global: Boolean, + child: SparkPlan) extends UnaryNode { override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil @@ -60,16 +57,14 @@ case class Sort( } /** - * :: DeveloperApi :: * Performs a sort, spilling to disk as needed. * @param global when true performs a global sort of all partitions by shuffling the data first * if necessary. */ -@DeveloperApi case class ExternalSort( - sortOrder: Seq[SortOrder], - global: Boolean, - child: SparkPlan) + sortOrder: Seq[SortOrder], + global: Boolean, + child: SparkPlan) extends UnaryNode { override def requiredChildDistribution: Seq[Distribution] = From bf63fce1b5bb88549d8a43ad8e0d9b2c2cc30c8f Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Fri, 31 Jul 2015 09:11:32 -0700 Subject: [PATCH 14/14] Spacing --- .../main/scala/org/apache/spark/sql/execution/sort.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala index 144b873d01bbd..138c467110a99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala @@ -32,10 +32,10 @@ import org.apache.spark.util.collection.ExternalSorterNoAgg /** -* Performs a sort on-heap. -* @param global when true performs a global sort of all partitions by shuffling the data first -* if necessary. -*/ + * Performs a sort on-heap. + * @param global when true performs a global sort of all partitions by shuffling the data first + * if necessary. + */ case class Sort( sortOrder: Seq[SortOrder], global: Boolean,