Skip to content

Commit 0b553b8

Browse files
committed
[SPARK-57803][CORE] Delete the temp file when closeAndRead fails on the fetch-to-disk path
### What changes were proposed in this pull request? `OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not. This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds. ### Why are the changes needed? The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com> (cherry picked from commit 62fd9cb) Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent 54c5fe5 commit 0b553b8

2 files changed

Lines changed: 62 additions & 1 deletion

File tree

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,18 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
364364

365365
@Override
366366
public void onComplete(String streamId) throws IOException {
367-
listener.onBlockFetchSuccess(blockIds[chunkIndex], channel.closeAndRead());
367+
ManagedBuffer buffer;
368+
try {
369+
buffer = channel.closeAndRead();
370+
} catch (IOException e) {
371+
// closeAndRead() failed (typically from the channel close) before the buffer was handed
372+
// off, so the temp file was never registered for cleanup. Delete it, and close the
373+
// channel in case it was left open, mirroring onFailure, instead of leaking them.
374+
channel.close();
375+
targetFile.delete();
376+
throw e;
377+
}
378+
listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
368379
if (!downloadFileManager.registerTempFileToClean(targetFile)) {
369380
targetFile.delete();
370381
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.network.shuffle;
1919

20+
import java.io.IOException;
2021
import java.nio.ByteBuffer;
2122
import java.util.HashMap;
2223
import java.util.Iterator;
@@ -34,14 +35,17 @@
3435
import static org.mockito.ArgumentMatchers.eq;
3536
import static org.mockito.Mockito.doAnswer;
3637
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.never;
3739
import static org.mockito.Mockito.times;
3840
import static org.mockito.Mockito.verify;
41+
import static org.mockito.Mockito.when;
3942

4043
import org.apache.spark.network.buffer.ManagedBuffer;
4144
import org.apache.spark.network.buffer.NettyManagedBuffer;
4245
import org.apache.spark.network.buffer.NioManagedBuffer;
4346
import org.apache.spark.network.client.ChunkReceivedCallback;
4447
import org.apache.spark.network.client.RpcResponseCallback;
48+
import org.apache.spark.network.client.StreamCallback;
4549
import org.apache.spark.network.client.TransportClient;
4650
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
4751
import org.apache.spark.network.shuffle.protocol.FetchShuffleBlocks;
@@ -284,6 +288,52 @@ public void testInvalidShuffleBlockIds() {
284288
new int[][] {{ 0 }}), conf));
285289
}
286290

291+
/**
292+
* Verifies that when fetching to disk, a failure of
293+
* {@link DownloadFileWritableChannel#closeAndRead} does not leak the temp file: the channel is
294+
* closed and the temp file is deleted, and it is never registered for later cleanup.
295+
*/
296+
@Test
297+
public void testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails() throws IOException {
298+
String blockId = "shuffle_0_0_0";
299+
String[] blockIds = new String[] { blockId };
300+
301+
TransportClient client = mock(TransportClient.class);
302+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
303+
304+
DownloadFileWritableChannel channel = mock(DownloadFileWritableChannel.class);
305+
when(channel.closeAndRead()).thenThrow(new IOException("boom"));
306+
DownloadFile downloadFile = mock(DownloadFile.class);
307+
when(downloadFile.openForWriting()).thenReturn(channel);
308+
DownloadFileManager downloadFileManager = mock(DownloadFileManager.class);
309+
when(downloadFileManager.createTempFile(any())).thenReturn(downloadFile);
310+
311+
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
312+
client, "app-id", "exec-id", blockIds, listener, conf, downloadFileManager);
313+
314+
// Respond to the "openBlocks" RPC with a StreamHandle of a single chunk.
315+
doAnswer(invocation -> {
316+
RpcResponseCallback callback = (RpcResponseCallback) invocation.getArguments()[1];
317+
callback.onSuccess(new StreamHandle(123, 1).toByteBuffer());
318+
return null;
319+
}).when(client).sendRpc(any(ByteBuffer.class), any(RpcResponseCallback.class));
320+
321+
// Drive the StreamCallback's onComplete, which triggers the failing closeAndRead().
322+
doAnswer(invocation -> {
323+
StreamCallback callback = (StreamCallback) invocation.getArguments()[1];
324+
assertThrows(IOException.class, () -> callback.onComplete("stream"));
325+
return null;
326+
}).when(client).stream(any(), any(StreamCallback.class));
327+
328+
fetcher.start();
329+
330+
verify(channel).close();
331+
verify(downloadFile).delete();
332+
verify(downloadFileManager, never()).registerTempFileToClean(any());
333+
// closeAndRead() failed, so the buffer must not have been handed off to the listener.
334+
verify(listener, never()).onBlockFetchSuccess(any(), any());
335+
}
336+
287337
/**
288338
* Begins a fetch on the given set of blocks by mocking out the server side of the RPC which
289339
* simply returns the given (BlockId, Block) pairs.

0 commit comments

Comments
 (0)