[pulsar-broker] Add stop replicator producer logic when start replicator cluster failed #12724

Merged
merged 1 commit into from
Nov 11, 2021

Conversation

Technoboy-
Contributor

Motivation

When a persistent topic is created in BrokerService#createPersistentTopic:

                            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                            try {
                                PersistentTopic persistentTopic = isSystemTopic(topic)
                                        ? new SystemTopic(topic, ledger, BrokerService.this)
                                        : new PersistentTopic(topic, ledger, BrokerService.this); // @ Here may throw NamingException
                                ...
                                }).thenRun(() -> {
                                    ...
                                }).exceptionally((ex) -> {
                                    log.warn(
                                            "Replication or dedup check failed."
                                                    + " Removing topic from topics list {}, {}",
                                            topic, ex);
                                    persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
                                        topics.remove(topic, topicFuture);
                                        topicFuture.completeExceptionally(ex);
                                    });

                                    return null;
                                });
                            } catch (NamingException | PulsarServerException e) {
                                log.warn("Failed to create topic {}-{}", topic, e.getMessage());
                                pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
                                topicFuture.completeExceptionally(e);
                            }
                        }

new PersistentTopic(...) may throw a NamingException. The NamingException catch block does not stop the replicator producers, so any replicator producers that were already created stay in the remote clusters' producer lists.
Any later retry to add replicator producers then fails with the logs below:

Nov 05 13:25:31 pulsar-broker-node-1 pulsar[11435]: 13:25:31.054 [ForkJoinPool.commonPool-worker-29] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.10.0.47:34976] Failed to add producer to topic persistent://SD/operator20018/groupsettings: producerId=17690, org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'pulsar.repl.ctp-pulsar-stage-cluster' is already connected to topic
Nov 05 13:25:33 pulsar-broker-node-1 pulsar[11435]: 13:25:33.351 [ForkJoinPool.commonPool-worker-29] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.10.0.47:34952] Failed to add producer to topic persistent://SD/operator20018/groupsettings: producerId=17694, org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'pulsar.repl.ctp-pulsar-stage-cluster' is already connected to topic
Nov 05 13:25:33 pulsar-broker-node-1 pulsar[11435]: 13:25:33.670 [ForkJoinPool.commonPool-worker-29] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.10.0.47:34970] Failed to add producer to topic persistent://SD/operator20018/groupsettings: producerId=17696,
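
The failure mode described above can be sketched in isolation. This is a simplified model, not the broker's code: `RemoteCluster`, `createTopic`, and the `failAfterStart` / `stopOnFailure` flags are hypothetical names, and a plain map stands in for the remote cluster's producer list. It only shows why a producer left behind by a failed creation makes every retry fail with "already connected", and why removing it in the failure path lets the retry succeed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ReplicatorCleanupSketch {

    /** Hypothetical stand-in for a remote cluster's producer list, keyed by producer name. */
    static final class RemoteCluster {
        private final Map<String, Long> producersByName = new ConcurrentHashMap<>();

        void addProducer(String name, long producerId) {
            // Mirrors the error in the logs: a second producer with the same name is rejected.
            if (producersByName.putIfAbsent(name, producerId) != null) {
                throw new IllegalStateException(
                        "Producer with name '" + name + "' is already connected to topic");
            }
        }

        void removeProducer(String name) {
            producersByName.remove(name);
        }

        boolean hasProducer(String name) {
            return producersByName.containsKey(name);
        }
    }

    /**
     * Simulates topic creation: registers a replicator producer, then optionally fails.
     * When stopOnFailure is true, the failure path removes the producer it registered.
     * Returns true if creation succeeded.
     */
    static boolean createTopic(RemoteCluster remote, String replName, long producerId,
                               boolean failAfterStart, boolean stopOnFailure) {
        boolean registered = false;
        try {
            remote.addProducer(replName, producerId);
            registered = true;
            if (failAfterStart) {
                throw new IllegalStateException("simulated failure after the replicator started");
            }
            return true;
        } catch (IllegalStateException e) {
            // Only clean up a producer this attempt registered itself.
            if (registered && stopOnFailure) {
                remote.removeProducer(replName);
            }
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "pulsar.repl.example-cluster"; // hypothetical replicator producer name

        RemoteCluster leaky = new RemoteCluster();
        createTopic(leaky, name, 1L, true, false);
        System.out.println("retry without cleanup succeeds: "
                + createTopic(leaky, name, 2L, false, false));

        RemoteCluster cleaned = new RemoteCluster();
        createTopic(cleaned, name, 1L, true, true);
        System.out.println("retry with cleanup succeeds: "
                + createTopic(cleaned, name, 2L, false, true));
    }
}
```

In this model the first retry is rejected because the stale entry is still registered, while the second retry goes through because the failure path removed it first.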

Documentation

  • [ x ] no-need-doc

@Technoboy- Technoboy- self-assigned this Nov 10, 2021
@codelipenghui codelipenghui added this to the 2.10.0 milestone Nov 11, 2021
@codelipenghui codelipenghui merged commit fc7cba4 into apache:branch-2.8 Nov 11, 2021
@codelipenghui codelipenghui added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Nov 18, 2021
lhotari pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2021
@lhotari
Member

lhotari commented Dec 21, 2021

@Technoboy- It seems that this solution doesn't work in all cases.

I've been debugging the issue and it looks like the changes in #11804 limited the producer replacement to one connection (&& Objects.equals(cnx, other.cnx)). The replacement won't happen if another connection is chosen on the client side for creating the next producer. The changes in #11804 were refactored further in #12846.
I'm not sure about the reason yet, just making some initial guesses.
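
To make the guess above concrete, here is a minimal sketch of a replacement check that also requires the same connection. Only the `Objects.equals(cnx, other.cnx)` condition comes from the comment; `ProducerRef`, `wouldReplace`, and the string connection identifiers are hypothetical, and the actual check in the broker may include other conditions.

```java
import java.util.Objects;

final class ProducerReplacementSketch {

    /** Hypothetical, reduced view of a producer: its name and the connection it arrived on. */
    static final class ProducerRef {
        final String name;
        final String cnx;

        ProducerRef(String name, String cnx) {
            this.name = name;
            this.cnx = cnx;
        }
    }

    /**
     * Returns true if the incoming producer would replace the existing one,
     * assuming replacement requires both the same name and the same connection.
     */
    static boolean wouldReplace(ProducerRef existing, ProducerRef incoming) {
        return Objects.equals(existing.name, incoming.name)
                && Objects.equals(existing.cnx, incoming.cnx);
    }

    public static void main(String[] args) {
        ProducerRef existing = new ProducerRef("pulsar.repl.example-cluster", "conn-A");
        ProducerRef sameCnx = new ProducerRef("pulsar.repl.example-cluster", "conn-A");
        ProducerRef otherCnx = new ProducerRef("pulsar.repl.example-cluster", "conn-B");

        System.out.println("same connection replaces: " + wouldReplace(existing, sameCnx));
        System.out.println("other connection replaces: " + wouldReplace(existing, otherCnx));
    }
}
```

Under this assumption, a producer that reconnects over a different connection is not treated as a replacement, so the stale producer with the same name stays registered.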

@Technoboy- Technoboy- deleted the add-stop-rep-producer branch August 10, 2022 05:54
Labels
cherry-picked/branch-2.8 Archived: 2.8 is end of life release/2.8.2