diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index c79d7a08af2..60d7d11cf93 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -22,14 +22,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -51,8 +44,12 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.identity.UserIdentifier; -import org.apache.celeborn.common.meta.*; +import org.apache.celeborn.common.meta.DiskFileInfo; +import org.apache.celeborn.common.meta.FileInfo; +import org.apache.celeborn.common.meta.MemoryFileInfo; +import org.apache.celeborn.common.meta.ReduceFileMeta; import org.apache.celeborn.common.metrics.source.AbstractSource; import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.common.unsafe.Platform; @@ -61,11 +58,7 @@ import org.apache.celeborn.service.deploy.worker.ShuffleRecoverHelper; import org.apache.celeborn.service.deploy.worker.WorkerSource; import org.apache.celeborn.service.deploy.worker.memory.MemoryManager; -import org.apache.celeborn.service.deploy.worker.shuffledb.DB; -import org.apache.celeborn.service.deploy.worker.shuffledb.DBBackend; -import org.apache.celeborn.service.deploy.worker.shuffledb.DBIterator; -import org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider; -import org.apache.celeborn.service.deploy.worker.shuffledb.StoreVersion; +import org.apache.celeborn.service.deploy.worker.shuffledb.*; public class PartitionFilesSorter extends ShuffleRecoverHelper { private static final Logger logger = LoggerFactory.getLogger(PartitionFilesSorter.class); @@ -78,6 +71,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { JavaUtils.newConcurrentHashMap(); private final ConcurrentHashMap> sortingShuffleFiles = JavaUtils.newConcurrentHashMap(); + private final ConcurrentHashMap> sortExceptions = + JavaUtils.newConcurrentHashMap(); private final Cache>> indexCache; private final Map> indexCacheNames = JavaUtils.newConcurrentHashMap(); @@ -290,6 +285,11 @@ public FileInfo getSortedFileInfo( } } + if (sortExceptions.containsKey(shuffleKey) + && sortExceptions.get(shuffleKey).containsKey(fileId)) { + throw sortExceptions.get(shuffleKey).get(fileId); + } + return resolve( shuffleKey, fileId, @@ -365,6 +365,7 @@ public static void sortMemoryShuffleFile(MemoryFileInfo memoryFileInfo) { public void cleanup(HashSet expiredShuffleKeys) { for (String expiredShuffleKey : expiredShuffleKeys) { sortingShuffleFiles.remove(expiredShuffleKey); + sortExceptions.remove(expiredShuffleKey); deleteSortedShuffleFiles(expiredShuffleKey); Set expiredIndexCacheItems = indexCacheNames.remove(expiredShuffleKey); if (expiredIndexCacheItems != null) { @@ -727,6 +728,9 @@ public void sort() { singleMapIdShuffleBlockList.add(blockInfo); index += batchHeaderLen + compressedSize; + if (compressedSize < 0) { + throw new CelebornIOException("Shuffle file " + originFilePath + " is corrupted."); + } readBufferBySize(paddingBuf, compressedSize); } @@ -769,6 +773,9 @@ public void sort() { } catch (Exception e) { logger.error( "Sorting shuffle file for " + fileId + " " + originFilePath + " failed, detail: ", e); + sortExceptions + .computeIfAbsent(shuffleKey, v -> JavaUtils.newConcurrentHashMap()) + .put(fileId, new IOException(e)); } finally { closeFiles(); Set sorting = sortingShuffleFiles.get(shuffleKey);