Skip to content

Commit

Permalink
[SPARK-22540][SQL] Ensure HighlyCompressedMapStatus calculates correct avgSize
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

Ensure HighlyCompressedMapStatus calculates correct avgSize

## How was this patch tested?

New unit test added.

Author: yucai <yucai.yu@intel.com>

Closes #19765 from yucai/avgsize.

(cherry picked from commit d00b55d)
Signed-off-by: Sean Owen <sowen@cloudera.com>
  • Loading branch information
yucai authored and srowen committed Nov 17, 2017
1 parent be68f86 commit ef7ccc1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ private[spark] object HighlyCompressedMapStatus {
// block as being non-empty (or vice-versa) when using the average block size.
var i = 0
var numNonEmptyBlocks: Int = 0
var totalSize: Long = 0
var numSmallBlocks: Int = 0
var totalSmallBlockSize: Long = 0
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
Expand All @@ -214,7 +215,8 @@ private[spark] object HighlyCompressedMapStatus {
// Huge blocks are not included in the calculation for average size, thus size for smaller
// blocks is more accurate.
if (size < threshold) {
totalSize += size
totalSmallBlockSize += size
numSmallBlocks += 1
} else {
hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
}
Expand All @@ -223,8 +225,8 @@ private[spark] object HighlyCompressedMapStatus {
}
i += 1
}
val avgSize = if (numNonEmptyBlocks > 0) {
totalSize / numNonEmptyBlocks
val avgSize = if (numSmallBlocks > 0) {
totalSmallBlockSize / numSmallBlocks
} else {
0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,28 @@ class MapStatusSuite extends SparkFunSuite {
}
}

test("SPARK-22540: ensure HighlyCompressedMapStatus calculates correct avgSize") {
  // Install a mocked SparkEnv whose conf carries the accurate-block threshold,
  // so MapStatus construction picks it up when classifying blocks as huge.
  val threshold = 1000
  val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, threshold.toString)
  val mockedEnv = mock(classOf[SparkEnv])
  doReturn(conf).when(mockedEnv).conf
  SparkEnv.set(mockedEnv)

  // Block sizes 0..3000: size-0 blocks are empty, sizes in (0, threshold) are
  // "small", and sizes >= threshold are tracked individually as huge blocks.
  // The expected average must be computed over the small blocks only.
  val sizes = (0L to 3000L).toArray
  val small = sizes.filter(s => s > 0 && s < threshold)
  val expectedAvg = small.sum / small.length

  val loc = BlockManagerId("a", "b", 10)
  val roundTripped = compressAndDecompressMapStatus(MapStatus(loc, sizes))
  assert(roundTripped.isInstanceOf[HighlyCompressedMapStatus])
  assert(roundTripped.location == loc)

  // Every non-empty block below the threshold should report the small-block
  // average as its estimated size after the serialization round trip.
  (0 until threshold).foreach { i =>
    if (sizes(i) > 0) {
      assert(roundTripped.getSizeForBlock(i) === expectedAvg)
    }
  }
}

def compressAndDecompressMapStatus(status: MapStatus): MapStatus = {
val ser = new JavaSerializer(new SparkConf)
val buf = ser.newInstance().serialize(status)
Expand Down

0 comments on commit ef7ccc1

Please sign in to comment.