ConcurrentModificationException occurs when failPendingMessages #11783

Closed
shoothzj opened this issue Aug 26, 2021 · 7 comments · Fixed by #11884
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@shoothzj
Member

Describe the bug

[Screenshot attached to the original issue; not reproduced here]

To Reproduce

It's hard to reproduce.

Expected behavior

List operations on the pending messages should be performed safely.

Additional context
It occurs when restarting one of the brokers (two brokers were running). The client first received a Disconnected event, then a ConcurrentModificationException, along with some "recycled already" exceptions in the timer task's OpSendMsg.

@shoothzj
Member Author

I just figured out why: the client code retries sending messages when an exception occurs, in order to ride out network jitter. The sample code looks like this:

    String topic = "test";
    PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl("http://127.0.0.1:8080")
            .build();
    ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).enableBatching(true);
    Producer<String> producer = producerBuilder.topic(topic).create();

    final TypedMessageBuilder<String> stringTypedMessageBuilder = producer.newMessage();
    final TypedMessageBuilder<String> value = stringTypedMessageBuilder.key("1").value("2");
    final CompletableFuture<MessageId> completableFuture = value.sendAsync();
    completableFuture.exceptionally(throwable -> {
        // Retry from inside the failure callback; this re-enters the producer.
        producer.sendAsync("xxxx");
        return null;
    });

The exceptionally callback is invoked from the forEach loop in ProducerImpl.failPendingMessages, and sendAsync adds messages to pendingMessages while that loop is still iterating, which is when the error occurs.
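
The mechanism described above can be reproduced without Pulsar. The following is a minimal sketch, not the client's actual code: `ReentrantCallbackDemo`, its `pending` list, and its `sendAsync`/`failPending` methods are hypothetical stand-ins, and a plain `ArrayList` is assumed in place of the real pendingMessages collection, whose type may differ.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a producer; NOT Pulsar's ProducerImpl.
final class ReentrantCallbackDemo {
    // Assumed fail-fast collection standing in for pendingMessages.
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    // The payload is ignored in this sketch; only the bookkeeping matters.
    CompletableFuture<String> sendAsync(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    void failPending(Throwable cause) {
        // completeExceptionally runs dependent callbacks synchronously on this
        // thread, so a callback that calls sendAsync mutates `pending` mid-loop.
        pending.forEach(f -> f.completeExceptionally(cause));
        pending.clear();
    }

    // Returns true if the re-entrant send triggers the exception.
    static boolean reproduces() {
        ReentrantCallbackDemo producer = new ReentrantCallbackDemo();
        producer.sendAsync("first").exceptionally(t -> {
            producer.sendAsync("retry"); // re-enters while failPending iterates
            return null;
        });
        try {
            producer.failPending(new RuntimeException("disconnected"));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME reproduced: " + reproduces());
    }
}
```

No second thread is involved here: the modification comes from the same thread that is iterating, which is why the failure depends only on a callback being registered and not on timing.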

I have a question: should we allow retrying inside the exceptionally callback, or prohibit that kind of usage?

@315157973 @eolivelli @merlimat , thanks

@315157973
Contributor

In my opinion, we can only use interfaces to restrict how methods are used; verbal conventions are not a good approach.

@eolivelli
Contributor

We cannot prevent users from calling sendAsync that way, so we have to handle this case.
Now that you have a reproducer, it will be easier to create a test case that reproduces the issue.
Good catch!
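
One possible way to tolerate a re-entrant send, sketched here as an assumption and not necessarily what any eventual fix does, is to detach the pending entries before running their callbacks. `SnapshotFailDemo` and all of its members are hypothetical names used for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a "snapshot, clear, then notify" failure path.
final class SnapshotFailDemo {
    private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();

    // The payload is ignored in this sketch; only the bookkeeping matters.
    CompletableFuture<String> sendAsync(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    void failPending(Throwable cause) {
        // Copy the entries and empty the live queue BEFORE any callback runs,
        // so callbacks that call sendAsync only touch the (now empty) queue.
        List<CompletableFuture<String>> snapshot = new ArrayList<>(pending);
        pending.clear();
        snapshot.forEach(f -> f.completeExceptionally(cause));
    }

    // Returns how many messages remain queued after a failure with one retry.
    static int pendingAfterRetry() {
        SnapshotFailDemo producer = new SnapshotFailDemo();
        producer.sendAsync("first").exceptionally(t -> {
            producer.sendAsync("retry"); // safe: the live queue is not being iterated
            return null;
        });
        producer.failPending(new RuntimeException("disconnected"));
        return producer.pending.size();
    }

    public static void main(String[] args) {
        System.out.println("pending after retry: " + pendingAfterRetry());
    }
}
```

In this sketch the retried message stays queued after the failure pass instead of being failed or cleared along with the originals. A real client would also need to consider locking and resource recycling, which are left out here.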

@lhotari
Member

lhotari commented Sep 1, 2021

Good catch @shoothzj !

I have a question. Should we allow to retry on exceptionally callback or prohibit usage like that ?

I think that this is a bug that needs to be fixed. I took a quick look and I have a relatively simple way to solve it.

@lhotari
Member

lhotari commented Sep 1, 2021

I created #11884 as a draft PR to fix the issue. I don't yet have a way to reproduce the problem in a test case, which is one of the remaining obstacles.

@315157973
Contributor

Is there any progress here? Why not just set the queue to concurrent?

@lhotari
Member

lhotari commented Sep 7, 2021

Is there any progress here? Why not just set the queue to concurrent?

There's progress in #11884; it's currently in draft.

@315157973 I replied in #11884 (comment). Does that answer your question?
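
As an illustration of the "concurrent queue" question, and not a summary of the reply referenced above, the sketch below swaps in a `ConcurrentLinkedQueue`. It assumes the failure path iterates and then clears the queue; `ConcurrentQueueDemo` and its members are hypothetical. The iterator is weakly consistent, so no ConcurrentModificationException is thrown, but the message re-sent from the callback is still removed by the same failure pass.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: a concurrent queue with an "iterate, then clear" failure path.
final class ConcurrentQueueDemo {
    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();

    // The payload is ignored in this sketch; only the bookkeeping matters.
    CompletableFuture<String> sendAsync(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    void failPending(Throwable cause) {
        // Weakly consistent iteration: additions during the loop do not throw.
        pending.forEach(f -> f.completeExceptionally(cause));
        // Anything a callback added during the loop is removed here as well.
        pending.clear();
    }

    // Returns true if the retried message is gone after the failure pass.
    static boolean retryIsDropped() {
        ConcurrentQueueDemo producer = new ConcurrentQueueDemo();
        producer.sendAsync("first").exceptionally(t -> {
            producer.sendAsync("retry");
            return null;
        });
        producer.failPending(new RuntimeException("disconnected"));
        return producer.pending.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("retry dropped: " + retryIsDropped());
    }
}
```

Under these assumptions the exception disappears but the re-entrancy hazard remains, so the collection type alone does not settle how retries from callbacks should behave.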

4 participants