Skip to content

Commit 62fd9cb

Browse files
committed
[SPARK-57803][CORE] Delete the temp file when closeAndRead fails on the fetch-to-disk path
### What changes were proposed in this pull request? `OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not. This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds. ### Why are the changes needed? The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent 6313ea6 commit 62fd9cb

2 files changed

Lines changed: 62 additions & 1 deletion

File tree

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,18 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
364364

365365
@Override
366366
public void onComplete(String streamId) throws IOException {
367-
listener.onBlockFetchSuccess(blockIds[chunkIndex], channel.closeAndRead());
367+
ManagedBuffer buffer;
368+
try {
369+
buffer = channel.closeAndRead();
370+
} catch (IOException e) {
371+
// closeAndRead() failed (typically from the channel close) before the buffer was handed
372+
// off, so the temp file was never registered for cleanup. Delete it, and close the
373+
// channel in case it was left open, mirroring onFailure, instead of leaking them.
374+
channel.close();
375+
targetFile.delete();
376+
throw e;
377+
}
378+
listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
368379
if (!downloadFileManager.registerTempFileToClean(targetFile)) {
369380
targetFile.delete();
370381
}

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.network.shuffle;
1919

20+
import java.io.IOException;
2021
import java.nio.ByteBuffer;
2122
import java.util.HashMap;
2223
import java.util.Iterator;
@@ -33,14 +34,17 @@
3334
import static org.mockito.ArgumentMatchers.eq;
3435
import static org.mockito.Mockito.doAnswer;
3536
import static org.mockito.Mockito.mock;
37+
import static org.mockito.Mockito.never;
3638
import static org.mockito.Mockito.times;
3739
import static org.mockito.Mockito.verify;
40+
import static org.mockito.Mockito.when;
3841

3942
import org.apache.spark.network.buffer.ManagedBuffer;
4043
import org.apache.spark.network.buffer.NettyManagedBuffer;
4144
import org.apache.spark.network.buffer.NioManagedBuffer;
4245
import org.apache.spark.network.client.ChunkReceivedCallback;
4346
import org.apache.spark.network.client.RpcResponseCallback;
47+
import org.apache.spark.network.client.StreamCallback;
4448
import org.apache.spark.network.client.TransportClient;
4549
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
4650
import org.apache.spark.network.shuffle.protocol.FetchShuffleBlocks;
@@ -283,6 +287,52 @@ public void testInvalidShuffleBlockIds() {
283287
new int[][] {{ 0 }}), conf));
284288
}
285289

290+
/**
291+
* Verifies that when fetching to disk, a failure of
292+
* {@link DownloadFileWritableChannel#closeAndRead} does not leak the temp file: the channel is
293+
* closed and the temp file is deleted, and it is never registered for later cleanup.
294+
*/
295+
@Test
296+
public void testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails() throws IOException {
297+
String blockId = "shuffle_0_0_0";
298+
String[] blockIds = new String[] { blockId };
299+
300+
TransportClient client = mock(TransportClient.class);
301+
BlockFetchingListener listener = mock(BlockFetchingListener.class);
302+
303+
DownloadFileWritableChannel channel = mock(DownloadFileWritableChannel.class);
304+
when(channel.closeAndRead()).thenThrow(new IOException("boom"));
305+
DownloadFile downloadFile = mock(DownloadFile.class);
306+
when(downloadFile.openForWriting()).thenReturn(channel);
307+
DownloadFileManager downloadFileManager = mock(DownloadFileManager.class);
308+
when(downloadFileManager.createTempFile(any())).thenReturn(downloadFile);
309+
310+
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
311+
client, "app-id", "exec-id", blockIds, listener, conf, downloadFileManager);
312+
313+
// Respond to the "openBlocks" RPC with a StreamHandle of a single chunk.
314+
doAnswer(invocation -> {
315+
RpcResponseCallback callback = (RpcResponseCallback) invocation.getArguments()[1];
316+
callback.onSuccess(new StreamHandle(123, 1).toByteBuffer());
317+
return null;
318+
}).when(client).sendRpc(any(ByteBuffer.class), any(RpcResponseCallback.class));
319+
320+
// Drive the StreamCallback's onComplete, which triggers the failing closeAndRead().
321+
doAnswer(invocation -> {
322+
StreamCallback callback = (StreamCallback) invocation.getArguments()[1];
323+
assertThrows(IOException.class, () -> callback.onComplete("stream"));
324+
return null;
325+
}).when(client).stream(any(), any(StreamCallback.class));
326+
327+
fetcher.start();
328+
329+
verify(channel).close();
330+
verify(downloadFile).delete();
331+
verify(downloadFileManager, never()).registerTempFileToClean(any());
332+
// closeAndRead() failed, so the buffer must not have been handed off to the listener.
333+
verify(listener, never()).onBlockFetchSuccess(any(), any());
334+
}
335+
286336
/**
287337
* Begins a fetch on the given set of blocks by mocking out the server side of the RPC which
288338
* simply returns the given (BlockId, Block) pairs.

0 commit comments

Comments
 (0)