Skip to content

Commit

Permalink
[SPARK-33198][CORE] getMigrationBlocks should not fail at missing files
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR aims to fix `getMigrationBlocks` error handling and to add test coverage.
1. `getMigrationBlocks` should not fail at indexFile only case.
2. `assert` causes `java.lang.AssertionError` which is not an `Exception`.

### Why are the changes needed?

To handle the exception correctly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CI with the newly added test case.

Closes #30110 from dongjoon-hyun/SPARK-33198.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
dongjoon-hyun committed Oct 20, 2020
1 parent c824db2 commit 385d5db
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
Expand Up @@ -241,14 +241,19 @@ private[spark] class IndexShuffleBlockResolver(
val dataBlockData = new FileSegmentManagedBuffer(
transportConf, dataFile, 0, dataFile.length())

// Make sure the files exist
assert(indexFile.exists() && dataFile.exists())

List((indexBlockId, indexBlockData), (dataBlockId, dataBlockData))
// Make sure the index exist.
if (!indexFile.exists()) {
throw new FileNotFoundException("Index file is deleted already.")
}
if (dataFile.exists()) {
List((indexBlockId, indexBlockData), (dataBlockId, dataBlockData))
} else {
List((indexBlockId, indexBlockData))
}
} catch {
case e: Exception => // If we can't load the blocks ignore them.
logWarning(s"Failed to resolve shuffle block ${shuffleBlockInfo}, skipping migration" +
"this is expected to occure if a block is removed after decommissioning has started.")
case _: Exception => // If we can't load the blocks ignore them.
logWarning(s"Failed to resolve shuffle block ${shuffleBlockInfo}, skipping migration. " +
"This is expected to occur if a block is removed after decommissioning has started.")
List.empty[(BlockId, ManagedBuffer)]
}
}
Expand Down
Expand Up @@ -156,4 +156,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
indexIn2.close()
}
}

test("SPARK-33198 getMigrationBlocks should not fail at missing files") {
val resolver = new IndexShuffleBlockResolver(conf, blockManager)
assert(resolver.getMigrationBlocks(ShuffleBlockInfo(Int.MaxValue, Long.MaxValue)).isEmpty)
}
}

0 comments on commit 385d5db

Please sign in to comment.