[SPARK-33206][CORE][3.1] Fix shuffle index cache weight calculation for small index files #35720

Closed
wants to merge 1 commit into from
@@ -27,7 +27,7 @@ public class ExecutorDiskUtils {
* Hashes a filename into the corresponding local directory, in a manner consistent with
* Spark's DiskBlockManager.getFile().
*/
public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
public static String getFilePath(String[] localDirs, int subDirsPerLocalDir, String filename) {
int hash = JavaUtils.nonNegativeHash(filename);
String localDir = localDirs[hash % localDirs.length];
int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
@@ -38,9 +38,8 @@ public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
// Unfortunately, we cannot just call the normalization code that java.io.File
// uses, since it is in the package-private class java.io.FileSystem.
// So we are creating a File just to get the normalized path back to intern it.
// Finally a new File is built and returned with this interned normalized path.
final String normalizedInternedPath = new File(notNormalizedPath).getPath().intern();
return new File(normalizedInternedPath);
// We return this interned normalized path.
return new File(notNormalizedPath).getPath().intern();
}

}
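
Side note on the interning above (not part of the diff): getFilePath now returns a normalized, interned String, so every caller resolving the same index file gets back the same String instance, and the cache keyed on it below pays no extra per-entry key cost after the first lookup. A minimal sketch, using a hypothetical path:

```java
import java.io.File;

class InternedPathSketch {
  public static void main(String[] args) {
    // Sketch only (hypothetical path): File.getPath() normalizes the path and intern()
    // collapses repeated lookups of the same path onto one shared String instance,
    // so a String cache key costs no extra heap after its first occurrence.
    String first = new File("/tmp/blockmgr-0/0d/shuffle_0_1_0.index").getPath().intern();
    String second = new File("/tmp/blockmgr-0/0d/shuffle_0_1_0.index").getPath().intern();
    System.out.println(first == second); // true: both references point at the interned copy
  }
}
```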
@@ -78,7 +78,7 @@ public class ExternalShuffleBlockResolver {
* Caches index file information so that we can avoid open/close the index files
* for each block fetch.
*/
private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;
private final LoadingCache<String, ShuffleIndexInformation> shuffleIndexCache;

// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;
@@ -110,17 +110,17 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile
Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
this.registeredExecutorFile = registeredExecutorFile;
String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
CacheLoader<String, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<String, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(String filePath) throws IOException {
return new ShuffleIndexInformation(filePath);
}
};
shuffleIndexCache = CacheBuilder.newBuilder()
.maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
.weigher(new Weigher<File, ShuffleIndexInformation>() {
public int weigh(File file, ShuffleIndexInformation indexInfo) {
return indexInfo.getSize();
.weigher(new Weigher<String, ShuffleIndexInformation>() {
public int weigh(String filePath, ShuffleIndexInformation indexInfo) {
return indexInfo.getRetainedMemorySize();
}
})
.build(indexCacheLoader);
@@ -302,28 +302,35 @@ private void deleteNonShuffleServiceServedFiles(String[] dirs) {
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, long mapId, int startReduceId, int endReduceId) {
File indexFile = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
String indexFilePath =
ExecutorDiskUtils.getFilePath(
executor.localDirs,
executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");

try {
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFilePath);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(
startReduceId, endReduceId);
return new FileSegmentManagedBuffer(
conf,
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
new File(
ExecutorDiskUtils.getFilePath(
executor.localDirs,
executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data")),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
throw new RuntimeException("Failed to open file: " + indexFile, e);
throw new RuntimeException("Failed to open file: " + indexFilePath, e);
}
}

public ManagedBuffer getDiskPersistedRddBlockData(
ExecutorShuffleInfo executor, int rddId, int splitIndex) {
File file = ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir,
"rdd_" + rddId + "_" + splitIndex);
File file = new File(
ExecutorDiskUtils.getFilePath(
executor.localDirs, executor.subDirsPerLocalDir, "rdd_" + rddId + "_" + splitIndex));
long fileLength = file.length();
ManagedBuffer res = null;
if (file.exists()) {
@@ -350,8 +357,8 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
}
int numRemovedBlocks = 0;
for (String blockId : blockIds) {
File file =
ExecutorDiskUtils.getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
File file = new File(
ExecutorDiskUtils.getFilePath(executor.localDirs, executor.subDirsPerLocalDir, blockId));
if (file.delete()) {
numRemovedBlocks++;
} else {
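
For reference, the caching pattern above in one self-contained place: a Guava LoadingCache keyed by the interned index-file path and bounded by estimated retained heap rather than on-disk file length. This is a sketch, not the PR's code verbatim; it assumes the new ShuffleIndexInformation(String) constructor and getRetainedMemorySize() introduced in this diff, and the 100 MiB budget mirrors the spark.shuffle.service.index.cache.size default used above.

```java
import java.io.IOException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;

class WeightedIndexCacheSketch {
  static LoadingCache<String, ShuffleIndexInformation> newIndexCache() {
    CacheLoader<String, ShuffleIndexInformation> loader =
        new CacheLoader<String, ShuffleIndexInformation>() {
          @Override
          public ShuffleIndexInformation load(String indexFilePath) throws IOException {
            // Parse the index file lazily, on the first access of its path.
            return new ShuffleIndexInformation(indexFilePath);
          }
        };
    return CacheBuilder.newBuilder()
        // Evict by the estimated heap footprint of cached entries, not by index file size.
        .maximumWeight(100L * 1024 * 1024)
        .weigher((Weigher<String, ShuffleIndexInformation>)
            (indexFilePath, indexInfo) -> indexInfo.getRetainedMemorySize())
        .build(loader);
  }
}
```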
@@ -84,7 +84,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
private final ErrorHandler.BlockPushErrorHandler errorHandler;

@SuppressWarnings("UnstableApiUsage")
private final LoadingCache<File, ShuffleIndexInformation> indexCache;
private final LoadingCache<String, ShuffleIndexInformation> indexCache;

@SuppressWarnings("UnstableApiUsage")
public RemoteBlockPushResolver(TransportConf conf) {
@@ -96,15 +96,16 @@ public RemoteBlockPushResolver(TransportConf conf) {
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
CacheLoader<String, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<String, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(String filePath) throws IOException {
return new ShuffleIndexInformation(filePath);
}
};
indexCache = CacheBuilder.newBuilder()
.maximumWeight(conf.mergedIndexCacheSize())
.weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
.weigher((Weigher<String, ShuffleIndexInformation>)
(filePath, indexInfo) -> indexInfo.getRetainedMemorySize())
.build(indexCacheLoader);
this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
}
@@ -130,7 +131,7 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
// be the first time the merge manager receives a pushed block for a given application
// shuffle partition, or after the merged shuffle file is finalized. We handle these
// two cases accordingly by checking if the file already exists.
File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
File indexFile = new File(getMergedShuffleIndexFilePath(appShuffleId, reduceId));
File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
try {
if (dataFile.exists()) {
@@ -164,7 +165,7 @@ AppShufflePartitionInfo newAppShufflePartitionInfo(
@Override
public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
File indexFile = new File(getMergedShuffleIndexFilePath(appShuffleId, reduceId));
if (!indexFile.exists()) {
throw new RuntimeException(String.format(
"Merged shuffle index file %s not found", indexFile.getPath()));
@@ -193,47 +194,48 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId,
throw new RuntimeException(String.format("Merged shuffle data file %s not found",
dataFile.getPath()));
}
File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
String indexFilePath =
getMergedShuffleIndexFilePath(appShuffleId, reduceId);
try {
// If we get here, the merged shuffle file should have been properly finalized. Thus we can
// use the file length to determine the size of the merged shuffle block.
ShuffleIndexInformation shuffleIndexInformation = indexCache.get(indexFile);
ShuffleIndexInformation shuffleIndexInformation = indexCache.get(indexFilePath);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId);
return new FileSegmentManagedBuffer(
conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
"Failed to open merged shuffle index file %s", indexFile.getPath()), e);
"Failed to open merged shuffle index file %s", indexFilePath), e);
}
}

/**
* The logic here is consistent with
* org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
*/
private File getFile(String appId, String filename) {
private String getFilePath(String appId, String filename) {
// TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
String targetFile = ExecutorDiskUtils.getFilePath(appPathsInfo.activeLocalDirs,
appPathsInfo.subDirsPerLocalDir, filename);
logger.debug("Get merged file {}", targetFile.getAbsolutePath());
logger.debug("Get merged file {}", targetFile);
return targetFile;
}

private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
return getFile(appShuffleId.appId, fileName);
return new File(getFilePath(appShuffleId.appId, fileName));
}

private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
private String getMergedShuffleIndexFilePath(AppShuffleId appShuffleId, int reduceId) {
String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
return getFile(appShuffleId.appId, indexName);
return getFilePath(appShuffleId.appId, indexName);
}

private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
return getFile(appShuffleId.appId, metaName);
return new File(getFilePath(appShuffleId.appId, metaName));
}

@Override
@@ -29,25 +29,28 @@
* as an in-memory LongBuffer.
*/
public class ShuffleIndexInformation {

// The estimate of `ShuffleIndexInformation` memory footprint which is relevant in case of small
// index files (i.e. storing only 2 offsets = 16 bytes).
static final int INSTANCE_MEMORY_FOOTPRINT = 176;

/** offsets as long buffer */
private final LongBuffer offsets;
private int size;

public ShuffleIndexInformation(File indexFile) throws IOException {
size = (int)indexFile.length();
ByteBuffer buffer = ByteBuffer.allocate(size);
public ShuffleIndexInformation(String indexFilePath) throws IOException {
File indexFile = new File(indexFilePath);
ByteBuffer buffer = ByteBuffer.allocate((int)indexFile.length());
offsets = buffer.asLongBuffer();
try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
dis.readFully(buffer.array());
}
}

/**
* Size of the index file
* @return size
*/
public int getSize() {
return size;
public int getRetainedMemorySize() {
// SPARK-33206: here the offsets' capacity is multiplied by 8 as offsets stores long values.
// Integer overflow won't be an issue here as long as the number of reducers is under
// (Integer.MAX_VALUE - INSTANCE_MEMORY_FOOTPRINT) / 8 - 1 = 268435432.
return (offsets.capacity() << 3) + INSTANCE_MEMORY_FOOTPRINT;
}

/**
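
To make the effect of the new weigher concrete, a back-of-envelope sketch (assumed numbers only: the 176-byte footprint constant introduced above and the 100m spark.shuffle.service.index.cache.size default; real heap usage also includes per-entry map and key overhead, so the old weigher undercounts even more than shown):

```java
class IndexCacheWeightSketch {
  public static void main(String[] args) {
    long budget = 100L * 1024 * 1024;  // spark.shuffle.service.index.cache.size = 100m
    // A map output with a single reducer stores 2 long offsets = 16 bytes on disk.
    int oldWeight = 16;                // getSize(): on-disk length of the tiny index file
    int newWeight = 2 * 8 + 176;       // getRetainedMemorySize(): offsets buffer + instance footprint
    System.out.println(budget / oldWeight);  // 6553600  (~6.5M entries admitted before this fix)
    System.out.println(budget / newWeight);  // 546133   (~546K entries admitted after this fix)
  }
}
```

Under the old weigher, a cache nominally capped at 100 MB could therefore hold roughly 6.5 million tiny entries, on the order of 1.2 GB of ShuffleIndexInformation objects alone, which is the kind of shuffle-service memory blow-up this change guards against.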
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;

import java.nio.charset.StandardCharsets;

import static org.junit.Assert.*;

public class ShuffleIndexInformationSuite {
private static final String sortBlock0 = "tiny block";
private static final String sortBlock1 = "a bit longer block";

private static TestShuffleDataContext dataContext;
private static String blockId;

@BeforeClass
public static void before() throws IOException {
dataContext = new TestShuffleDataContext(2, 5);

dataContext.create();
// Write some sort data.
blockId = dataContext.insertSortShuffleData(0, 0, new byte[][] {
sortBlock0.getBytes(StandardCharsets.UTF_8),
sortBlock1.getBytes(StandardCharsets.UTF_8)});
}

@AfterClass
public static void afterAll() {
dataContext.cleanup();
}

@Test
public void test() throws IOException {
String path = ExecutorDiskUtils.getFilePath(
dataContext.localDirs,
dataContext.subDirsPerLocalDir,
blockId + ".index");
ShuffleIndexInformation s = new ShuffleIndexInformation(path);
// the index file contains 3 offsets:
// 0, sortBlock0.length, sortBlock0.length + sortBlock1.length
assertEquals(0L, s.getIndex(0).getOffset());
assertEquals(sortBlock0.length(), s.getIndex(0).getLength());

assertEquals(sortBlock0.length(), s.getIndex(1).getOffset());
assertEquals(sortBlock1.length(), s.getIndex(1).getLength());

assertEquals((3 * 8) + ShuffleIndexInformation.INSTANCE_MEMORY_FOOTPRINT,
s.getRetainedMemorySize());
}
}
@@ -68,18 +68,19 @@ public void cleanup() {
}

/** Creates reducer blocks in a sort-based data format within our local dirs. */
public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
public String insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks)
throws IOException {
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";

OutputStream dataStream = null;
DataOutputStream indexStream = null;
boolean suppressExceptionsDuringClose = true;

try {
dataStream = new FileOutputStream(
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
indexStream = new DataOutputStream(new FileOutputStream(
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
dataStream = new FileOutputStream(new File(
ExecutorDiskUtils.getFilePath(localDirs, subDirsPerLocalDir, blockId + ".data")));
indexStream = new DataOutputStream(new FileOutputStream(new File(
ExecutorDiskUtils.getFilePath(localDirs, subDirsPerLocalDir, blockId + ".index"))));

long offset = 0;
indexStream.writeLong(offset);
@@ -93,6 +94,7 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
Closeables.close(dataStream, suppressExceptionsDuringClose);
Closeables.close(indexStream, suppressExceptionsDuringClose);
}
return blockId;
}

/** Creates spill file(s) within the local dirs. */
@@ -122,11 +124,11 @@ private void insertFile(String filename) throws IOException {

private void insertFile(String filename, byte[] block) throws IOException {
OutputStream dataStream = null;
File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename);
File file = new File(ExecutorDiskUtils.getFilePath(localDirs, subDirsPerLocalDir, filename));
Assert.assertFalse("this test file has been already generated", file.exists());
try {
dataStream = new FileOutputStream(
ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename));
new File(ExecutorDiskUtils.getFilePath(localDirs, subDirsPerLocalDir, filename)));
dataStream.write(block);
} finally {
Closeables.close(dataStream, false);
@@ -81,7 +81,8 @@ private[spark] class IndexShuffleBlockResolver(
def getDataFile(shuffleId: Int, mapId: Long, dirs: Option[Array[String]]): File = {
val blockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
dirs
.map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name))
.map(d =>
new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
.getOrElse(blockManager.diskBlockManager.getFile(blockId))
}

@@ -97,7 +98,8 @@ private[spark] class IndexShuffleBlockResolver(
dirs: Option[Array[String]] = None): File = {
val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
dirs
.map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name))
.map(d =>
new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, blockId.name)))
.getOrElse(blockManager.diskBlockManager.getFile(blockId))
}
