@@ -56,7 +56,8 @@ public void initializeExecutor(String appId, String execId, Map<String, String>
if (blockManager == null) {
throw new IllegalStateException("No blockManager available from the SparkEnv.");
}
blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
blockResolver =
new IndexShuffleBlockResolver(sparkConf, blockManager, Map.of() /* Shouldn't be accessed */);
}

@Override
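A note on the Map.of() placeholder (my reading, not stated in the diff): java.util.Map.of() returns an immutable empty map, so a resolver built with this "shouldn't be accessed" default cannot silently accumulate map task ids; any accidental write fails fast. A minimal Scala sketch of that behaviour:

// Sketch only, not part of the change: the JDK's Map.of() is immutable,
// so an unexpected write through the new parameter throws instead of
// recording state that nothing would ever clean up.
import java.util.{Map => JMap}
import org.apache.spark.util.collection.OpenHashSet

val placeholder: JMap[Int, OpenHashSet[Long]] = JMap.of()
// Would throw java.lang.UnsupportedOperationException if ever reached:
// placeholder.computeIfAbsent(0, _ => new OpenHashSet[Long](8))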
@@ -21,6 +21,7 @@ import java.io._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.file.Files
import java.util.{Map => JMap}

import scala.collection.mutable.ArrayBuffer

@@ -40,6 +41,7 @@ import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.OpenHashSet

/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -55,7 +57,8 @@ import org.apache.spark.util.Utils
private[spark] class IndexShuffleBlockResolver(
conf: SparkConf,
// var for testing
var _blockManager: BlockManager = null)
var _blockManager: BlockManager = null,
val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = JMap.of())
extends ShuffleBlockResolver
with Logging with MigratableResolver {

@@ -285,6 +288,21 @@ private[spark] class IndexShuffleBlockResolver(
throw SparkCoreErrors.failedRenameTempFileError(fileTmp, file)
}
}
blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
shuffleId, _ => new OpenHashSet[Long](8)
)
mapTaskIds.add(mapId)

case ShuffleDataBlockId(shuffleId, mapId, _) =>
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
shuffleId, _ => new OpenHashSet[Long](8)
)
mapTaskIds.add(mapId)

case _ => // Unreachable
}
blockManager.reportBlockStatus(blockId, BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
}

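For context on why the map task ids of migrated blocks are recorded here (my summary, not text from the diff): the shared taskIdMapsForShuffle is what the executor-side cleanup later walks to find and delete shuffle files. A rough sketch of that consumption side, modelled on SortShuffleManager.unregisterShuffle and the existing removeDataByMap helper of IndexShuffleBlockResolver; the exact code may differ:

override def unregisterShuffle(shuffleId: Int): Boolean = {
  // Drain every map task id recorded for this shuffle, whether the files were
  // written locally or registered above after arriving through block migration.
  Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds =>
    mapTaskIds.iterator.foreach { mapTaskId =>
      // Deletes the corresponding .data and .index files from local disk.
      shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId)
    }
  }
  true
}

By letting the resolver register migrated blocks into the same map (the SortShuffleManager change below), a later removeShuffle reaches files that an executor received through decommission migration, which is exactly what the new integration test exercises.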
@@ -87,7 +87,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager

private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)

override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
override val shuffleBlockResolver =
new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)

/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
@@ -314,7 +314,8 @@ public void writeWithoutSpilling() throws Exception {

@Test
public void writeChecksumFileWithoutSpill() throws Exception {
IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
IndexShuffleBlockResolver blockResolver =
new IndexShuffleBlockResolver(conf, blockManager, Map.of());
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -344,7 +345,8 @@ public void writeChecksumFileWithoutSpill() throws Exception {

@Test
public void writeChecksumFileWithSpill() throws Exception {
IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
IndexShuffleBlockResolver blockResolver =
new IndexShuffleBlockResolver(conf, blockManager, Map.of());
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -17,12 +17,14 @@

package org.apache.spark.storage

import java.io.File
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

import org.apache.commons.io.FileUtils
import org.scalatest.concurrent.Eventually

import org.apache.spark._
@@ -353,4 +355,78 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
import scala.language.reflectiveCalls
assert(listener.removeReasonValidated)
}

test("SPARK-46957: Migrated shuffle files should be able to cleanup from executor") {

val sparkTempDir = System.getProperty("java.io.tmpdir")

def shuffleFiles: Seq[File] = {
FileUtils
.listFiles(new File(sparkTempDir), Array("data", "index"), true)
.asScala
.toSeq
}

val existingShuffleFiles = shuffleFiles

val conf = new SparkConf()
.setAppName("SPARK-46957")
.setMaster("local-cluster[2,1,1024]")
.set(config.DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
var isDecommissionedExecutorRemoved = false
val execToDecommission = sc.getExecutorIds().head
sc.addSparkListener(new SparkListener {
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
}
}

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
assert(execToDecommission === executorRemoved.executorId)
isDecommissionedExecutorRemoved = true
}
})

// Run a job to create shuffle data
val result = sc.parallelize(1 to 1000, 10)
.map { i => (i % 2, i) }
.reduceByKey(_ + _).collect()

assert(result.head === (0, 250500))
assert(result.tail.head === (1, 250000))
sc.schedulerBackend
.asInstanceOf[StandaloneSchedulerBackend]
.decommissionExecutor(
execToDecommission,
ExecutorDecommissionInfo("test", None),
adjustTargetNumExecutors = true
)

eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(isDecommissionedExecutorRemoved)
// Ensure some shuffle data has been migrated
assert(shuffleBlockUpdates.size >= 2)
}

val shuffleId = shuffleBlockUpdates
.find(_.isInstanceOf[ShuffleIndexBlockId])
.map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
.get

val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
assert(newShuffleFiles.size >= shuffleBlockUpdates.size)
Review comment (Member): Unfortunately, this seems to introduce flakiness.

[info] - SPARK-46957: Migrated shuffle files should be able to cleanup from executor *** FAILED *** (36 seconds, 137 milliseconds)
[info]   0 was not greater than or equal to 6 (BlockManagerDecommissionIntegrationSuite.scala:423)

Review comment (@attilapiros, Contributor, Oct 24, 2024): It must be a race condition on JDK 21, but I have not managed to reproduce it locally, neither on arm64 nor on x86_64 (using Docker).

Review comment (Contributor): As I see, a JIRA already exists for this: https://issues.apache.org/jira/browse/SPARK-49297. I am working on a possible fix.


// Remove the shuffle data
sc.shuffleDriverComponents.removeShuffle(shuffleId, true)

eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(newShuffleFiles.intersect(shuffleFiles).isEmpty)
}
}
}
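The test triggers cleanup by calling removeShuffle directly. In a real application the same path is normally reached asynchronously through the ContextCleaner once the shuffle dependency becomes unreachable; a hedged sketch of that scenario (cleaner timing is not deterministic, and System.gc() is only a hint):

// Rough illustration only, assuming the default ContextCleaner behaviour.
val counts = sc.parallelize(1 to 1000, 10)
  .map(i => (i % 2, i))
  .reduceByKey(_ + _)
  .collect()                       // materializes the shuffle files on executors

// ... an executor is decommissioned here and its shuffle files migrate ...

// Once the ShuffleDependency is no longer referenced, the ContextCleaner
// (nudged by a GC) removes the shuffle; with this change that also deletes
// files an executor received via migration, not only the ones it wrote itself.
System.gc()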