Configure readahead if the filesystem supports it.
Signed-off-by: Pascal Spörri <psp@zurich.ibm.com>
pspoerri committed Aug 31, 2023
1 parent 1c05385 commit 2ec4122
Showing 2 changed files with 11 additions and 6 deletions.
@@ -59,8 +59,10 @@ class S3ShuffleDispatcher extends Logging {
     SparkHadoopUtil.newConfiguration(conf)
   })
 
+  val canSetReadahead = fs.hasPathCapability(new Path(rootDir), StreamCapabilities.READAHEAD)
+
   // Required
-  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir})")
+  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir} - can set readahead: ${canSetReadahead})")
 
   // Optional
   logInfo(s"- spark.shuffle.s3.bufferSize=${bufferSize}")
@@ -117,8 +119,13 @@ class S3ShuffleDispatcher extends Logging {
    * @return
    */
   def openBlock(blockId: BlockId): FSDataInputStream = {
-    val builder = fs.openFile(getPath(blockId)).withFileStatus(getFileStatusCached(blockId))
-    builder.build().get()
+    val status = getFileStatusCached(blockId)
+    val builder = fs.openFile(status.getPath).withFileStatus(status)
+    val stream = builder.build().get()
+    if (canSetReadahead) {
+      stream.setReadahead(0)
+    }
+    stream
   }
 
   private val cachedFileStatus = new ConcurrentObjectMap[BlockId, FileStatus]()
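For context, a minimal, hypothetical sketch (not the plugin's actual code) of how the Hadoop APIs used in the hunks above fit together: FileSystem.hasPathCapability probes whether the store advertises StreamCapabilities.READAHEAD, and setReadahead then disables readahead on the opened stream. The object and method names below are illustrative assumptions.

import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path, StreamCapabilities}

object ReadaheadSketch {
  // Hypothetical helper: open `file` on `fs`, disabling readahead when supported.
  def open(fs: FileSystem, file: Path): FSDataInputStream = {
    // Probe whether this filesystem advertises the readahead capability;
    // stores that do not will simply skip the setReadahead call below.
    val canSetReadahead = fs.hasPathCapability(file, StreamCapabilities.READAHEAD)

    // Passing the FileStatus to the openFile builder lets connectors skip an
    // extra metadata lookup when opening the file.
    val status = fs.getFileStatus(file)
    val stream = fs.openFile(status.getPath).withFileStatus(status).build().get()

    // A readahead of 0 hints the stream not to prefetch past what is read,
    // which can help small, targeted reads such as shuffle block fetches.
    if (canSetReadahead) {
      stream.setReadahead(0L)
    }
    stream
  }
}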
@@ -40,7 +40,6 @@ class S3BufferedPrefetchIterator(iter: Iterator[(BlockId, S3ShuffleBlockStream)]
         }
         catch {
           case _: InterruptedException =>
-            Thread.currentThread.interrupt()
         }
       } else {
         fetchNext = true
@@ -63,7 +62,7 @@ class S3BufferedPrefetchIterator(iter: Iterator[(BlockId, S3ShuffleBlockStream)]
         memoryUsage += bsize
         completed.push((stream, block, bsize))
         hasItem = iter.hasNext
-        notifyAll()
+        notify()
       }
     }
   }
@@ -117,7 +116,6 @@ class S3BufferedPrefetchIterator(iter: Iterator[(BlockId, S3ShuffleBlockStream)]
         wait()
       } catch {
         case _: InterruptedException =>
-          Thread.currentThread.interrupt()
       }
     }
     timeWaiting += System.nanoTime() - now
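The hunks above adjust the monitor-based handoff between the prefetch side and the consumer (dropping the re-interrupt inside the wait loop and switching notifyAll() to notify()). Below is a minimal, hypothetical sketch of that wait/notify pattern; the class and method names (PrefetchQueue, push, take) are illustrative, not the plugin's API.

import scala.collection.mutable

class PrefetchQueue[T] {
  private val completed = mutable.Queue[T]()

  def push(item: T): Unit = synchronized {
    completed.enqueue(item)
    // With a single waiting consumer, one wake-up is enough, so notify()
    // suffices; notifyAll() would also be correct but wakes every waiter.
    notify()
  }

  def take(): T = synchronized {
    while (completed.isEmpty) {
      try {
        wait() // releases the monitor until the producer calls notify()
      } catch {
        case _: InterruptedException => // swallow and re-check the condition
      }
    }
    completed.dequeue()
  }
}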
