
Commit

mateiz committed Aug 7, 2014
1 parent 398cb95 commit 10233af
Showing 4 changed files with 33 additions and 23 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -252,10 +252,10 @@ object SparkEnv extends Logging {

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"HASH" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"SORT" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "HASH")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName, shuffleMgrName)
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)
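The change above makes the short names case-insensitive: spark.shuffle.manager can be set to "hash", "sort", any capitalization of those, or a fully qualified ShuffleManager class name. A minimal sketch of the resolution behaviour, lifted out of SparkEnv (resolveShuffleManager is a hypothetical helper, not part of the patch):

// Mirrors the lookup in the diff: short names are matched case-insensitively,
// anything else is treated as a fully qualified class name and passed through.
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

def resolveShuffleManager(value: String): String =
  shortShuffleMgrNames.getOrElse(value.toLowerCase, value)

resolveShuffleManager("SORT")                     // org.apache.spark.shuffle.sort.SortShuffleManager
resolveShuffleManager("hash")                     // org.apache.spark.shuffle.hash.HashShuffleManager
resolveShuffleManager("com.example.MyShuffleMgr") // returned unchanged (hypothetical class name)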
@@ -66,9 +66,9 @@ private[spark] class SortShuffleWriter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else {
- // In this case we pass neither an aggregator nor an ordering to the sorter, because we
- // don't care whether the keys get sorted in each partition; that will be done on the
- // reduce side if the operation being run is sortByKey.
+ // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
+ // care whether the keys get sorted in each partition; that will be done on the reduce side
+ // if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
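The comment reflow above does not change behaviour: when there is no map-side combine, SortShuffleWriter builds an ExternalSorter without an aggregator or ordering. As a rough illustration of which user-level operations exercise each branch (illustrative only, based on the comment rather than on this hunk; sc is an existing SparkContext):

val pairs = sc.parallelize(1 to 1000).map(i => (i % 10, i))

// reduceByKey performs a map-side combine, so the writer takes the branch above
// this one and passes dep.aggregator to the ExternalSorter.
val sums = pairs.reduceByKey(_ + _)

// sortByKey needs neither map-side aggregation nor map-side ordering; the keys
// are sorted on the reduce side, so the writer takes the branch shown here.
val sorted = pairs.sortByKey()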
@@ -137,10 +137,10 @@ private[spark] class ExternalSorter[K, V, C](
// spilled files, which would happen with the normal code path. The downside is having multiple
// files open at a time and thus more memory allocated to buffers.
private val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
- private[collection] val bypassMergeSort = // private[collection] for unit tests
+ private val bypassMergeSort =
(numPartitions <= bypassMergeThreshold && aggregator.isEmpty && ordering.isEmpty)

- // Array of file writers for each partition, used if bypassMergeSort is true
+ // Array of file writers for each partition, used if bypassMergeSort is true and we've spilled
private var partitionWriters: Array[BlockObjectWriter] = null

// A comparator for keys K that orders them within a partition to allow aggregation or sorting.
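The bypass path applies only when the shuffle writes to few partitions and needs no map-side aggregation or ordering; spark.shuffle.sort.bypassMergeThreshold (default 200) controls the cutoff. A small standalone sketch of the decision with illustrative values (hasAggregator/hasOrdering stand in for aggregator.isEmpty/ordering.isEmpty):

val conf = new org.apache.spark.SparkConf(false)
val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)

val numPartitions = 100      // e.g. a shuffle with 100 reducers
val hasAggregator = false    // no map-side combine
val hasOrdering   = false    // no map-side key ordering

// Same condition as in the diff: bypass only for small, unaggregated, unordered shuffles.
val bypassMergeSort = numPartitions <= bypassMergeThreshold && !hasAggregator && !hasOrdering
// bypassMergeSort == true with these values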
@@ -19,12 +19,12 @@ package org.apache.spark.util.collection

import scala.collection.mutable.ArrayBuffer

- import org.scalatest.FunSuite
+ import org.scalatest.{PrivateMethodTester, FunSuite}

import org.apache.spark._
import org.apache.spark.SparkContext._

- class ExternalSorterSuite extends FunSuite with LocalSparkContext {
+ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester {
private def createSparkConf(loadDefaults: Boolean): SparkConf = {
val conf = new SparkConf(loadDefaults)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
@@ -36,6 +36,16 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
conf
}

+ private def assertBypassedMergeSort(sorter: ExternalSorter[_, _, _]): Unit = {
+ val bypassMergeSort = PrivateMethod[Boolean]('bypassMergeSort)
+ assert(sorter.invokePrivate(bypassMergeSort()), "sorter did not bypass merge-sort")
+ }
+
+ private def assertDidNotBypassMergeSort(sorter: ExternalSorter[_, _, _]): Unit = {
+ val bypassMergeSort = PrivateMethod[Boolean]('bypassMergeSort)
+ assert(!sorter.invokePrivate(bypassMergeSort()), "sorter bypassed merge-sort")
+ }
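Since bypassMergeSort is now private rather than private[collection], the suite reaches it through ScalaTest's PrivateMethodTester, which the two helpers above wrap. A self-contained sketch of that mechanism (Widget and isCached are made-up names, not part of Spark):

import org.scalatest.{FunSuite, PrivateMethodTester}

class Widget { private def isCached: Boolean = true }

class WidgetSuite extends FunSuite with PrivateMethodTester {
  test("reads a private member") {
    val isCached = PrivateMethod[Boolean]('isCached) // the private method's name, as a Symbol
    assert((new Widget).invokePrivate(isCached()))   // reflectively calls Widget.isCached
  }
}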

test("empty data stream") {
val conf = new SparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
@@ -123,7 +133,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val sorter = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(7)), Some(ord), None)
- assert(!sorter.bypassMergeSort, "sorter bypassed merge-sort")
+ assertDidNotBypassMergeSort(sorter)
sorter.insertAll(elements)
assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled
val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
@@ -147,7 +157,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val sorter = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(7)), None, None)
- assert(sorter.bypassMergeSort, "sorter did not bypass merge-sort")
+ assertBypassedMergeSort(sorter)
sorter.insertAll(elements)
assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled
val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
@@ -314,15 +324,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val sorter = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(3)), Some(ord), None)
- assert(!sorter.bypassMergeSort, "sorter bypassed merge-sort")
+ assertDidNotBypassMergeSort(sorter)
sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
sorter.stop()
assert(diskBlockManager.getAllBlocks().length === 0)

val sorter2 = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(3)), Some(ord), None)
- assert(!sorter2.bypassMergeSort, "sorter bypassed merge-sort")
+ assertDidNotBypassMergeSort(sorter2)
sorter2.insertAll((0 until 100000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
@@ -338,14 +348,14 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager

val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
- assert(sorter.bypassMergeSort, "sorter did not bypass merge-sort")
+ assertBypassedMergeSort(sorter)
sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
sorter.stop()
assert(diskBlockManager.getAllBlocks().length === 0)

val sorter2 = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
- assert(sorter2.bypassMergeSort, "sorter did not bypass merge-sort")
+ assertBypassedMergeSort(sorter2)
sorter2.insertAll((0 until 100000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
@@ -364,7 +374,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val sorter = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(3)), Some(ord), None)
- assert(!sorter.bypassMergeSort, "sorter bypassed merge-sort")
+ assertDidNotBypassMergeSort(sorter)
intercept[SparkException] {
sorter.insertAll((0 until 100000).iterator.map(i => {
if (i == 99990) {
@@ -386,7 +396,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager

val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
- assert(sorter.bypassMergeSort, "sorter did not bypass merge-sort")
+ assertBypassedMergeSort(sorter)
intercept[SparkException] {
sorter.insertAll((0 until 100000).iterator.map(i => {
if (i == 99990) {
@@ -681,20 +691,20 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val sorter1 = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(FEW_PARTITIONS)), None, None)
- assert(sorter1.bypassMergeSort, "sorter did not bypass merge-sort")
+ assertBypassedMergeSort(sorter1)

val sorter2 = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(MANY_PARTITIONS)), None, None)
- assert(!sorter2.bypassMergeSort, "sorter bypassed merge-sort")
+ assertDidNotBypassMergeSort(sorter2)

// Sorters with an ordering or aggregator: should not bypass even if they have few partitions

val sorter3 = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(FEW_PARTITIONS)), Some(ord), None)
- assert(!sorter3.bypassMergeSort, "sorter bypassed merge-sort")
+ assertDidNotBypassMergeSort(sorter3)

val sorter4 = new ExternalSorter[Int, Int, Int](
Some(agg), Some(new HashPartitioner(FEW_PARTITIONS)), None, None)
- assert(!sorter4.bypassMergeSort, "sorter bypassed merge-sort")
+ assertDidNotBypassMergeSort(sorter4)
}
}
