
Commit

First pass at the feedback from @attilapiros, mostly minor re-arrangement
holdenk committed Jun 4, 2020
1 parent ecd1a14 commit 9c8836a
Showing 4 changed files with 13 additions and 16 deletions.
@@ -69,7 +69,7 @@ private[spark] class IndexShuffleBlockResolver(
val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) ++ rootDirs
val filenames = searchDirs.flatMap(_.list())
logDebug(s"Got block files ${filenames.toList}")
- filenames.flatMap{ fname =>
+ filenames.flatMap { fname =>
pattern.findAllIn(fname).matchData.map {
matched => (matched.group(1).toInt, matched.group(2).toLong)
}
@@ -180,7 +180,7 @@ private[spark] class IndexShuffleBlockResolver(
StreamCallbackWithID = {
val file = blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
-   getIndexFile(shuffleId, mapId)
+ getIndexFile(shuffleId, mapId)
case ShuffleDataBlockId(shuffleId, mapId, _) =>
getDataFile(shuffleId, mapId)
case _ =>
@@ -203,7 +203,7 @@ private[spark] class IndexShuffleBlockResolver(
}

override def onComplete(streamId: String): Unit = {
logTrace(s"Done receiving block $blockId, now putting into local shuffle service")
logTrace(s"Done receiving shuffle block $blockId, now storing on local disk.")
channel.close()
val diskSize = fileTmp.length()
this.synchronized {
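The IndexShuffleBlockResolver hunks above tighten brace spacing and log wording; the first one also shows the resolver's filename-scanning idiom for discovering stored shuffles. Below is a minimal, self-contained sketch of that idiom in plain Scala. The regex and the object name are assumptions for illustration only; the real resolver derives its pattern from Spark's shuffle file naming scheme.

```scala
import java.io.File

object ShuffleFileScanSketch {
  // Hypothetical pattern for shuffle data files named like
  // shuffle_<shuffleId>_<mapId>_<reduceId>.data (an assumption,
  // not Spark's exact regex).
  private val pattern = """shuffle_(\d+)_(\d+)_.+\.data""".r

  // Mirror of the hunk at @@ -69,7: search the root dirs and their
  // immediate subdirectories, then parse (shuffleId, mapId) out of
  // every matching filename.
  def storedShuffles(rootDirs: Seq[File]): Seq[(Int, Long)] = {
    val searchDirs = rootDirs
      .flatMap(d => Option(d.listFiles()).getOrElse(Array.empty[File]))
      .filter(_.isDirectory()) ++ rootDirs
    val filenames = searchDirs.flatMap(d => Option(d.list()).getOrElse(Array.empty[String]))
    filenames.flatMap { fname =>
      pattern.findAllIn(fname).matchData.map { matched =>
        (matched.group(1).toInt, matched.group(2).toLong)
      }
    }
  }
}
```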
core/src/main/scala/org/apache/spark/storage/BlockManager.scala: 12 changes (5 additions, 7 deletions)
@@ -260,6 +260,11 @@ private[spark] class BlockManager(
shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver]
}

+ // Shuffles which are either in queue for migrations or migrated
+ private val migratingShuffles = mutable.HashSet[(Int, Long)]()
+ // Shuffles which are queued for migration
+ private val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()

/**
* Abstraction for storing blocks from bytes, whether they start in memory or on disk.
*
@@ -1812,13 +1817,6 @@ private[spark] class BlockManager(
}
}


- // Shuffles which are either in queue for migrations or migrated
- private val migratingShuffles = mutable.HashSet[(Int, Long)]()
- // Shuffles which are queued for migration
- private val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(Int, Long)]()


private class ShuffleMigrationRunnable(peer: BlockManagerId) extends Runnable {
@volatile var running = true
override def run(): Unit = {
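The BlockManager hunks only move the two migration collections up into the construction section of the class, but the pair is worth a closer look: a HashSet remembering every shuffle that has ever been offered, plus a ConcurrentLinkedQueue holding the work still outstanding, drained by runnables like ShuffleMigrationRunnable. Here is a sketch of that producer/consumer shape under assumed semantics; the method names and the dedup-on-offer behavior are illustrative, not lifted from Spark.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

class MigrationStateSketch {
  // Shuffles which are either in queue for migration or already migrated.
  private val migratingShuffles = mutable.HashSet[(Int, Long)]()
  // Shuffles which are still queued for migration.
  private val shufflesToMigrate = new ConcurrentLinkedQueue[(Int, Long)]()

  // Offer a shuffle for migration; the set deduplicates so nothing is
  // enqueued twice (assumed semantics, for illustration).
  def offer(shuffle: (Int, Long)): Unit = synchronized {
    if (migratingShuffles.add(shuffle)) {
      shufflesToMigrate.add(shuffle)
    }
  }

  // Worker loop in the spirit of ShuffleMigrationRunnable: poll the queue
  // until told to stop; the queue is safe to drain from several threads.
  def runWorker(migrate: ((Int, Long)) => Unit, keepRunning: () => Boolean): Unit = {
    while (keepRunning()) {
      val next = shufflesToMigrate.poll()
      if (next != null) migrate(next) else Thread.sleep(100)
    }
  }
}
```

Keeping the seen-set separate from the queue means a shuffle can be re-registered safely while a worker is mid-migration: the queue entry is consumed, but the set still records that it was handled.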
@@ -187,10 +187,10 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
eventually(timeout(15.seconds), interval(10.milliseconds)) {
if (persist) {
// One of our blocks should have moved.
- val rddUpdates = blocksUpdated.filter{update =>
+ val rddUpdates = blocksUpdated.filter {update =>
val blockId = update.blockUpdatedInfo.blockId
blockId.isRDD}
- val blockLocs = rddUpdates.map{ update =>
+ val blockLocs = rddUpdates.map { update =>
(update.blockUpdatedInfo.blockId.name,
update.blockUpdatedInfo.blockManagerId)}
val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
@@ -201,11 +201,11 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
// If we're migrating shuffles we look for any shuffle block updates
// as there is no block update on the initial shuffle block write.
if (shuffle) {
- val numDataLocs = blocksUpdated.filter{ update =>
+ val numDataLocs = blocksUpdated.filter { update =>
val blockId = update.blockUpdatedInfo.blockId
blockId.isInstanceOf[ShuffleDataBlockId]
}.size
- val numIndexLocs = blocksUpdated.filter{ update =>
+ val numIndexLocs = blocksUpdated.filter { update =>
val blockId = update.blockUpdatedInfo.blockId
blockId.isInstanceOf[ShuffleIndexBlockId]
}.size
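The suite's assertions hinge on two counting idioms: grouping RDD block updates by block name to see how many managers reported each block, and counting shuffle data/index block updates. A standalone sketch with a simplified stand-in event type follows; the real test consumes SparkListenerBlockUpdated events, so Update and its fields here are hypothetical.

```scala
// Hypothetical stand-in for a block-update event from the listener bus.
case class Update(blockName: String, managerId: String, isData: Boolean, isIndex: Boolean)

object DecomCountsSketch {
  def summarize(blocksUpdated: Seq[Update]): (Map[String, Int], Int, Int) = {
    // Update records per block name: a block reported more than once
    // (count > 1) is evidence that it moved to another manager.
    val blocksToManagers = blocksUpdated
      .map(u => (u.blockName, u.managerId))
      .groupBy(_._1)
      .map { case (name, entries) => (name, entries.size) }
    // Shuffle migration surfaces as data-file and index-file updates,
    // since there is no block update on the initial shuffle write.
    val numDataLocs = blocksUpdated.count(_.isData)
    val numIndexLocs = blocksUpdated.count(_.isIndex)
    (blocksToManagers, numDataLocs, numIndexLocs)
  }
}
```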
@@ -325,8 +325,7 @@ class KubernetesSuite extends SparkFunSuite
val result = checkPodReady(namespace, name)
result shouldBe (true)
}
- // Look for the string that indicates we're good to trigger decom
- // on the driver
+ // Look for the string that indicates we're good to trigger decom on the driver
logDebug("Waiting for first collect...")
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
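The KubernetesSuite change is only a comment reflow, but the surrounding structure, re-running an assertion until it holds, is scalatest's Eventually pattern, which both the pod-readiness check and the log wait above rely on. A minimal sketch follows; TIMEOUT, INTERVAL, and checkPodReady are stand-ins for the real harness members, stubbed here so the example is self-contained.

```scala
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}

object DecomPollingSketch {
  // Assumed patience values; the real suite defines its own constants.
  val TIMEOUT = Eventually.timeout(Span(60, Seconds))
  val INTERVAL = Eventually.interval(Span(100, Millis))

  // Stub for the suite's pod-readiness probe.
  def checkPodReady(namespace: String, name: String): Boolean = true

  def waitForPod(namespace: String, name: String): Unit = {
    // eventually re-runs the block on each interval tick until it stops
    // throwing, or fails the test once the timeout elapses.
    Eventually.eventually(TIMEOUT, INTERVAL) {
      assert(checkPodReady(namespace, name))
    }
  }
}
```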
