[Bug] Unexpected exceptions happened when flushing events #1542

Closed
2 of 3 tasks
rickyma opened this issue Feb 24, 2024 · 2 comments
rickyma commented Feb 24, 2024

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

As described in #1537, here is the full stack trace:

[2024-02-24 21:01:20.540] [checkResource-0] [WARN] ShuffleTaskManager.preAllocatedBufferCheck - Remove expired preAllocatedBuffer[id=1213266] that required by app: application_1703049085550_4925739_1708775782306
[2024-02-24 21:01:20.540] [expiredAppCleaner-0] [INFO] ShuffleTaskManager.checkResourceStatus - Detect expired appId[application_1703049085550_4925739_1708775782306] according to rss.server.app.expired.withoutHeartbeat
[2024-02-24 21:01:20.547] [clearResourceThread] [INFO] ShuffleTaskManager.removeResources - Start remove resource for appId[application_1703049085550_4925739_1708775782306]
[2024-02-24 21:01:21.078] [LocalFileFlushEventThreadPool-0] [ERROR] DefaultFlushEventHandler.handleEventAndUpdateMetrics - Unexpected exceptions happened due to
java.lang.NullPointerException
at org.apache.uniffle.server.ShuffleTaskInfo.getMaxConcurrencyPerPartitionToWrite(ShuffleTaskInfo.java:109)
at org.apache.uniffle.server.ShuffleFlushManager.getMaxConcurrencyPerPartitionWrite(ShuffleFlushManager.java:198)
at org.apache.uniffle.server.ShuffleFlushManager.processFlushEvent(ShuffleFlushManager.java:149)
at org.apache.uniffle.server.DefaultFlushEventHandler.handleEventAndUpdateMetrics(DefaultFlushEventHandler.java:87)
at org.apache.uniffle.server.DefaultFlushEventHandler.lambda$dispatchEvent$0(DefaultFlushEventHandler.java:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

As you can see, the exception happens right after the appId expires and the resources related to it are removed.

The NPE happened here:

public int getMaxConcurrencyPerPartitionToWrite() {
  return specification.get().getMaxConcurrencyPerPartitionToWrite();
}

I'd like to discuss this. What's the best way to avoid the exception?
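
One defensive option, sketched here under assumptions, is to read the reference once and fall back to a default when it is unset. `SpecSketch`, `TaskInfoSketch`, and `FALLBACK_MAX_CONCURRENCY` are hypothetical stand-ins, not the real Uniffle classes, and the fallback value is arbitrary. A null check like this only hides the symptom; it does not stop a flush from racing with resource removal.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the specification object held by the task info.
final class SpecSketch {
  private final int maxConcurrencyPerPartitionToWrite;

  SpecSketch(int maxConcurrencyPerPartitionToWrite) {
    this.maxConcurrencyPerPartitionToWrite = maxConcurrencyPerPartitionToWrite;
  }

  int getMaxConcurrencyPerPartitionToWrite() {
    return maxConcurrencyPerPartitionToWrite;
  }
}

// Hypothetical stand-in for the per-app task info.
final class TaskInfoSketch {
  // Assumed fallback; the value 1 is illustrative only.
  static final int FALLBACK_MAX_CONCURRENCY = 1;

  private final AtomicReference<SpecSketch> specification = new AtomicReference<>();

  void setSpecification(SpecSketch spec) {
    specification.set(spec);
  }

  int getMaxConcurrencyPerPartitionToWrite() {
    // Read the reference exactly once so the null check and the call see the same value.
    SpecSketch spec = specification.get();
    return spec == null
        ? FALLBACK_MAX_CONCURRENCY
        : spec.getMaxConcurrencyPerPartitionToWrite();
  }
}
```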

Update:
I've also encountered another exception:

[2024-03-08 17:50:24.075] [epollEventLoopGroup-3-95] [ERROR] ShuffleServerNettyHandler.handleSendShuffleDataRequest - Error happened when shuffleEngine.write for appId[application_1703049085550_5931482_1709891324515], shuffleId[0], partitionId[18632]: null
java.lang.NullPointerException
        at org.apache.uniffle.server.ShuffleTaskManager.getDataDistributionType(ShuffleTaskManager.java:861)
        at org.apache.uniffle.server.ShuffleFlushManager.getDataDistributionType(ShuffleFlushManager.java:270)
        at org.apache.uniffle.server.buffer.ShuffleBufferManager.flushBuffer(ShuffleBufferManager.java:301)
        at org.apache.uniffle.server.buffer.ShuffleBufferManager.flush(ShuffleBufferManager.java:468)
        at org.apache.uniffle.server.buffer.ShuffleBufferManager.flushIfNecessary(ShuffleBufferManager.java:268)
        at org.apache.uniffle.server.buffer.ShuffleBufferManager.cacheShuffleData(ShuffleBufferManager.java:187)
        at org.apache.uniffle.server.ShuffleTaskManager.cacheShuffleData(ShuffleTaskManager.java:302)
        at org.apache.uniffle.server.netty.ShuffleServerNettyHandler.handleSendShuffleDataRequest(ShuffleServerNettyHandler.java:168)
        at org.apache.uniffle.server.netty.ShuffleServerNettyHandler.receive(ShuffleServerNettyHandler.java:78)
        at org.apache.uniffle.common.netty.handle.TransportRequestHandler.handle(TransportRequestHandler.java:62)
        at org.apache.uniffle.common.netty.handle.TransportChannelHandler.channelRead(TransportChannelHandler.java:100)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at org.apache.uniffle.common.netty.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:80)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:750)

Affects Version(s)

master

Uniffle Server Log Output

No response

Uniffle Engine Log Output

No response

Uniffle Server Configurations

No response

Uniffle Engine Configurations

No response

Additional context

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

rickyma commented Mar 6, 2024

Anyone is welcome to pick this up.

zuston pushed a commit that referenced this issue Mar 18, 2024
…urs (#1574)

### What changes were proposed in this pull request?

In the implementation of the methods `flushBuffer`, `handleEventAndUpdateMetrics`, and `removeBufferByShuffleId`, read-write locks have been added to manage concurrency. This ensures that a `ShuffleBuffer` successfully converted into a `flushEvent` won't be cleaned up again by `removeBufferByShuffleId`, and a `ShuffleBuffer` already cleaned up by `removeBufferByShuffleId` won't be transformed back into a `flushEvent`. This effectively resolves the concurrency issue.

### Why are the changes needed?

Fix #1571 & #1560 & #1542

The key logic of the PR is as follows:

Before this PR:
1. A `ShuffleBuffer` is turned into a `FlushEvent`, and **_its blocks and size are cleared_**
→
2. The `FlushEvent` is added to the flushing queue
→
3. The method `removeBufferByShuffleId` is executed, which causes the following things to happen:

3.1. The following code snippet runs. Note that at this point `buffer.getBlocks()` **_is empty and size is 0_** because of step 1 above:
```
for (ShuffleBuffer buffer : buffers) {
  buffer.getBlocks().forEach(spb -> spb.getData().release());
  ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
  size += buffer.getSize();
}
```

3.2. `appId` is removed from the `bufferPool`
→
4. The `FlushEvent` is taken out from the queue and encounters an `EventInvalidException` because the `appId` was removed before
→
5. When handling the `EventInvalidException`, nothing is done and the `event.doCleanup()` method **_is not called, causing a memory leak_**.

Of course, this is just one scenario of a concurrency exception. The previous code took no locks, so an event could become invalid at any point while `processFlushEvent` was still executing, which is why #1542 occurs. #1560 is a related case.

---

After this PR:
We take a read lock for steps 1 and 2 above, a write lock for step 3, and a read lock for step 4. When an `EventInvalidException` is encountered in step 5, we call the `event.doCleanup()` method to release the memory.
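
The locking scheme can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `FlushGuardSketch` and its methods are hypothetical, and a single lock shared by all apps is assumed for brevity. Flush-side work (steps 1, 2 and 4) holds the read lock, while removal (step 3) holds the write lock.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical guard; names are illustrative and do not come from Uniffle.
final class FlushGuardSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Set<String> liveApps = ConcurrentHashMap.newKeySet();

  void register(String appId) {
    liveApps.add(appId);
  }

  // Flush side: many flushes may hold the read lock at the same time.
  // Returns false when the app was already removed, so the caller can skip the work.
  boolean runIfAppAlive(String appId, Runnable action) {
    lock.readLock().lock();
    try {
      if (!liveApps.contains(appId)) {
        return false;
      }
      action.run();
      return true;
    } finally {
      lock.readLock().unlock();
    }
  }

  // Removal side: the write lock is granted only once all in-flight readers have finished,
  // and it blocks new readers until the cleanup is done.
  void removeApp(String appId, Runnable cleanup) {
    lock.writeLock().lock();
    try {
      liveApps.remove(appId);
      cleanup.run();
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```

Because the liveness check and the action run under the same read lock, a removal cannot slip in between them.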

In this way, we can ensure the following things when resources are being cleaned up:
1. `ShuffleBuffers` that have not yet been converted to `FlushEvents` will not be converted in the future, but will be directly cleaned up.
2. `FlushEvents` that have been converted from `ShuffleBuffers` will definitely encounter an `EventInvalidException`, and we will eventually handle this exception correctly, releasing memory.
3. If there is already a `FlushEvent` being processed and it is about to be flushed to disk, the resource cleanup task will wait for all `FlushEvents` related to the `appId` to be completed before starting the cleanup task, ensuring that the cleanup and flushing tasks are completely independent and do not interfere with each other.
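
The second guarantee, releasing memory when an event turns out to be invalid, might look like the sketch below. All class names are hypothetical stand-ins, and the memory accounting is reduced to a single counter for illustration; the real handler does more than this.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the exception raised when an event's app no longer exists.
final class EventInvalidSketchException extends RuntimeException {
  EventInvalidSketchException(String message) {
    super(message);
  }
}

// Hypothetical event that reserves memory on creation and releases it on cleanup.
final class FlushEventSketch {
  private final long size;
  private final AtomicLong usedMemory;
  private final AtomicBoolean cleaned = new AtomicBoolean(false);

  FlushEventSketch(long size, AtomicLong usedMemory) {
    this.size = size;
    this.usedMemory = usedMemory;
    usedMemory.addAndGet(size);
  }

  // Idempotent: memory is released at most once even if cleanup is called repeatedly.
  void doCleanup() {
    if (cleaned.compareAndSet(false, true)) {
      usedMemory.addAndGet(-size);
    }
  }
}

final class FlushHandlerSketch {
  // Returns true when the processor completed, false when the event was invalid.
  static boolean handle(FlushEventSketch event, Runnable processor) {
    try {
      processor.run();
      return true;
    } catch (EventInvalidSketchException e) {
      // Without this call the reserved memory would never be released.
      event.doCleanup();
      return false;
    }
  }
}
```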

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
---------

Co-authored-by: leslizhang <leslizhang@tencent.com>

rickyma commented Mar 18, 2024

Done by #1574

@rickyma rickyma closed this as completed Mar 18, 2024
zuston pushed a commit to zuston/incubator-uniffle that referenced this issue May 27, 2024
…n` occurs (apache#1574)
