Skip to content

Commit

Permalink
Disable caching for listShuffleIndices.
Browse files Browse the repository at this point in the history
Signed-off-by: Pascal Spörri <psp@zurich.ibm.com>
  • Loading branch information
pspoerri committed Jul 27, 2023
1 parent dba15d9 commit cb953db
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,24 @@ import java.io.{BufferedInputStream, BufferedOutputStream, IOException}
import java.nio.ByteBuffer
import java.util
import java.util.zip.{Adler32, CRC32, Checksum}
import java.util.regex.Pattern
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.{Await, Future}

object S3ShuffleHelper extends Logging {
// Looked up lazily, on first use, from the current SparkEnv.
private lazy val serializer = SparkEnv.get.serializer
// Looked up lazily, on first use, via S3ShuffleDispatcher.get.
private lazy val dispatcher = S3ShuffleDispatcher.get

// Keyed by checksum block id; values are Array[Long] (presumably the stored checksums — confirm against the reader).
private val cachedChecksums = new ConcurrentObjectMap[ShuffleChecksumBlockId, Array[Long]]()
// Keyed by index block id; entries of one shuffle are dropped in purgeCachedShuffleIndices.
private val cachedArrayLengths = new ConcurrentObjectMap[ShuffleIndexBlockId, Array[Long]]()
// Keyed by shuffle id; filled through listShuffleIndicesCached and dropped in purgeCachedShuffleIndices.
private val cachedIndexBlocks = new ConcurrentObjectMap[Int, Array[ShuffleIndexBlockId]]()

/**
* Purge the cached entries that belong to a single shuffle.
*
* Two caches are affected: `cachedIndexBlocks` (entries selected by comparing an Int
* with `shuffleIndex`) and `cachedArrayLengths` (entries selected by comparing a
* block's `shuffleId` with `shuffleIndex`). `cachedChecksums` is not touched here.
*
* @param shuffleIndex Id of the shuffle whose cached entries should be dropped.
*/
def purgeCachedShuffleIndices(shuffleIndex: Int): Unit = {
// Predicates selecting the entries of this shuffle (presumably applied to the map keys — confirm in ConcurrentObjectMap).
val indexFilter = (idx: Int) => idx == shuffleIndex
val blockFilter = (block: ShuffleIndexBlockId) => block.shuffleId == shuffleIndex
// NOTE(review): the meaning of the second argument (None) is defined by ConcurrentObjectMap.remove, which is not visible here.
cachedIndexBlocks.remove(indexFilter, None)
cachedArrayLengths.remove(blockFilter, None)
}

Expand Down Expand Up @@ -60,22 +56,12 @@ object S3ShuffleHelper extends Logging {
file.close()
}

/**
* List the index blocks of a shuffle, going through the `cachedIndexBlocks` cache.
*
* Delegates to `cachedIndexBlocks.getOrElsePut(shuffleId, listShuffleIndices)`.
* Presumably a cached array is returned when one exists for `shuffleId`, and
* `listShuffleIndices` is called to produce and store it otherwise
* (ConcurrentObjectMap is not visible here — confirm).
*
* @param shuffleId Id of the shuffle to look up; also used as the cache key.
* @return The index block ids obtained for that shuffle.
*/
def listShuffleIndicesCached(shuffleId: Int): Array[ShuffleIndexBlockId] = {
cachedIndexBlocks.getOrElsePut(shuffleId, listShuffleIndices)
}

private def listShuffleIndices(shuffleId: Int): Array[ShuffleIndexBlockId] = {
def listShuffleIndices(shuffleId: Int): Array[ShuffleIndexBlockId] = {
val shuffleIndexFilter: PathFilter = new PathFilter() {
private val regex = Pattern.compile(f"shuffle_${shuffleId}" + "_([0-9]+)_([0-9]+).index")

private val prefix = f"shuffle_${shuffleId}_"
override def accept(path: Path): Boolean = {
regex.matcher(path.getName).matches()
val name = path.getName
name.startsWith(prefix) && name.endsWith("_0.index")
}
}
Range(0, dispatcher.folderPrefixes).map(idx => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class S3ShuffleReader[K, C](
.flatMap(info => ShuffleBlockFetcherIterator.mergeContinuousShuffleBlockIdsIfNeeded(info, doBatchFetch))
.map(_.blockId)
} else {
val indices = S3ShuffleHelper.listShuffleIndicesCached(shuffleId).filter(
val indices = S3ShuffleHelper.listShuffleIndices(shuffleId).filter(
block => block.mapId >= startMapIndex && block.mapId < endMapIndex)
if (doBatchFetch || dispatcher.forceBatchFetch) {
indices.map(block => ShuffleBlockBatchId(block.shuffleId, block.mapId, startPartition, endPartition)).toIterator
Expand Down

0 comments on commit cb953db

Please sign in to comment.