[SPARK-22227][CORE] DiskBlockManager.getAllBlocks now tolerates temp files

## What changes were proposed in this pull request?

Prior to this commit, getAllBlocks implicitly assumed that the directories
managed by the DiskBlockManager contain only files corresponding to valid
block IDs. In reality, this assumption was violated during shuffle, which
produces temporary files in the same directory as the resulting blocks. As a
result, calls to getAllBlocks during shuffle were unreliable.

The fix could be made more efficient, but this is probably good enough.
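
For illustration, a minimal sketch of the failure mode and the new behaviour (not part of the original description; it assumes post-patch Spark on the classpath):

```scala
import java.util.UUID

import org.apache.spark.storage.BlockId

// Mid-shuffle, a block directory holds real blocks and temp files alike:
val names = Seq("rdd_1_0", s"temp_shuffle_${UUID.randomUUID()}")

// Before this patch, BlockId("temp_shuffle_...") threw
// IllegalStateException, so enumerating every on-disk file name failed.
// After it, both names parse; the second becomes a TempShuffleBlockId.
val ids = names.map(BlockId.apply)
```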

## How was this patch tested?

`DiskBlockManagerSuite`

Author: Sergei Lebedev <s.lebedev@criteo.com>

Closes apache#19458 from superbobry/block-id-option.
Sergei Lebedev authored and cloud-fan committed Oct 25, 2017
1 parent d212ef1 commit b377ef1
Showing 4 changed files with 33 additions and 10 deletions.
core/src/main/scala/org/apache/spark/storage/BlockId.scala (13 additions & 3 deletions)
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.util.UUID
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -95,6 +96,10 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
   override def name: String = "test_" + id
 }
 
+@DeveloperApi
+class UnrecognizedBlockId(name: String)
+    extends SparkException(s"Failed to parse $name into a block ID")
+
 @DeveloperApi
 object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
@@ -104,10 +109,11 @@ object BlockId {
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
+  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
+  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
   val TEST = "test_(.*)".r
 
-  /** Converts a BlockId "name" String back into a BlockId. */
-  def apply(id: String): BlockId = id match {
+  def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
       RDDBlockId(rddId.toInt, splitIndex.toInt)
     case SHUFFLE(shuffleId, mapId, reduceId) =>
@@ -122,9 +128,13 @@
       TaskResultBlockId(taskId.toLong)
     case STREAM(streamId, uniqueId) =>
       StreamBlockId(streamId.toInt, uniqueId.toLong)
+    case TEMP_LOCAL(uuid) =>
+      TempLocalBlockId(UUID.fromString(uuid))
+    case TEMP_SHUFFLE(uuid) =>
+      TempShuffleBlockId(UUID.fromString(uuid))
     case TEST(value) =>
       TestBlockId(value)
     case _ =>
-      throw new IllegalStateException("Unrecognized BlockId: " + id)
+      throw new UnrecognizedBlockId(name)
   }
 }
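
A quick sketch (not part of the diff) of what the new exception type buys callers:

```scala
import org.apache.spark.storage.{BlockId, UnrecognizedBlockId}

// Foreign names now raise a dedicated, catchable exception type rather
// than a generic IllegalStateException.
try BlockId("not-a-block") catch {
  case _: UnrecognizedBlockId => // expected: the caller can skip the file
}
```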
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala (10 additions & 1 deletion)

@@ -100,7 +100,16 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
 
   /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
-    getAllFiles().map(f => BlockId(f.getName))
+    getAllFiles().flatMap { f =>
+      try {
+        Some(BlockId(f.getName))
+      } catch {
+        case _: UnrecognizedBlockId =>
+          // Skip files which do not correspond to blocks, for example temporary
+          // files created by [[SortShuffleWriter]].
+          None
+      }
+    }
   }
 
   /** Produces a unique block id and File suitable for storing local intermediate results. */
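
The try/catch-plus-flatMap pattern above could also be phrased with `scala.util.Try`; a standalone sketch (`blocksIn` is a hypothetical helper, not Spark API), with the trade-off noted in the comment:

```scala
import java.io.File

import scala.util.Try

import org.apache.spark.storage.BlockId

// Equivalent filtering with Try: any parse failure becomes None. Note the
// patch is deliberately narrower, swallowing only UnrecognizedBlockId so
// that unexpected errors still propagate.
def blocksIn(files: Seq[File]): Seq[BlockId] =
  files.flatMap(f => Try(BlockId(f.getName)).toOption)
```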
core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala (3 additions & 6 deletions)

@@ -35,13 +35,8 @@ class BlockIdSuite extends SparkFunSuite {
   }
 
   test("test-bad-deserialization") {
-    try {
-      // Try to deserialize an invalid block id.
+    intercept[UnrecognizedBlockId] {
       BlockId("myblock")
-      fail()
-    } catch {
-      case e: IllegalStateException => // OK
-      case _: Throwable => fail()
     }
   }
 
@@ -139,6 +134,7 @@
     assert(id.id.getMostSignificantBits() === 5)
     assert(id.id.getLeastSignificantBits() === 2)
     assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
   }
 
   test("temp shuffle") {
@@ -151,6 +147,7 @@
     assert(id.id.getMostSignificantBits() === 1)
     assert(id.id.getLeastSignificantBits() === 2)
     assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
   }
 
   test("test") {
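
The `assertSame(id, BlockId(id.toString))` assertions added above check a name round trip for the two temp id kinds; the same property can be sketched for any block id (a standalone illustration, outside the suite):

```scala
import java.util.UUID

import org.apache.spark.storage._

// Every BlockId should survive a name -> parse round trip unchanged.
val samples: Seq[BlockId] = Seq(
  RDDBlockId(1, 2),
  TempLocalBlockId(UUID.randomUUID()),
  TempShuffleBlockId(UUID.randomUUID())
)
samples.foreach(id => assert(BlockId(id.name) == id))
```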
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala (7 additions & 0 deletions)

@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
+import java.util.UUID
 
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -79,6 +80,12 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
     assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
   }
 
+  test("SPARK-22227: non-block files are skipped") {
+    val file = diskBlockManager.getFile("unmanaged_file")
+    writeToFile(file, 10)
+    assert(diskBlockManager.getAllBlocks().isEmpty)
+  }
+
   def writeToFile(file: File, numBytes: Int) {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)
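
A possible extension of the new test (hypothetical, reusing the suite's `diskBlockManager` and `writeToFile` helpers): with a real block and an unmanaged file side by side, only the block should be reported:

```scala
test("SPARK-22227: mixed directories report only real blocks") {
  val blockId = TestBlockId("abc")
  // One parseable block file and one foreign file in the same directories.
  writeToFile(diskBlockManager.getFile(blockId), 10)
  writeToFile(diskBlockManager.getFile("unmanaged_file"), 10)
  assert(diskBlockManager.getAllBlocks().toSet === Set(blockId))
}
```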
