[CELEBORN-980] Asynchronously delete original files to fix ReusedExchange bug #1932
Codecov Report
```
@@            Coverage Diff             @@
##             main    #1932      +/-   ##
==========================================
+ Coverage   46.57%   46.98%   +0.41%
==========================================
  Files         164      164
  Lines       10293    10361      +68
  Branches      938      956      +18
==========================================
+ Hits         4793     4867      +74
+ Misses       5185     5177       -8
- Partials      315      317       +2
```
... and 13 files with indirect coverage changes
As suggested in #1907 (review), this implements a new approach that moves the state (stream ids) responsible for deleting unsorted files into the `FileInfo`.
```java
while (it.hasNext()) {
  PartitionFilesSorter.FileSorter sorter = it.next();
  try {
    if (sorter.getOriginFileInfo().isStreamsEmpty()) {
```
If there are no non-skew streams, the origin file can be deleted. I think it's good to record all streams in the FileInfo; I'll need it for supporting memory storage.
> If there are no non-skew streams, the origin file can be deleted.

It's not: only a range openStream request (skew stream) triggers `FileSorter.sort()`, and only then is the `FileSorter` instance added to the cleanup queue.
> I think it's good to record all streams in the FileInfo, I'll need it for supporting memory storage.

Yeah, I also read the proposal you posted on the mailing list. In the current implementation, recording all active streams in the FileInfo is already done.
The relations between stream and FileInfo:

For non-skew streams (non-range openStream requests), the relation is n-to-1:

```
non-range stream id1
non-range stream id2 ---> original unsorted FileInfo A
non-range stream id3
```

For skew streams (range openStream requests), it's 1-to-1, because we don't manage the sorted FileInfos within StorageManager:

```
range stream id4 ---> sorted FileInfo B
range stream id5 ---> sorted FileInfo C
range stream id6 ---> sorted FileInfo D
```
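The two relations above can be sketched as a plain map from stream id to FileInfo. This is a minimal hypothetical model, not the actual Celeborn classes; the names `StreamRegistry` and the string "FileInfo" labels are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the stream-id -> FileInfo bookkeeping.
class StreamRegistry {
    // Several non-range streams may share the original unsorted FileInfo
    // (n-to-1), while each range stream gets its own sorted FileInfo (1-to-1).
    private final Map<Long, String> streamToFileInfo = new HashMap<>();

    void registerNonRangeStream(long streamId, String originalFileInfo) {
        streamToFileInfo.put(streamId, originalFileInfo);
    }

    void registerRangeStream(long streamId, String sortedFileInfo) {
        streamToFileInfo.put(streamId, sortedFileInfo);
    }

    String fileInfoOf(long streamId) {
        return streamToFileInfo.get(streamId);
    }
}

public class RelationDemo {
    public static void main(String[] args) {
        StreamRegistry registry = new StreamRegistry();
        // Non-range streams 1..3 all point at the same unsorted FileInfo A.
        registry.registerNonRangeStream(1L, "unsorted-A");
        registry.registerNonRangeStream(2L, "unsorted-A");
        registry.registerNonRangeStream(3L, "unsorted-A");
        // Range streams 4..6 each point at their own sorted FileInfo.
        registry.registerRangeStream(4L, "sorted-B");
        registry.registerRangeStream(5L, "sorted-C");
        registry.registerRangeStream(6L, "sorted-D");
        System.out.println(registry.fileInfoOf(2L)); // unsorted-A
        System.out.println(registry.fileInfoOf(5L)); // sorted-C
    }
}
```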
```java
public void setSorted() {
  synchronized (sorted) {
```
This synchronized block is redundant.
It's for blocking `addStream`.
I agree with @FMX: the synchronization in `addStream` and `isStreamsEmpty` is necessary because these two methods need to be sequential, but it's unnecessary to synchronize in `setSorted` and `closeStream`.
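A minimal sketch of the contract under discussion (hypothetical field names, not the actual Celeborn `FileInfo`): `addStream` must refuse new streams once sorting has finished, so the methods that touch the shared state guard it with one lock. Synchronizing all four methods, as below, is the conservative choice:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the stream-tracking state inside FileInfo.
class FileStreams {
    private final Set<Long> streams = new HashSet<>();
    private boolean sorted = false;

    // Returns false once the file has been sorted, so the caller falls
    // back to fetching the sorted FileInfo instead of the original one.
    synchronized boolean addStream(long streamId) {
        if (sorted) {
            return false;
        }
        streams.add(streamId);
        return true;
    }

    synchronized void setSorted() {
        sorted = true;
    }

    synchronized void closeStream(long streamId) {
        streams.remove(streamId);
    }

    // The original file may only be deleted when no stream still reads it.
    synchronized boolean isStreamsEmpty() {
        return streams.isEmpty();
    }
}
```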
```java
public void closeStream(long streamId) {
  synchronized (sorted) {
```
```java
UserIdentifier userIdentifier = fileInfo.getUserIdentifier();

// not a range read and the sorted file has not been generated yet
if (endMapIndex == Integer.MAX_VALUE && !sortedShuffleFiles.containsKey(shuffleKey)) {
```
To check whether the sorted file has been generated, we should also check the sorted set:

```java
if (endMapIndex == Integer.MAX_VALUE
    && (!sortedShuffleFiles.containsKey(shuffleKey)
        || !sortedShuffleFiles.get(shuffleKey).contains(fileId))) {
  return fileInfo;
}
```

Or else, do not call `getSortedFileInfo` at all when `endMapIndex` is `Integer.MAX_VALUE`.
Improved using option 2.
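Option 2 can be sketched as a dispatch on the caller's side, with `Integer.MAX_VALUE` as `endMapIndex` marking a non-range read, per the snippet above. The helper and path names below are illustrative, not the actual Celeborn API:

```java
// Hypothetical sketch of option 2: only ask the sorter for a sorted
// FileInfo when the request is a range read.
public class FetchDispatch {
    static final int NON_RANGE = Integer.MAX_VALUE;

    // Returns which file serves the request: the original for non-range
    // reads, the sorted one for range reads.
    static String resolveFile(int endMapIndex, String originalPath, String sortedPath) {
        if (endMapIndex == NON_RANGE) {
            // Non-range read: never call getSortedFileInfo, serve the original.
            return originalPath;
        }
        return sortedPath;
    }

    public static void main(String[] args) {
        System.out.println(resolveFile(NON_RANGE, "orig", "sorted")); // orig
        System.out.println(resolveFile(5, "orig", "sorted"));         // sorted
    }
}
```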
```scala
  startIndex,
  endIndex)
// non-range openStream request
if (endIndex == Int.MaxValue && !fileInfo.addStream(streamId)) {
```
`getSortedFileInfo` may return the sorted file info even though `endIndex == Int.MaxValue`. IMO here we can just call the original fileInfo's `addStream`, like the following:

```scala
var fileInfo = getRawFileInfo(shuffleKey, fileName)
fileInfo.getPartitionType match {
  case PartitionType.REDUCE =>
    val streamId = chunkStreamManager.nextStreamId()
    // non-range openStream request
    if (endIndex == Int.MaxValue && !fileInfo.addStream(streamId)) {
      // This branch is entered when the original unsorted file has been sorted by another
      // range's openStream request. Retry fetching the sorted FileInfo.
      fileInfo = partitionsSorter.getSortedFileInfo(
        shuffleKey,
        fileName,
        fileInfo,
        startIndex,
        endIndex)
      assert(Utils.isSortedPath(fileInfo.getFilePath))
    }
```
We also need to return the sorted file information for range openStream requests.
Encountered another problem while running: it seems the worker can't exit correctly. I'll investigate further later on.
Hi, I've seen this several times. IMO, this exception occurs when a fetch handler decodes some RPC messages while the server is shutting down. As you can see, the exception occurs close to the shutdown hook's log.
…oveOriginalFiles.enabled`

### What changes were proposed in this pull request?

As title

### Why are the changes needed?

The config key `celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled` has become unnecessary as a result of #1932

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GA

Closes #1999 from cfmcgrady/celeborn-1047.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
…ger to close stream for ReusedExchange

### What changes were proposed in this pull request?

`OpenStream` should register the stream via `ChunkStreamManager`, which is used to obtain the disk file when closing the stream for the `ReusedExchange` operator. Follow-up to #1932.

### Why are the changes needed?

`OpenStream` does not register a chunk stream for reading local or DFS shuffle. Therefore `LocalPartitionReader` and `DfsPartitionReader` could not obtain the disk file from `ChunkStreamManager`, which causes the below `NullPointerException` when closing the stream.

```
ERROR [fetch-server-11-11] TransportRequestHandler: Error while invoking handler#receive() on RPC id 4
java.lang.NullPointerException
	at org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getShuffleKeyAndFileName(ChunkStreamManager.java:188)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:344)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:137)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:94)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:96)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:151)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)
```

In summary, `FetchHandler` only closes streams registered via `ChunkStreamManager`. `LocalPartitionReader` and `DfsPartitionReader` should use `ChunkStreamManager#registerStream` so that the stream can be closed and the original unsorted disk file deleted for the `ReusedExchange` operator.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- `FetchHandlerSuiteJ#testLocalReadSortFileOnceOriginalFileBeDeleted`
- `FetchHandlerSuiteJ#testDoNotDeleteOriginalFileWhenNonRangeLocalReadWorkInProgress`
- `ReusedExchangeSuite`

Closes #2209 from SteNicholas/CELEBORN-1177.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
What changes were proposed in this pull request?
The `ReusedExchange` operator has the potential to generate different types of fetch requests, including both non-range and range requests. Currently, an issue arises because the Celeborn worker deletes the original file synchronously upon completion of sorting. This leads to the failure of non-range requests that follow a range request for the same partition. (snippets to reproduce this bug omitted)
This PR proposes a solution to this problem. It introduces an asynchronous thread for the removal of the original file. Once the sorted file is generated for a given partition, both non-range and range fetch requests will fetch only the sorted file.
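The asynchronous removal can be sketched as a cleanup loop that deletes an original file only once no stream still reads it, and re-queues it otherwise. This is a minimal model with hypothetical names (`OriginalFileCleaner`, `PendingDeletion`), not the actual `PartitionFilesSorter` implementation:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: each entry tracks how many streams still read the
// original file; the cleaner deletes it only when the count reaches zero.
class PendingDeletion {
    final String path;
    final AtomicInteger openStreams;

    PendingDeletion(String path, int openStreams) {
        this.path = path;
        this.openStreams = new AtomicInteger(openStreams);
    }
}

public class OriginalFileCleaner {
    private final ConcurrentLinkedQueue<PendingDeletion> queue =
        new ConcurrentLinkedQueue<>();

    void submit(PendingDeletion p) {
        queue.offer(p);
    }

    // One pass of the cleanup loop: delete what is no longer referenced,
    // re-queue entries that still have readers on the original file.
    int cleanOnce() {
        int deleted = 0;
        int n = queue.size();
        for (int i = 0; i < n; i++) {
            PendingDeletion p = queue.poll();
            if (p == null) break;
            if (p.openStreams.get() == 0) {
                deleted++; // here the worker would delete p.path from disk
            } else {
                queue.offer(p); // still read by some stream; try again later
            }
        }
        return deleted;
    }
}
```

In the real worker this pass would run on a dedicated background thread, so fetch handling is never blocked on file deletion.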
The activity diagram of `openStream`: (diagram omitted)

Does this PR introduce any user-facing change?
No, this is only a bug fix.
How was this patch tested?
UT