[Bug] [client] transactional consumer do not work though corresponding TP has been recovered. #19148

@thetumbled

Description

Search before asking

  • I searched in the issues and found nothing similar.

Version

master branch.

Minimal reproduce step

# Create a topic with 10 partitions
bin/pulsar-admin topics create-partitioned-topic  test/tb5/testTxn10 --partitions 10

# Start the perf producer and consumer processes
bin/pulsar-perf produce -r 2048000 -bm 10 -txn -nmt 1000 persistent://test/tb5/testTxn10
bin/pulsar-perf consume -r 2048000 -txn -nmt 1500  persistent://test/tb5/testTxn10

# Restart broker, which will trigger topic unload and reload, TP, TB, TC recovery.
bin/pulsar-daemon restart broker

What did you expect to see?

We expect the transactional producer and consumer to work as they did before the restart.

What did you see instead?

Some partitions of topic persistent://test/tb5/testTxn10 do not work.
Only two partitions work normally.
image

The client reports the following errors. Each partition without traffic has a corresponding error message.

2023-01-06T14:31:03,907+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0x52f04075, L:/172.24.25.42:49158 - R:cluster2-nn0.bigo.baina/172.24.25.41:6650] Received error from server: Exclusive consumer is already connected
2023-01-06T14:31:03,907+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://test/tb5/testTxn10-partition-2][sub] Failed to subscribe to topic on cluster2-nn0.bigo.baina/172.24.25.41:6650

2023-01-06T14:31:33,177+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0x951d9be7, L:/172.24.25.42:49171 - R:cluster2-nn0.bigo.baina/172.24.25.41:6650] Received error from server: Exclusive consumer is already connected
2023-01-06T14:31:33,177+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://test/tb5/testTxn10-partition-6][sub] Failed to subscribe to topic on cluster2-nn0.bigo.baina/172.24.25.41:6650
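
To list every partition affected by this error, the client log can be scanned for the ConsumerImpl warning. The following is a minimal sketch, assuming the log lines keep the format shown above; the function name `failed_subscriptions` and the regular expression are illustrative and not part of Pulsar.

```python
import re

# Matches the ConsumerImpl warning from the client log above:
#   "... ConsumerImpl - [<topic>][<subscription>] Failed to subscribe to topic on ..."
# The pattern is an assumption based only on the two sample lines in this report.
FAILED_SUBSCRIBE = re.compile(
    r"ConsumerImpl - \[(?P<topic>[^\]]+)\]\[(?P<sub>[^\]]+)\] "
    r"Failed to subscribe to topic"
)


def failed_subscriptions(log_lines):
    """Return a sorted list of (topic, subscription) pairs that failed to subscribe."""
    found = set()
    for line in log_lines:
        match = FAILED_SUBSCRIBE.search(line)
        if match:
            found.add((match.group("topic"), match.group("sub")))
    return sorted(found)


if __name__ == "__main__":
    sample = [
        "2023-01-06T14:31:33,177+0800 [pulsar-client-io-1-1] WARN "
        "org.apache.pulsar.client.impl.ClientCnx - [id: 0x951d9be7, "
        "L:/172.24.25.42:49171 - R:cluster2-nn0.bigo.baina/172.24.25.41:6650] "
        "Received error from server: Exclusive consumer is already connected",
        "2023-01-06T14:31:33,177+0800 [pulsar-client-io-1-1] WARN "
        "org.apache.pulsar.client.impl.ConsumerImpl - "
        "[persistent://test/tb5/testTxn10-partition-6][sub] "
        "Failed to subscribe to topic on cluster2-nn0.bigo.baina/172.24.25.41:6650",
    ]
    for topic, sub in failed_subscriptions(sample):
        print(topic, sub)
```

Repeated warnings for the same partition are collapsed, so the result gives one entry per affected partition and subscription.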

However, I found that the TPs for all 10 partitions of the topic have been recovered.

python3 calculateTbpRecoverTime.py TP
TP for topic:persistent://test/tb5/testTxn10-partition-0 sub:sub recover time in milliseconds: 431442
TP for topic:persistent://test/tb5/testTxn10-partition-1 sub:sub recover time in milliseconds: 275965
TP for topic:persistent://test/tb5/testTxn10-partition-2 sub:sub recover time in milliseconds: 302184
TP for topic:persistent://test/tb5/testTxn10-partition-3 sub:sub recover time in milliseconds: 282678
TP for topic:persistent://test/tb5/testTxn10-partition-4 sub:sub recover time in milliseconds: 336789
TP for topic:persistent://test/tb5/testTxn10-partition-5 sub:sub recover time in milliseconds: 341019
TP for topic:persistent://test/tb5/testTxn10-partition-6 sub:sub recover time in milliseconds: 279376
TP for topic:persistent://test/tb5/testTxn10-partition-7 sub:sub recover time in milliseconds: 431452
TP for topic:persistent://test/tb5/testTxn10-partition-8 sub:sub recover time in milliseconds: 282711
TP for topic:persistent://test/tb5/testTxn10-partition-9 sub:sub recover time in milliseconds: 250019
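
For reference, the recovery times above range from about 250 seconds to about 431 seconds. The sketch below shows one way to summarize such a report. It only parses the text printed above and makes no assumption about how `calculateTbpRecoverTime.py` works internally; the helper names are hypothetical.

```python
import re

# Line format taken from the report output above.
REPORT_LINE = re.compile(
    r"TP for topic:(?P<topic>\S+) sub:(?P<sub>\S+) "
    r"recover time in milliseconds: (?P<ms>\d+)"
)


def recovery_times_ms(report_lines):
    """Map each topic partition to its TP recovery time in milliseconds."""
    times = {}
    for line in report_lines:
        match = REPORT_LINE.search(line)
        if match:
            times[match.group("topic")] = int(match.group("ms"))
    return times


def slowest(times):
    """Return the (topic, milliseconds) pair with the longest recovery time."""
    return max(times.items(), key=lambda item: item[1])


if __name__ == "__main__":
    # Three of the ten lines from the report, for illustration.
    sample = [
        "TP for topic:persistent://test/tb5/testTxn10-partition-0 sub:sub recover time in milliseconds: 431442",
        "TP for topic:persistent://test/tb5/testTxn10-partition-7 sub:sub recover time in milliseconds: 431452",
        "TP for topic:persistent://test/tb5/testTxn10-partition-9 sub:sub recover time in milliseconds: 250019",
    ]
    times = recovery_times_ms(sample)
    topic, ms = slowest(times)
    print(f"slowest: {topic} ({ms / 1000:.1f} s)")
```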

Query the stats of one partition without traffic.

bin/pulsar-admin topics stats persistent://test/tb5/testTxn10-partition-0
{
"msgRateIn" : 894.0452581776605,
"msgThroughputIn" : 931086.1450541553,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 982555920,
"msgInCounter" : 943472,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 1041.4306619689428,
"msgChunkPublished" : false,
"storageSize" : 13418100120,
"backlogSize" : 13417146498,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 0,
"committedTxnCount" : 1254,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 894.0452581776605,
"msgThroughputIn" : 931086.1450541553,
"averageMsgSize" : 1041.4306619689428,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/172.24.25.42:48102",
"producerName" : "pulsar-cluster-fwz-1-46-10",
"connectedSince" : "2023-01-06T14:18:20.440183522+08:00",
"clientVersion" : "2.9.3"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"sub" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 1299568,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 1299568,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"activeConsumerName" : "46bbb",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "46bbb",
"availablePermits" : 0,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"metadata" : { },
"address" : "/172.24.25.42:48441",
"connectedSince" : "2023-01-06T14:18:20.999288614+08:00",
"clientVersion" : "2.9.3",
"lastAckedTime" : "1970-01-01T08:00:00+08:00",
"lastConsumedTime" : "1970-01-01T08:00:00+08:00"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 7161,
"nonContiguousDeletedMessagesRangesSerializedSize" : 139391,
"delayedTrackerMemoryUsage" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 7161,
"nonContiguousDeletedMessagesRangesSerializedSize" : 139391,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"ownerBroker" : "cluster2-nn0.bigo.baina:8081"
}

The stats show that subscription sub on this partition already has a connected consumer (named 46bbb), but that consumer receives no traffic. Meanwhile, the perf tool keeps trying to reconnect and create a new consumer on subscription sub. Because the subscription is Exclusive and the idle consumer 46bbb is still attached, the new consumer cannot be created.
The connection time shows that this idle consumer was created after the broker restart, so the question is: why does it not work?
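
The symptom can be checked mechanically from the stats JSON. The following is a rough sketch, assuming the field names shown in the output above (`subscriptions`, `consumers`, `msgBacklog`, `msgRateOut`, `lastConsumedTimestamp`). The heuristic itself (consumer attached, backlog present, nothing ever dispatched) is my own assumption about what "not working" means here, not a Pulsar-defined state.

```python
def stalled_subscriptions(stats):
    """Return the names of subscriptions that look stalled in a topic stats dict.

    A subscription is flagged when it has at least one connected consumer
    and a non-empty backlog, yet shows no outgoing rate and a
    lastConsumedTimestamp of 0 (heuristic, see the note above).
    """
    stalled = []
    for name, sub in stats.get("subscriptions", {}).items():
        has_consumer = len(sub.get("consumers", [])) > 0
        has_backlog = sub.get("msgBacklog", 0) > 0
        no_traffic = sub.get("msgRateOut", 0.0) == 0.0
        never_consumed = sub.get("lastConsumedTimestamp", 0) == 0
        if has_consumer and has_backlog and no_traffic and never_consumed:
            stalled.append(name)
    return stalled


if __name__ == "__main__":
    # Trimmed copy of the stats above; only the fields the check reads are kept.
    partition_0 = {
        "subscriptions": {
            "sub": {
                "msgRateOut": 0.0,
                "msgBacklog": 1299568,
                "type": "Exclusive",
                "activeConsumerName": "46bbb",
                "lastConsumedTimestamp": 0,
                "consumers": [{"consumerName": "46bbb", "availablePermits": 0}],
            }
        }
    }
    print(stalled_subscriptions(partition_0))
```

In practice the dict would come from `json.loads` applied to the output of `bin/pulsar-admin topics stats` for each partition.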

After some time, the partitions that were not working start to show consumer traffic.
image
Query the partition stats again.

bin/pulsar-admin topics stats persistent://test/tb5/testTxn10-partition-0
{
"msgRateIn" : 899.7183507594756,
"msgThroughputIn" : 938068.0225423912,
"msgRateOut" : 2017.087077147803,
"msgThroughputOut" : 2100638.74889055,
"bytesInCounter" : 2524424057,
"msgInCounter" : 2423093,
"bytesOutCounter" : 340584925,
"msgOutCounter" : 327056,
"averageMsgSize" : 1042.624085360206,
"msgChunkPublished" : false,
"storageSize" : 14960070221,
"backlogSize" : 14959116599,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 0,
"committedTxnCount" : 3012,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 899.7183507594756,
"msgThroughputIn" : 938068.0225423912,
"averageMsgSize" : 1042.624085360206,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/172.24.25.42:48102",
"producerName" : "pulsar-cluster-fwz-1-46-10",
"connectedSince" : "2023-01-06T14:18:20.440183522+08:00",
"clientVersion" : "2.9.3"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"sub" : {
"msgRateOut" : 2017.087077147803,
"msgThroughputOut" : 2100638.74889055,
"bytesOutCounter" : 340584925,
"msgOutCounter" : 327056,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 2017.1037374578968,
"chunkedMessageRate" : 0,
"msgBacklog" : 1449916,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 1449916,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"activeConsumerName" : "46bbb",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1672988583828,
"lastConsumedTimestamp" : 1672988585368,
"lastAckedTimestamp" : 1672988583932,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 2017.087077147803,
"msgThroughputOut" : 2100638.74889055,
"bytesOutCounter" : 340584925,
"msgOutCounter" : 327056,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 2017.1037374578968,
"chunkedMessageRate" : 0.0,
"consumerName" : "46bbb",
"availablePermits" : -56,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 10,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1672988583932,
"lastConsumedTimestamp" : 1672988585368,
"lastConsumedFlowTimestamp" : 1672988583828,
"metadata" : { },
"address" : "/172.24.25.42:48441",
"connectedSince" : "2023-01-06T14:18:20.999288614+08:00",
"clientVersion" : "2.9.3",
"lastAckedTime" : "2023-01-06T15:03:03.932+08:00",
"lastConsumedTime" : "2023-01-06T15:03:05.368+08:00"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 7161,
"nonContiguousDeletedMessagesRangesSerializedSize" : 139391,
"delayedTrackerMemoryUsage" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 7161,
"nonContiguousDeletedMessagesRangesSerializedSize" : 139391,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"ownerBroker" : "cluster2-nn0.bigo.baina:8081"
}

The consumer that is working now is the same consumer that was previously not working!
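
This comparison can be made explicit by matching the consumer connection across the two stats snapshots. A small sketch, assuming that the combination of `consumerName`, `address` and `connectedSince` is enough to identify one connection; the helper names are hypothetical.

```python
def consumer_identity(consumer):
    """Identify a consumer connection by name, client address and connect time."""
    return (
        consumer.get("consumerName"),
        consumer.get("address"),
        consumer.get("connectedSince"),
    )


def same_consumers(before, after, subscription):
    """Return True if the subscription has the same, non-empty set of
    consumer connections in both stats snapshots."""
    def identities(stats):
        sub = stats.get("subscriptions", {}).get(subscription, {})
        return {consumer_identity(c) for c in sub.get("consumers", [])}

    first = identities(before)
    return bool(first) and first == identities(after)


if __name__ == "__main__":
    # Consumer entry as it appears in both snapshots above (trimmed).
    consumer = {
        "consumerName": "46bbb",
        "address": "/172.24.25.42:48441",
        "connectedSince": "2023-01-06T14:18:20.999288614+08:00",
    }
    first_snapshot = {"subscriptions": {"sub": {"consumers": [dict(consumer, msgRateOut=0.0)]}}}
    second_snapshot = {"subscriptions": {"sub": {"consumers": [dict(consumer, msgRateOut=2017.087077147803)]}}}
    print(same_consumers(first_snapshot, second_snapshot, "sub"))
```

Rate and counter fields are deliberately left out of the identity, since they change between snapshots even when the connection is the same.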

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels: Stale, type/bug