Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Client] Fix ConcurrentModificationException in sendAsync #11884

Conversation

lhotari
Copy link
Member

@lhotari lhotari commented Sep 1, 2021

Fixes #11783

Motivation

See #11783 for details.

Modifications

Create a subclass for ArrayDeque that can postpone the adding of items that are added during a forEach iteration. To support all possible cases, this checks for nested calls too although that might not happen in this case.
The items added during a forEach loop will be added after the loop completes.

Todo

Find a way to create a failing test for the current issue.

@lhotari
Copy link
Member Author

lhotari commented Sep 1, 2021

@merlimat @codelipenghui does this PR make sense? any recommendation how to add tests?

@lhotari lhotari force-pushed the lh-fix-concurrent-modification-exception-in-sendAsync branch from 703b838 to 72e54d0 Compare September 1, 2021 20:45
@shoothzj
Copy link
Member

shoothzj commented Sep 2, 2021

@merlimat @codelipenghui does this PR make sense? any recommendation how to add tests?

Can we do that?

  1. init producer
  2. close broker
  3. send messages
  4. start broker
  5. wait for timeout, the timeout will call failPendingMessages too

and add msgs to pendingMessages does not check connect stats

Copy link
Contributor

@315157973 315157973 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't just guarantee the thread safety of the queue, so we can use a Concurrent queue. The reason why the ProducerImpl object was locked before is because there are other operations that must be guaranteed to be atomic, for example: ProducerImpl#ackReceived, we need to peek first

@lhotari
Copy link
Member Author

lhotari commented Sep 2, 2021

We can't just guarantee the thread safety of the queue, so we can use a Concurrent queue. The reason why the ProducerImpl object was locked before is because there are other operations that must be guaranteed to be atomic, for example: ProducerImpl#ackReceived, we need to peek first

In this case it doesn't seem to be about thread safety and there aren't multiple threads. It's about adding an item to a queue while the same thread is in the forEach loop. That fails with a ConcurrentModificationException.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution looks a little bit hacky as it is fixing some specific case and sequence of method calls.

in order to fix the issue, isn't it better to handle correctly concurrent access to the pendingMessagesQueue?

if we want to go down this way I believe that at least we have to:

  • create a new interface QueueThatCanBeAddedWhileExecutingForEachXX...
  • create a specific subclass
  • use QueueThatCanBeAddedWhileExecutingForEachXX and not Queue

otherwise it may be dangerous to modify the code that accesses the pendingMessagesQueue, as access patterns may change in the future

@shoothzj
Copy link
Member

shoothzj commented Sep 2, 2021

This solution looks a little bit hacky as it is fixing some specific case and sequence of method calls.

in order to fix the issue, isn't it better to handle correctly concurrent access to the pendingMessagesQueue?

if we want to go down this way I believe that at least we have to:

  • create a new interface QueueThatCanBeAddedWhileExecutingForEachXX...
  • create a specific subclass
  • use QueueThatCanBeAddedWhileExecutingForEachXX and not Queue

otherwise it may be dangerous to modify the code that accesses the pendingMessagesQueue, as access patterns may change in the future

agree, QueueThatCanBeAddedWhileExecutingForEachXX are more clearness

@Anonymitaet
Copy link
Member

Thanks for your contribution. For this PR, do we need to update docs?

(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@lhotari lhotari added the doc-not-needed Your PR changes do not impact docs label Sep 3, 2021
@lhotari lhotari force-pushed the lh-fix-concurrent-modification-exception-in-sendAsync branch 3 times, most recently from 29838af to 511c340 Compare September 7, 2021 07:46
@lhotari
Copy link
Member Author

lhotari commented Sep 7, 2021

Several comments have been about using a concurrent queue implementation. I don't think that is necessary as long as the access of the queue is properly synchronized. The solution in this PR demonstrates in a unit test how the ConcurrentModificationException can happen in a single thread. When using an ordinary ArrayDeque the test would fail.

The root cause of #11783 is that while the current thread is executing the action in the forEach block, the callback code might try to add a new OpSendMsg entry in the calling thread.

Some locations:

https://github.com/apache/pulsar/blob/d86db3f4ec4fb6bd04216a123cde2fee5c43f9d9/pulsar-
client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L886-L891

pendingMessages.forEach(op -> {
releaseCount.addAndGet(batchMessagingEnabled ? op.numMessagesInBatch: 1);
try {
// Need to protect ourselves from any exception being thrown in the future handler from the
// application
ex.setSequenceId(op.sequenceId);
// if message is chunked then call callback only on last chunk
if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
// Need to protect ourselves from any exception being thrown in the future handler from the
// application
op.sendComplete(ex);
}
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
op.sequenceId, t);
}
client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
ReferenceCountUtil.safeRelease(op.cmd);
op.recycle();
});

I guess it's a valid case that after failing all pending messages the client code is allowed to access the producer in the callback to send more messages. (there are more details in #11783)

@merlimat @codelipenghui Does the approach in this PR make sense, to postpone elements added while the forEach loop is in progress?

@lhotari lhotari force-pushed the lh-fix-concurrent-modification-exception-in-sendAsync branch from 511c340 to 71b59ad Compare September 7, 2021 07:55
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@lhotari lhotari marked this pull request as ready for review September 7, 2021 09:32
@eolivelli
Copy link
Contributor

@merlimat @codelipenghui @315157973
can you please take a look ?

@eolivelli eolivelli added the release/blocker Indicate the PR or issue that should block the release until it gets resolved label Sep 20, 2021
@315157973
Copy link
Contributor

I don't know if it is a good way, waiting for others to CR. @codelipenghui @hangc0276 @merlimat

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change looks good. I just left a couple of comments.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

@eolivelli eolivelli merged commit a1c1028 into apache:master Sep 21, 2021
codelipenghui pushed a commit that referenced this pull request Sep 24, 2021
@codelipenghui codelipenghui added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Sep 24, 2021
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Oct 27, 2021
codelipenghui added a commit to codelipenghui/incubator-pulsar that referenced this pull request Feb 11, 2022
### Background

The issue is a race condition introduced by this PR apache#11884 which introduced a struct to maintain pending messages, but it has a race condition for `foreach()`  and `peek` , the issue is during the `foreach` process, if the new messages sent to the broker and received the receipt from the broker, then the producer will peek a null message from the `OpSendMsgQueue` , I have added some logs to confirm the issue.

Here are the logs which can explain the bug:

```
2022-02-11T12:40:00,571+0800 [pulsar-timer-5-1] ERROR org.apache.pulsar.client.impl.ProducerImpl - For each OpSendMsgQueue, 0
2022-02-11T12:40:00,572+0800 [pulsar-timer-5-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [public/default/t_topic] [standalone-0-3] Pending messages: 1 --- Publish throughput: 0.99 msg/s --- 0.00 Mbit/s --- Latency: med: 0.000 ms - 95pct: 0.000 ms - 99pct: 0.000 ms - 99.9pct: 0.000 ms - max: -∞ ms --- Ack received rate: 0.00 ack/s --- Failed messages: 0
2022-02-11T12:40:01,564+0800 [pulsar-client-io-1-1] ERROR org.apache.pulsar.client.impl.ProducerImpl - Add message to OpSendMsgQueue.postponedOpSendMgs, 1, 182801
2022-02-11T12:40:01,566+0800 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [public/default/s_topic] [standalone-0-6] Got ack for timed out msg 182801 - 182900
2022-02-11T12:40:01,573+0800 [pulsar-timer-5-1] ERROR org.apache.pulsar.client.impl.ProducerImpl - For each OpSendMsgQueue, 0
2022-02-11T12:40:01,573+0800 [pulsar-timer-5-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - Put the opsend back to deque of sequenceID 182801
```

From the logs, you can see a message with sequence ID 182801 add the `OpSendMsgQueue` first, but after the producer received the receipt, the log shows Got ack for timed out msg which means got null when peeking messages, and after, the message add back to the internal queue, but the producer side is blocked at this time.

### Modification

1. Avoid using foreach to iterate the pending ops, use iterator to instead
2. Keep using OpSendMsgQueue to avoid expose `foreach` method
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client cherry-picked/branch-2.8 Archived: 2.8 is end of life doc-not-needed Your PR changes do not impact docs release/blocker Indicate the PR or issue that should block the release until it gets resolved release/2.8.2 release/2.9.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ConcurrentModificationException occurs when failPendingMessages
7 participants