
Consumers stop consuming messages while the Pulsar broker restarts, even after reconnecting to the broker #1180

Closed
panszobe opened this issue Feb 23, 2024 · 0 comments · Fixed by #1181

panszobe (Contributor) commented Feb 23, 2024

Expected behavior

While the Pulsar broker is restarting, consumers that have reconnected to it should keep consuming messages by sending flow requests to the broker, and the broker should keep pushing messages to the client.

Actual behavior

We found that our consumer stopped consuming new messages after reconnecting to the broker, as shown in the monitoring graphs below:

[Figure: Server side, Bytes Out per Subscription (bytes/s), grouped by partitioned topic]

[Figure: Client side, consumed rows/s]

From 14:40 onward, the output dropped to 0.

Investigating one affected consumer showed that the client stopped sending flow requests to the server, so the server stopped pushing messages to the client because the consumer's permits were negative (permits < 0).
The server log:

14:36:42.142 [pulsar-io-32-25] INFO  org.apache.pulsar.broker.service.ServerCnx - [/xxxx:55102] Subscribing on topic persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin
14:36:42.206 [pulsar-io-32-25] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin-Consumer{subscription=PersistentSubscription{topic=persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2, name=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin}, consumerId=3, consumerName=vyvwu, address=/xxxx:55102}] Trigger new read after receiving flow control message with permits 1000 after adding 1000 permits
14:36:42.219 [BookKeeperClientWorker-OrderedExecutor-8-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin] Distributing 1 messages to 1 consumers
14:36:42.219 [BookKeeperClientWorker-OrderedExecutor-8-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin] Added -(2346 minus 0) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in PersistentDispatcherMultipleConsumers
14:36:42.219 [pulsar-io-32-40] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin] Consumer buffer is full, pause reading
14:36:42.490 [pulsar-io-32-25] DEBUG org.apache.pulsar.broker.service.Consumer - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2-PersistentSubscription{topic=persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2, name=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin}] Added more flow control message permits 500 (old was: -1346), blocked = false
14:36:42.493 [pulsar-io-32-25] DEBUG org.apache.pulsar.broker.service.Consumer - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2-PersistentSubscription{topic=persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2, name=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin}] Added more flow control message permits 500 (old was: -846), blocked = false 
14:36:42.498 [pulsar-io-32-25] DEBUG org.apache.pulsar.broker.service.Consumer - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2-PersistentSubscription{topic=persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2, name=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin}] Added more flow control message permits 500 (old was: -346), blocked = false 
14:36:42.501 [pulsar-io-32-25] DEBUG org.apache.pulsar.broker.service.Consumer - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2-PersistentSubscription{topic=persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2, name=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin}] Added more flow control message permits 500 (old was: 154), blocked = false 
14:36:42.524 [BookKeeperClientWorker-OrderedExecutor-8-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin] Distributing 42 messages to 1 consumers
14:36:42.524 [BookKeeperClientWorker-OrderedExecutor-8-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin] Added -(2223 minus 0) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in PersistentDispatcherMultipleConsumers
14:36:42.535 [pulsar-io-32-25] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin-Consumer{subscription=PersistentSubscription{topic=persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2, name=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin}, consumerId=3, consumerName=vyvwu, address=/xxxx:55102}] Trigger new read after receiving flow control message with permits -1569 after adding 0 permits
14:36:42.524 [pulsar-io-32-29] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2 / ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin] Consumer buffer is full, pause reading

The client log (with some extra debug logging added):

time="2024-02-23T14:36:42+08:00" level=info msg="Reconnecting to broker in 113.752291ms" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:36:42+08:00" level=info msg="Connected consumer" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:36:42+08:00" level=info msg="Reconnected consumer to broker" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:36:42+08:00" level=info msg="dispatcher requesting initial permits=1000, message stats: current total entries=0, received messages=0, duplicated messages=0, acked messages=0, discarded messages=0, total skipped messages=0" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:36:42+08:00" level=info msg="requesting more permits=500 available=500, message stats: current total entries=1, received messages=500, duplicated messages=0, acked messages=0, discarded messages=0, total skipped messages=0" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:36:42+08:00" level=info msg="requesting more permits=500 available=500, message stats: current total entries=1, received messages=1000, duplicated messages=0, acked messages=0, discarded messages=0, total skipped messages=0" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:36:42+08:00" level=info msg="requesting more permits=500 available=500, message stats: current total entries=1, received messages=1500, duplicated messages=0, acked messages=0, discarded messages=0, total skipped messages=0" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:36:42+08:00" level=info msg="requesting more permits=500 available=500, message stats: current total entries=1, received messages=2000, duplicated messages=0, acked messages=0, discarded messages=0, total skipped messages=0" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:38:19+08:00" level=info msg="message stats: current total entries=2, received messages=4569, duplicated messages=2223, acked messages=0, discarded messages=0, total skipped messages=2223" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:40:19+08:00" level=info msg="message stats: current total entries=2, received messages=4569, duplicated messages=2223, acked messages=0, discarded messages=0, total skipped messages=2223" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"
time="2024-02-23T14:42:19+08:00" level=info msg="message stats: current total entries=2, received messages=4569, duplicated messages=2223, acked messages=0, discarded messages=0, total skipped messages=2223" consumerID=3 name=vyvwu subscription=ck_sinker_preonline_sinker_current_version_test_consume_for_ck_sinker_pulsar_v_origin topic="persistent://test/normal/test_consume_duplicated_for_ck_sinker-partition-2"

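The client sent 5 flow requests (1000 initial permits, then 4 × 500) and the server handled all 5 as expected. However, 2223 of the received messages were duplicates, and MessageReceived() drops such messages without increasing availablePermits.
As a result, availablePermits of this partitionConsumer never reached 500 (maxQueueSize/2, with the default maxQueueSize = 1000): availablePermits = (4569 - 2223) % 500 = 346, so no further flow request is sent. On the broker side, the consumer has received 4569 messages against 3000 permits, leaving 3000 - 4569 = -1569 permits (the value in the server log), so the broker sends nothing either and both sides wait on each other.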

Steps to reproduce

Producers: enable batching with DisableBatching = false, MaxPendingMessages = 100000, BatchingMaxSize = 1073741824 (1 GiB).
Consumers: set EnableBatchIndexAcknowledgment = true.

The larger the batch size, the more easily the problem reproduces.

Probable Bugs

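// Excerpt from the per-message loop in MessageReceived(); every branch skips the message.

// 1. Message already acknowledged according to the batch-index ack set.
if ackSet != nil && !ackSet.Test(uint(i)) {
	pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i)
	continue
}

// 2. Message the consumer should discard; it is acknowledged and dropped.
if pc.messageShouldBeDiscarded(trackingMsgID) {
	pc.AckID(trackingMsgID)
	continue
}

// 3. Duplicate of a message already acknowledged through the ack grouping tracker.
if pc.ackGroupingTracker.isDuplicate(msgID) {
	continue
}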

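None of these `continue` paths increases availablePermits, so each skipped message is charged against the broker's permits but never counted toward the client's next flow request.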

RobertIndie pushed a commit that referenced this issue Feb 23, 2024
Fixes #1180 

### Motivation
In `MessageReceived`, the number of skipped messages should be added to the available permits; otherwise the permits consumed by skipped messages prevent the flow request from being sent.
---------

Co-authored-by: panjinjun <1619-panjinjun@users.noreply.git.sysop.bigo.sg>
RobertIndie pushed a commit that referenced this issue Feb 29, 2024
(cherry picked from commit 5d25827)