Skip to content

blocked consumers on partitioned topics with delayed messages and Shared subscription #10497

@yangou

Description

@yangou

Describe the bug
Delayed messages are not delivered on partitioned topics. This happens with both the built-in command-line consumer and a Go (golang) client consumer.

To Reproduce
Steps to reproduce the behavior:

  1. Create a partitioned topic, set the namespace policies as below:
bundles:
  numBundles: 16                                                     # bundles serve the purpose of topic load balancing among brokers
backlog_quota_map:
  destination_storage:
    limit: 1099511627776                                             # 1TB, maximal backlog size of unack'ed messages
    policy: producer_request_hold                                    # keep the producer waiting/timeout when backlog reaches limit
deduplicationEnabled: false
autoTopicCreationOverride:
  allowAutoTopicCreation: false
autoSubscriptionCreationOverride:
  allowAutoSubscriptionCreation: false
message_ttl_in_seconds: 0                                            # don't expire unconsumed messages
subscription_expiration_time_minutes: 0                              # don't expire subscriptions once inactive (no connections, no consumption, etc.)
retention_policies:
  retentionTimeInMinutes: 0                                          # don't retain messages once they are consumed
  retentionSizeInMB: 0                                               # same as above
delayed_delivery_policies:
  active: true                                                       # enable delayed delivery in this namespace
  tickTime: 1000                                                     # tick time for delayed delivery
inactive_topic_policies:
  deleteWhileInactive: false                                         # disable auto-deletion on inactive topics
  2. Create the subscription manually with the Latest position and Shared mode.
  3. Launch the command-line consumer for 1000 messages.
  4. Try a couple of times; the command-line consumer gets stuck waiting for incoming messages.
  5. Check partitioned-stats with the admin CLI; you may see output like the one below.
  Note: the `scheduler` subscription was used by the Go (golang) client, while the `test` subscription was used by the CLI consumer. Restarting the consumers triggers the Pulsar broker to flush the messages.
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 349869,
  "msgInCounter" : 2906,
  "bytesOutCounter" : 316008,
  "msgOutCounter" : 2618,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 349869,
  "backlogSize" : 41503,
  "offloadedStorageSize" : 0,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0
  } ],
  "subscriptions" : {
    "scheduler" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 216457,
      "msgOutCounter" : 1794,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 134,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 134,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 216457,
        "msgOutCounter" : 1794,
        "msgRateRedeliver" : 0.0,
        "chuckedMessageRate" : 0.0,
        "availablePermits" : 6256,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 29,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 2824
    },
    "test" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 99551,
      "msgOutCounter" : 824,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 134,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 134,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 99551,
        "msgOutCounter" : 824,
        "msgRateRedeliver" : 0.0,
        "chuckedMessageRate" : 0.0,
        "availablePermits" : 63176,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 29,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 2670
    }
  },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 58,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 5494,
  "metadata" : {
    "partitions" : 64
  },
  "partitions" : { }
}

Expected behavior
Messages should be cleared out with restart of consumers

Additional context
Add any other context about the problem here.

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/bug — The PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions