Skip to content

[improve][client] Close consumer when topic terminates.#13960

Closed
Technoboy- wants to merge 2 commits intoapache:masterfrom
Technoboy-:process-reach-end-of-tpic
Closed

[improve][client] Close consumer when topic terminates.#13960
Technoboy- wants to merge 2 commits intoapache:masterfrom
Technoboy-:process-reach-end-of-tpic

Conversation

@Technoboy-
Copy link
Contributor

Motivation

When the topic is terminated, and the consumer received 'end-of-topic' command, we should close the consumer.
Because entries in backlog is 0, and no more messages to delivery .

Documentation

  • no-need-doc

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 26, 2022
mattisonchao
mattisonchao previously approved these changes Jan 26, 2022
RobertIndie
RobertIndie previously approved these changes Jan 26, 2022
@congbobo184 congbobo184 dismissed stale reviews from RobertIndie and mattisonchao via 25e1b09 February 16, 2022 02:58
@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@Technoboy- Technoboy- changed the title Close consumer when topic terminates. [improve][client] Close consumer when topic terminates. May 9, 2022
@Technoboy- Technoboy- added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/client release/2.10.1 labels May 9, 2022
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think whether it should be treated as an improvement needs a discussion, so I requested changes here.

image

The REACHED_END_OF_TOPIC could be received not only by topic termination. When a managed cursor doesn't have more entries, maybe temporarily, this request could also be sent by broker. Calling closeAsync here at this case might lead to frequent reconnections.

@BewareMyPower
Copy link
Contributor

BewareMyPower commented May 9, 2022

The REACHED_END_OF_TOPIC could be received not only by topic termination.

To correct my comment, NoMoreEntriesToReadException could only be triggered by topic termination. So it's true that REACHED_END_OF_TOPIC could be received only by topic termination.

But I still have some concerns about this behavior change. It means, when a topic is terminated, the subscribed consumers should all call the public closeAsync API implicitly. For consumers with MessageListener enabled, changes of this PR are equivalent to following code:

    default void reachedEndOfTopic(Consumer<T> consumer) {
        consumer.closeAsync();
    }

which changes the original behavior.

In addition, for programmers new to Pulsar, when they read the setTerminated method, it could be confused to see a public API is called internally.

@RobertIndie @mattisonchao @Demogorgon314 @liudezhi2098 What's your opinions since I see you approved this PR?

@BewareMyPower
Copy link
Contributor

BewareMyPower commented May 9, 2022

In addition, this also makes consumer not able to seek to an older position after topic terminates.

        try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
            CountDownLatch latch = new CountDownLatch(1);
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("my-topic")
                    .subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .messageListener(new MessageListener<byte[]>() {
                        @Override
                        public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                            System.out.println("Received " + new String(msg.getValue()));
                            consumer.acknowledgeAsync(msg);
                        }

                        @Override
                        public void reachedEndOfTopic(Consumer<byte[]> consumer) {
                            System.out.println("consumer closed");
                            latch.countDown();
                        }
                    })
                    .subscribe();
            latch.await();
            try {
                consumer.seek(MessageId.earliest);
                System.out.println("seek done");
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }

Run the application above, the run following commands.

./bin/pulsar-client produce -m hello my-topic
./bin/pulsar-admin topics terminate my-topic 

Before this patch, the output will be:

Received hello
consumer closed
seek done

After this patch, the output will be:

Received hello
consumer closed
org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: The consumer d9dc7 was already closed when seeking the subscription sub of the topic persistent://public/default/my-topic to the message -1:-1:-1

It's only a case for seek. I'm not sure whether the methods added in future could still fail automatically by this behavior change. Whatever, it will make users confused.

@Technoboy-
Copy link
Contributor Author

In addition, this also makes consumer not able to seek to an older position after topic terminates.

        try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
            CountDownLatch latch = new CountDownLatch(1);
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("my-topic")
                    .subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .messageListener(new MessageListener<byte[]>() {
                        @Override
                        public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                            System.out.println("Received " + new String(msg.getValue()));
                            consumer.acknowledgeAsync(msg);
                        }

                        @Override
                        public void reachedEndOfTopic(Consumer<byte[]> consumer) {
                            System.out.println("consumer closed");
                            latch.countDown();
                        }
                    })
                    .subscribe();
            latch.await();
            try {
                consumer.seek(MessageId.earliest);
                System.out.println("seek done");
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }

Run the application above, the run following commands.

./bin/pulsar-client produce -m hello my-topic
./bin/pulsar-admin topics terminate my-topic 

Before this patch, the output will be:

Received hello
consumer closed
seek done

After this patch, the output will be:

Received hello
consumer closed
org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: The consumer d9dc7 was already closed when seeking the subscription sub of the topic persistent://public/default/my-topic to the message -1:-1:-1

It's only a case for seek. I'm not sure whether the methods added in future could still fail automatically by this behavior change. Whatever, it will make users confused.

yes, right. this is a case. even if the topic is terminated, the subscription can be created.
So I decide to close this patch.

@Technoboy- Technoboy- closed this May 9, 2022
@Technoboy- Technoboy- deleted the process-reach-end-of-tpic branch August 10, 2022 05:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/client doc-not-needed Your PR changes do not impact docs release/2.10.1 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants