From 8ea1367291261603d3a5b616c7c5018b42b6cfbc Mon Sep 17 00:00:00 2001
From: Chandni Singh
Date: Wed, 23 Dec 2020 12:42:18 -0600
Subject: [PATCH] [SPARK-32916][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Ensure the number of chunks in meta file and index file are equal

### What changes were proposed in this pull request?

1. Fixes for bugs in `RemoteBlockPushResolver` where the number of chunks in the meta file and the index file become inconsistent due to exceptions while writing to either the index file or the meta file. This Java class was introduced in https://github.com/apache/spark/pull/30062.
   - If writing to the index file fails, the position of the meta file is not reset. This leaves the number of chunks in the meta file inconsistent with the index file.
   - During the exception handling while writing to the index/meta file, we just reset the pointer to the start position. If the files are closed right after this, that does not get rid of any of the extra bytes already written to them.
2. Adds an IOException threshold. If the `RemoteBlockPushResolver` encounters more IOExceptions than this threshold while updating the data/meta/index file of a shuffle partition, it responds to the client with the exception `IOExceptions exceeded the threshold`, so that the client can stop pushing data for this shuffle partition.
3. When the update to metadata fails, the exception is not propagated back to the client. This results in an increased size of the current chunk. However, with (2) in place, the current chunk will still be of a manageable size.

### Why are the changes needed?

This fix is needed for the bugs mentioned above.
1. Moved the write to the meta file after the write to the index file. This fixes the issue because if there is an exception while writing to the meta file, the index file position is not updated. With this change, if there is an exception while writing to the index file, none of the files are effectively updated, and the same is true vice versa.
2. Truncating the lengths of the data/index/meta files when the partition is finalized.
3. When the IOExceptions have reached the threshold, it is most likely that future blocks will also hit the issue. So it is better to let the clients know so that they can stop pushing blocks for that partition.
4. When just the meta update fails, the client retries pushing the block that was already successfully merged into the data file. This can be avoided by letting the chunk grow slightly.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests for all the bugs and the threshold.

Closes #30433 from otterc/SPARK-32916-followup.
Authored-by: Chandni Singh
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../spark/network/util/TransportConf.java     |  10 +
 .../spark/network/shuffle/ErrorHandler.java   |   9 +
 .../shuffle/RemoteBlockPushResolver.java      | 301 ++++++++++----
 .../shuffle/RemoteBlockPushResolverSuite.java | 380 ++++++++++++++++++
 4 files changed, 629 insertions(+), 71 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 57b1f10bcfd79..1794918ee6129 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -534,4 +534,14 @@ public long mergedIndexCacheSize() { return JavaUtils.byteStringAsBytes( conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m")); } + + /** + * The threshold for the number of IOExceptions while merging shuffle blocks to a shuffle partition. + * When the number of IOExceptions while writing to the merged shuffle data/index/meta file exceeds + * this threshold, the shuffle server will respond back to the client to stop pushing shuffle + * blocks for this shuffle partition. + */ + public int ioExceptionsThresholdDuringMerge() { + return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java index d13a0272744a0..968777fba785d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java @@ -71,6 +71,15 @@ class BlockPushErrorHandler implements ErrorHandler { public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX = "Couldn't find an opportunity to write block"; + /** + * String constant used for generating exception messages indicating the server encountered + * IOExceptions multiple times, greater than the configured threshold, while trying to merge + * shuffle blocks of the same shuffle partition. When the client receives this response, + * it will stop pushing any more blocks for the same shuffle partition. + */ + public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX = + "IOExceptions exceeded the threshold"; + @Override public boolean shouldRetryError(Throwable t) { // If it is a connection time out or a connection closed exception, no need to retry. 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 29fd6f1fc8dcf..06cb019b25419 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -17,15 +17,16 @@ package org.apache.spark.network.shuffle; +import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -45,6 +46,8 @@ import com.google.common.cache.LoadingCache; import com.google.common.cache.Weigher; import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +81,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private final Executor directoryCleaner; private final TransportConf conf; private final int minChunkSize; + private final int ioExceptionsThresholdDuringMerge; private final ErrorHandler.BlockPushErrorHandler errorHandler; @SuppressWarnings("UnstableApiUsage") @@ -92,6 +96,7 @@ public RemoteBlockPushResolver(TransportConf conf) { // Add `spark` prefix because it will run in NM in Yarn mode. NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner")); this.minChunkSize = conf.minChunkSizeInMergedShuffleFile(); + this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge(); CacheLoader<File, ShuffleIndexInformation> indexCacheLoader = new CacheLoader<File, ShuffleIndexInformation>() { public ShuffleIndexInformation load(File file) throws IOException { @@ -132,7 +137,7 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( if (dataFile.exists()) { return null; } else { - return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile); + return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile); } } catch (IOException e) { logger.error( @@ -146,6 +151,17 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( }); } + @VisibleForTesting + AppShufflePartitionInfo newAppShufflePartitionInfo( + AppShuffleId appShuffleId, + int reduceId, + File dataFile, + File indexFile, + File metaFile) throws IOException { + return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, + new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile)); + } + @Override public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) { AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId); @@ -370,26 +386,19 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new long[0]); } else { Collection<AppShufflePartitionInfo> partitionsToFinalize = shufflePartitions.values(); - int totalPartitions = partitionsToFinalize.size(); - RoaringBitmap[] bitmaps = new RoaringBitmap[totalPartitions]; - int[] reduceIds = new int[totalPartitions]; - long[] sizes = new long[totalPartitions]; + List<RoaringBitmap> bitmaps = new 
ArrayList<>(partitionsToFinalize.size()); + List<Integer> reduceIds = new ArrayList<>(partitionsToFinalize.size()); + List<Long> sizes = new ArrayList<>(partitionsToFinalize.size()); Iterator<AppShufflePartitionInfo> partitionsIter = partitionsToFinalize.iterator(); - int idx = 0; while (partitionsIter.hasNext()) { AppShufflePartitionInfo partition = partitionsIter.next(); synchronized (partition) { - // Get rid of any partial block data at the end of the file. This could either - // be due to failure or a request still being processed when the shuffle - // merge gets finalized. try { - partition.dataChannel.truncate(partition.getPosition()); - if (partition.getPosition() != partition.getLastChunkOffset()) { - partition.updateChunkInfo(partition.getPosition(), partition.lastMergedMapIndex); - } - bitmaps[idx] = partition.mapTracker; - reduceIds[idx] = partition.reduceId; - sizes[idx++] = partition.getPosition(); + // This can throw IOException which will mark this shuffle partition as not merged. + partition.finalizePartition(); + bitmaps.add(partition.mapTracker); + reduceIds.add(partition.reduceId); + sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, msg.shuffleId, partition.reduceId, ioe); @@ -401,7 +410,9 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc } } } - mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps, reduceIds, sizes); + mergeStatuses = new MergeStatuses(msg.shuffleId, + bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), + Longs.toArray(sizes)); } partitions.remove(appShuffleId); logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); @@ -450,6 +461,7 @@ private PushBlockStreamCallback( this.streamId = streamId; this.partitionInfo = Preconditions.checkNotNull(partitionInfo); this.mapIndex = mapIndex; + abortIfNecessary(); } @Override @@ -466,11 +478,11 @@ public String getID() { private void writeBuf(ByteBuffer buf) throws IOException { while (buf.hasRemaining()) { if (partitionInfo.isEncounteredFailure()) { - long updatedPos = partitionInfo.getPosition() + length; + long updatedPos = partitionInfo.getDataFilePos() + length; logger.debug( "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}", partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, - partitionInfo.reduceId, partitionInfo.getPosition(), updatedPos); + partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos); length += partitionInfo.dataChannel.write(buf, updatedPos); } else { length += partitionInfo.dataChannel.write(buf); @@ -510,15 +522,35 @@ private boolean isDuplicateBlock() { * This is only invoked when the stream is able to write. The stream first writes any deferred * block parts buffered in memory. */ - private void writeAnyDeferredBufs() throws IOException { - if (deferredBufs != null && !deferredBufs.isEmpty()) { - for (ByteBuffer deferredBuf : deferredBufs) { - writeBuf(deferredBuf); - } + private void writeDeferredBufs() throws IOException { + for (ByteBuffer deferredBuf : deferredBufs) { + writeBuf(deferredBuf); + } + deferredBufs = null; + } + + /** + * This throws RuntimeException if the number of IOExceptions has exceeded the threshold. 
+ */ + private void abortIfNecessary() { + if (partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) { deferredBufs = null; + throw new RuntimeException(String.format("%s when merging %s", + ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX, + streamId)); } } + /** + * This increments the number of IOExceptions and throws RuntimeException if it exceeds the + * threshold which will abort the merge of a particular shuffle partition. + */ + private void incrementIOExceptionsAndAbortIfNecessary() { + // Update the count of IOExceptions + partitionInfo.incrementIOExceptions(); + abortIfNecessary(); + } + @Override public void onData(String streamId, ByteBuffer buf) throws IOException { // When handling the block data using StreamInterceptor, it can help to reduce the amount @@ -556,6 +588,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { deferredBufs = null; return; } + abortIfNecessary(); logger.trace("{} shuffleId {} reduceId {} onData writable", partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, partitionInfo.reduceId); @@ -565,8 +598,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { // If we got here, it's safe to write the block data to the merged shuffle file. We // first write any deferred block. - writeAnyDeferredBufs(); - writeBuf(buf); + try { + if (deferredBufs != null && !deferredBufs.isEmpty()) { + writeDeferredBufs(); + } + writeBuf(buf); + } catch (IOException ioe) { + incrementIOExceptionsAndAbortIfNecessary(); + // If the above doesn't throw a RuntimeException, then we propagate the IOException + // back to the client so the block could be retried. + throw ioe; + } // If we got here, it means we successfully write the current chunk of block to merged // shuffle file. If we encountered failure while writing the previous block, we should // reset the file channel position and the status of partitionInfo to indicate that we @@ -574,7 +616,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { // position tracked by partitionInfo here. That is only updated while the entire block // is successfully written to merged shuffle file. if (partitionInfo.isEncounteredFailure()) { - partitionInfo.dataChannel.position(partitionInfo.getPosition() + length); + partitionInfo.dataChannel.position(partitionInfo.getDataFilePos() + length); partitionInfo.setEncounteredFailure(false); } } else { @@ -636,15 +678,33 @@ public void onComplete(String streamId) throws IOException { return; } if (partitionInfo.getCurrentMapIndex() < 0) { - writeAnyDeferredBufs(); + try { + if (deferredBufs != null && !deferredBufs.isEmpty()) { + abortIfNecessary(); + writeDeferredBufs(); + } + } catch (IOException ioe) { + incrementIOExceptionsAndAbortIfNecessary(); + // If the above doesn't throw a RuntimeException, then we propagate the IOException + // back to the client so the block could be retried. 
+ throw ioe; + } } - long updatedPos = partitionInfo.getPosition() + length; + long updatedPos = partitionInfo.getDataFilePos() + length; boolean indexUpdated = false; if (updatedPos - partitionInfo.getLastChunkOffset() >= mergeManager.minChunkSize) { - partitionInfo.updateChunkInfo(updatedPos, mapIndex); - indexUpdated = true; + try { + partitionInfo.updateChunkInfo(updatedPos, mapIndex); + indexUpdated = true; + } catch (IOException ioe) { + incrementIOExceptionsAndAbortIfNecessary(); + // If the above doesn't throw a RuntimeException, then we do not propagate the + // IOException to the client. This may increase the chunk size however the increase is + // still limited because of the limit on the number of IOExceptions for a + // particular shuffle partition. + } } - partitionInfo.setPosition(updatedPos); + partitionInfo.setDataFilePos(updatedPos); partitionInfo.setCurrentMapIndex(-1); // update merged results @@ -687,6 +747,11 @@ public void onFailure(String streamId, Throwable throwable) throws IOException { } } } + + @VisibleForTesting + AppShufflePartitionInfo getPartitionInfo() { + return partitionInfo; + } } /** @@ -736,7 +801,7 @@ public static class AppShufflePartitionInfo { // The merged shuffle data file channel public FileChannel dataChannel; // Location offset of the last successfully merged block for this shuffle partition - private long position; + private long dataFilePos; // Indicating whether failure was encountered when merging the previous block private boolean encounteredFailure; // Track the map index whose block is being merged for this shuffle partition @@ -744,44 +809,46 @@ public static class AppShufflePartitionInfo { // Bitmap tracking which mapper's blocks have been merged for this shuffle partition private RoaringBitmap mapTracker; // The index file for a particular merged shuffle contains the chunk offsets. - private RandomAccessFile indexFile; + private MergeShuffleFile indexFile; // The meta file for a particular merged shuffle contains all the map indices that belong to // every chunk. The entry per chunk is a serialized bitmap. 
- private RandomAccessFile metaFile; + private MergeShuffleFile metaFile; // The offset for the last chunk tracked in the index file for this shuffle partition private long lastChunkOffset; private int lastMergedMapIndex = -1; // Bitmap tracking which mapper's blocks are in the current shuffle chunk private RoaringBitmap chunkTracker; + private int numIOExceptions = 0; + private boolean indexMetaUpdateFailed; AppShufflePartitionInfo( AppShuffleId appShuffleId, int reduceId, File dataFile, - File indexFile, - File metaFile) throws IOException { + MergeShuffleFile indexFile, + MergeShuffleFile metaFile) throws IOException { this.appShuffleId = Preconditions.checkNotNull(appShuffleId, "app shuffle id"); this.reduceId = reduceId; this.dataChannel = new FileOutputStream(dataFile).getChannel(); - this.indexFile = new RandomAccessFile(indexFile, "rw"); - this.metaFile = new RandomAccessFile(metaFile, "rw"); + this.indexFile = indexFile; + this.metaFile = metaFile; this.currentMapIndex = -1; // Writing 0 offset so that we can reuse ShuffleIndexInformation.getIndex() updateChunkInfo(0L, -1); - this.position = 0; + this.dataFilePos = 0; this.encounteredFailure = false; this.mapTracker = new RoaringBitmap(); this.chunkTracker = new RoaringBitmap(); } - public long getPosition() { - return position; + public long getDataFilePos() { + return dataFilePos; } - public void setPosition(long position) { + public void setDataFilePos(long dataFilePos) { logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appShuffleId.appId, - appShuffleId.shuffleId, reduceId, this.position, position); - this.position = position; + appShuffleId.shuffleId, reduceId, this.dataFilePos, dataFilePos); + this.dataFilePos = dataFilePos; } boolean isEncounteredFailure() { @@ -825,25 +892,29 @@ void resetChunkTracker() { * @param mapIndex the map index to be added to chunk tracker. */ void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException { - long idxStartPos = -1; try { - // update the chunk tracker to meta file before index file + logger.trace("{} shuffleId {} reduceId {} index current {} updated {}", + appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, chunkOffset); + if (indexMetaUpdateFailed) { + indexFile.getChannel().position(indexFile.getPos()); + } + indexFile.getDos().writeLong(chunkOffset); + // Chunk bitmap should be written to the meta file after the index file because if there are + // any exceptions during writing the offset to the index file, meta file should not be + // updated. If the update to the index file is successful but the update to meta file isn't + // then the index file position is not updated. writeChunkTracker(mapIndex); - idxStartPos = indexFile.getFilePointer(); - logger.trace("{} shuffleId {} reduceId {} updated index current {} updated {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, - chunkOffset); - indexFile.writeLong(chunkOffset); + indexFile.updatePos(8); + this.lastChunkOffset = chunkOffset; + indexMetaUpdateFailed = false; } catch (IOException ioe) { - if (idxStartPos != -1) { - // reset the position to avoid corrupting index files during exception. 
- logger.warn("{} shuffleId {} reduceId {} reset index to position {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos); - indexFile.seek(idxStartPos); - } + logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appShuffleId.appId, + appShuffleId.shuffleId, reduceId); + indexMetaUpdateFailed = true; + // Any exception here is propagated to the caller and the caller can decide whether to + // abort or not. throw ioe; } - this.lastChunkOffset = chunkOffset; } private void writeChunkTracker(int mapIndex) throws IOException { @@ -851,17 +922,38 @@ private void writeChunkTracker(int mapIndex) throws IOException { return; } chunkTracker.add(mapIndex); - long metaStartPos = metaFile.getFilePointer(); - try { - logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex); - chunkTracker.serialize(metaFile); - } catch (IOException ioe) { - logger.warn("{} shuffleId {} reduceId {} mapIndex {} reset position of meta file to {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex, metaStartPos); - metaFile.seek(metaStartPos); - throw ioe; + logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file", + appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex); + if (indexMetaUpdateFailed) { + metaFile.getChannel().position(metaFile.getPos()); + } + chunkTracker.serialize(metaFile.getDos()); + metaFile.updatePos(metaFile.getChannel().position() - metaFile.getPos()); + } + + private void incrementIOExceptions() { + numIOExceptions++; + } + + private boolean shouldAbort(int ioExceptionsThresholdDuringMerge) { + return numIOExceptions > ioExceptionsThresholdDuringMerge; + } + + private void finalizePartition() throws IOException { + if (dataFilePos != lastChunkOffset) { + try { + updateChunkInfo(dataFilePos, lastMergedMapIndex); + } catch (IOException ioe) { + // Any exceptions here while updating the meta files can be ignored. If the files + // aren't successfully updated they will be truncated. + } } + // Get rid of any partial block data at the end of the file. This could either + // be due to failure, or a request still being processed when the shuffle + // merge gets finalized, or any exceptions while updating index/meta files. + dataChannel.truncate(lastChunkOffset); + indexFile.getChannel().truncate(indexFile.getPos()); + metaFile.getChannel().truncate(metaFile.getPos()); } void closeAllFiles() { @@ -877,7 +969,6 @@ void closeAllFiles() { } if (metaFile != null) { try { - // if the stream is closed, channel get's closed as well. 
metaFile.close(); } catch (IOException ioe) { logger.warn("Error closing meta file for {} shuffleId {} reduceId {}", @@ -902,6 +993,26 @@ void closeAllFiles() { protected void finalize() throws Throwable { closeAllFiles(); } + + @VisibleForTesting + MergeShuffleFile getIndexFile() { + return indexFile; + } + + @VisibleForTesting + MergeShuffleFile getMetaFile() { + return metaFile; + } + + @VisibleForTesting + FileChannel getDataChannel() { + return dataChannel; + } + + @VisibleForTesting + int getNumIOExceptions() { + return numIOExceptions; + } } /** @@ -931,4 +1042,52 @@ private AppPathsInfo( } } } + + @VisibleForTesting + static class MergeShuffleFile { + private FileChannel channel; + private DataOutputStream dos; + private long pos; + + @VisibleForTesting + MergeShuffleFile(File file) throws IOException { + FileOutputStream fos = new FileOutputStream(file); + channel = fos.getChannel(); + dos = new DataOutputStream(fos); + } + + @VisibleForTesting + MergeShuffleFile(FileChannel channel, DataOutputStream dos) { + this.channel = channel; + this.dos = dos; + } + + private void updatePos(long numBytes) { + pos += numBytes; + } + + void close() throws IOException { + try { + dos.close(); + } finally { + dos = null; + channel = null; + } + } + + @VisibleForTesting + DataOutputStream getDos() { + return dos; + } + + @VisibleForTesting + FileChannel getChannel() { + return channel; + } + + @VisibleForTesting + long getPos() { + return pos; + } + } } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 0f200dc721963..8c6f7434748ec 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -17,9 +17,12 @@ package org.apache.spark.network.shuffle; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -42,6 +45,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.client.StreamCallbackWithID; +import org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; @@ -411,6 +415,347 @@ void deleteExecutorDirs(Path[] dirs) { } } + @Test + public void testRecoverIndexFileAfterIOExceptions() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); + callback1.onComplete(callback1.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); + // Close the index stream so it throws IOException + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( + new 
PushBlockStream(TEST_APP, 0, 1, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any IOExceptions because number of IOExceptions are less than + // the threshold but the update to index file will be unsuccessful. + callback2.onComplete(callback2.getID()); + assertEquals("index position", 16, testIndexFile.getPos()); + // Restore the index stream so it can write successfully again. + testIndexFile.restore(); + StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2])); + callback3.onComplete(callback3.getID()); + assertEquals("index position", 24, testIndexFile.getPos()); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + validateMergeStatuses(statuses, new int[] {0}, new long[] {11}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}}); + } + + @Test + public void testRecoverIndexFileAfterIOExceptionsInFinalize() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); + callback1.onComplete(callback1.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); + // Close the index stream so it throws IOException + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any IOExceptions because number of IOExceptions are less than + // the threshold but the update to index file will be unsuccessful. + callback2.onComplete(callback2.getID()); + assertEquals("index position", 16, testIndexFile.getPos()); + // The last update to index was unsuccessful however any further updates will be successful. + // Restore the index stream so it can write successfully again. 
+ testIndexFile.restore(); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + assertEquals("index position", 24, testIndexFile.getPos()); + validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}}); + } + + @Test + public void testRecoverMetaFileAfterIOExceptions() throws IOException { + useTestFiles(false, true); + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); + callback1.onComplete(callback1.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); + // Close the meta stream so it throws IOException + TestMergeShuffleFile testMetaFile = (TestMergeShuffleFile) partitionInfo.getMetaFile(); + long metaPosBeforeClose = testMetaFile.getPos(); + testMetaFile.close(); + StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any IOExceptions because number of IOExceptions are less than + // the threshold but the update to index and meta file will be unsuccessful. + callback2.onComplete(callback2.getID()); + assertEquals("index position", 16, partitionInfo.getIndexFile().getPos()); + assertEquals("meta position", metaPosBeforeClose, testMetaFile.getPos()); + // Restore the meta stream so it can write successfully again. 
+ testMetaFile.restore(); + StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2])); + callback3.onComplete(callback3.getID()); + assertEquals("index position", 24, partitionInfo.getIndexFile().getPos()); + assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + validateMergeStatuses(statuses, new int[] {0}, new long[] {11}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}}); + } + + @Test + public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException { + useTestFiles(false, true); + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); + callback1.onComplete(callback1.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); + // Close the meta stream so it throws IOException + TestMergeShuffleFile testMetaFile = (TestMergeShuffleFile) partitionInfo.getMetaFile(); + long metaPosBeforeClose = testMetaFile.getPos(); + testMetaFile.close(); + StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any IOExceptions because number of IOExceptions are less than + // the threshold but the update to index and meta file will be unsuccessful. + callback2.onComplete(callback2.getID()); + MergeShuffleFile indexFile = partitionInfo.getIndexFile(); + assertEquals("index position", 16, indexFile.getPos()); + assertEquals("meta position", metaPosBeforeClose, testMetaFile.getPos()); + // Restore the meta stream so it can write successfully again. 
+ testMetaFile.restore(); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + assertEquals("index position", 24, indexFile.getPos()); + assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose); + validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}}); + } + + @Test (expected = RuntimeException.class) + public void testIOExceptionsExceededThreshold() throws IOException { + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); + callback.onComplete(callback.getID()); + // Close the data stream so it throws continuous IOException + partitionInfo.getDataChannel().close(); + for (int i = 1; i < 5; i++) { + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, i, 0, 0)); + try { + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[2])); + } catch (IOException ioe) { + // this will throw IOException so the client can retry. + callback1.onFailure(callback1.getID(), ioe); + } + } + assertEquals(4, partitionInfo.getNumIOExceptions()); + // After 4 IOException, the server will respond with IOExceptions exceeded threshold + try { + RemoteBlockPushResolver.PushBlockStreamCallback callback2 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + callback2.onData(callback.getID(), ByteBuffer.wrap(new byte[1])); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0", + t.getMessage()); + throw t; + } + } + + @Test (expected = RuntimeException.class) + public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); + callback.onComplete(callback.getID()); + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + for (int i = 1; i < 5; i++) { + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, i, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any exceptions but the exception count is increased. + callback1.onComplete(callback1.getID()); + } + assertEquals(4, partitionInfo.getNumIOExceptions()); + // After 4 IOException, the server will respond with IOExceptions exceeded threshold for any + // new request for this partition. 
+ try { + RemoteBlockPushResolver.PushBlockStreamCallback callback2 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[4])); + callback2.onComplete(callback2.getID()); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0", + t.getMessage()); + throw t; + } + } + + @Test (expected = RuntimeException.class) + public void testRequestForAbortedShufflePartitionThrowsException() { + try { + testIOExceptionsDuringMetaUpdateIncreasesExceptionCount(); + } catch (Throwable t) { + // No more blocks can be merged to this partition. + } + try { + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 10, 0, 0)); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_10_0", + t.getMessage()); + throw t; + } + } + + @Test (expected = RuntimeException.class) + public void testPendingBlockIsAbortedImmediately() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + for (int i = 1; i < 6; i++) { + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, i, 0, 0)); + try { + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any exceptions but the exception count is increased. + callback1.onComplete(callback1.getID()); + } catch (Throwable t) { + callback1.onFailure(callback1.getID(), t); + } + } + assertEquals(5, partitionInfo.getNumIOExceptions()); + // The server will respond with IOExceptions exceeded threshold for any additional attempts + // to write. + try { + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0", + t.getMessage()); + throw t; + } + } + + @Test (expected = RuntimeException.class) + public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + for (int i = 1; i < 5; i++) { + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, i, 0, 0)); + try { + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any exceptions but the exception count is increased. 
+ callback1.onComplete(callback1.getID()); + } catch (Throwable t) { + callback1.onFailure(callback1.getID(), t); + } + } + assertEquals(4, partitionInfo.getNumIOExceptions()); + RemoteBlockPushResolver.PushBlockStreamCallback callback2 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This is deferred + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); + // Callback2 completes which will throw another exception. + try { + callback2.onComplete(callback2.getID()); + } catch (Throwable t) { + callback2.onFailure(callback2.getID(), t); + } + assertEquals(5, partitionInfo.getNumIOExceptions()); + // Restore index file so that any further writes to it are successful and any exceptions are + // due to IOExceptions exceeding threshold. + testIndexFile.restore(); + try { + callback.onComplete(callback.getID()); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0", + t.getMessage()); + throw t; + } + } + + @Test + public void testFailureWhileTruncatingFiles() throws IOException { + useTestFiles(true, false); + PushBlock[] pushBlocks = new PushBlock[] { + new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[2])), + new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[3])), + new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])), + new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3])) + }; + pushBlockHelper(TEST_APP, pushBlocks); + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[2])); + callback.onComplete(callback.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + // Close the index file so truncate throws IOException + testIndexFile.close(); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + validateMergeStatuses(statuses, new int[] {1}, new long[] {8}); + MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 1); + validateChunks(TEST_APP, 0, 1, meta, new int[]{5, 3}, new int[][]{{0},{1}}); + } + + private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException { + pushResolver = new RemoteBlockPushResolver(conf) { + @Override + AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId, + File dataFile, File indexFile, File metaFile) throws IOException { + MergeShuffleFile mergedIndexFile = useTestIndexFile ? new TestMergeShuffleFile(indexFile) + : new MergeShuffleFile(indexFile); + MergeShuffleFile mergedMetaFile = useTestMetaFile ? 
new TestMergeShuffleFile(metaFile) : + new MergeShuffleFile(metaFile); + return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, mergedIndexFile, + mergedMetaFile); + } + }; + registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + } + private Path[] createLocalDirs(int numLocalDirs) throws IOException { Path[] localDirs = new Path[numLocalDirs]; for (int i = 0; i < localDirs.length; i++) { @@ -493,4 +838,39 @@ private static class PushBlock { this.buffer = buffer; } } + + private static class TestMergeShuffleFile extends MergeShuffleFile { + private DataOutputStream activeDos; + private File file; + private FileChannel channel; + + private TestMergeShuffleFile(File file) throws IOException { + super(null, null); + this.file = file; + FileOutputStream fos = new FileOutputStream(file); + channel = fos.getChannel(); + activeDos = new DataOutputStream(fos); + } + + @Override + DataOutputStream getDos() { + return activeDos; + } + + @Override + FileChannel getChannel() { + return channel; + } + + @Override + void close() throws IOException { + activeDos.close(); + } + + void restore() throws IOException { + FileOutputStream fos = new FileOutputStream(file, true); + channel = fos.getChannel(); + activeDos = new DataOutputStream(fos); + } + } }
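
For readers skimming the diff, the core of change (2), the per-partition IOException threshold, can be summarized by the standalone sketch below. This is illustrative only and is not part of the patch: the class name `IOExceptionThresholdSketch` and its `main` driver are hypothetical, while the real logic lives in `PushBlockStreamCallback.abortIfNecessary`, `incrementIOExceptionsAndAbortIfNecessary`, and `AppShufflePartitionInfo.shouldAbort` above.

```java
import java.io.IOException;

// Minimal sketch of the threshold-and-abort pattern: count IOExceptions per
// shuffle partition and, once the count passes the configured threshold, reject
// any further pushes for that partition so the client stops retrying.
public class IOExceptionThresholdSketch {

  // Mirrors the new config spark.shuffle.server.ioExceptionsThresholdDuringMerge (default 4).
  private static final int DEFAULT_THRESHOLD = 4;

  // Mirrors ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX.
  static final String EXCEEDED_PREFIX = "IOExceptions exceeded the threshold";

  private final int threshold;
  private int numIOExceptions = 0;

  IOExceptionThresholdSketch(int threshold) {
    this.threshold = threshold;
  }

  // Checked when a new push stream is opened and before/after writes: once too many
  // IOExceptions have been seen, the merge of this partition is aborted.
  void abortIfNecessary(String streamId) {
    if (numIOExceptions > threshold) {
      throw new RuntimeException(String.format("%s when merging %s", EXCEEDED_PREFIX, streamId));
    }
  }

  // Called from the IOException handling path of a data/index/meta write.
  void incrementIOExceptionsAndAbortIfNecessary(String streamId) {
    numIOExceptions++;
    abortIfNecessary(streamId);
  }

  public static void main(String[] args) {
    IOExceptionThresholdSketch guard = new IOExceptionThresholdSketch(DEFAULT_THRESHOLD);
    for (int i = 0; i < 6; i++) {
      try {
        // Simulate a failed write to the merged data/index/meta file.
        throw new IOException("simulated write failure " + i);
      } catch (IOException ioe) {
        try {
          guard.incrementIOExceptionsAndAbortIfNecessary("shufflePush_0_0_0");
          System.out.println("failure " + (i + 1) + " tolerated, client may retry");
        } catch (RuntimeException abort) {
          // After more than DEFAULT_THRESHOLD failures, the partition is abandoned.
          System.out.println("aborted: " + abort.getMessage());
          return;
        }
      }
    }
  }
}
```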