Skip to content

Conversation

@Dream95
Copy link
Contributor

@Dream95 Dream95 commented Jan 27, 2026

Motivation

The binary protocol already performs check backlog quota in ServerCnx when creating producers, but the HTTP REST API path was missing this validation.

      .thenAccept(t -> {
            // TODO: Check message backlog and fail if backlog too large.
            if (!t.isPresent()) {
                // Topic not found, and remove from owning partition list.
                publishResult.completeExceptionally(new BrokerServiceException.TopicNotFoundException("Topic not "
                        + "owned by current broker."));
                TopicName topicName = TopicName.get(topic);
                pulsar().getBrokerService().getOwningTopics().get(topicName.getPartitionedTopicName())
                        .remove(topicName.getPartitionIndex());

Modifications

  • Added backlog quota checks in publishSingleMessageToPartition method for both destination_storage and message_age quota types

The implementation follows the same pattern used in ServerCnx.handleProducer where backlog quota checks are performed before allowing message production.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as:

  • Existing backlog quota tests should cover this functionality
  • HTTP produce endpoint tests should verify the behavior

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Changes to REST endpoints:

  • The HTTP produce endpoint (/persistent/{tenant}/{namespace}/{topic}/partitions/{partition}) now properly enforces backlog quota limits before publishing messages. When backlog quota is exceeded, the request will fail with an appropriate error, consistent with the binary protocol behavior.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: Dream95#4

- Add http produce backlog quota check for both destination_storage and message_age

Signed-off-by: Dream95 <zhou_8621@163.com>
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 27, 2026
Copy link
Contributor

@Denovo1998 Denovo1998 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it does a backlog quota check for each message, maybe it can be optimized?

  1. In internalPublishMessages(...), first calculate which partition(s) the current request will write to (actually, you have already calculated in round-robin which partition each message goes to).
  2. Perform once for each partition:
  • getTopic(partitionTopic)
  • checkBacklogQuotaExceeded(...destination_storage + message_age)
  1. After all checks pass, then enter the actual publish loop
  • If the check for a certain partition fails: mark all the message results corresponding to that partition as failed (no longer attempt to publish), other partitions can continue.

Comment on lines +288 to +291
topicObj.checkBacklogQuotaExceeded(defaultProducerName,
BacklogQuota.BacklogQuotaType.destination_storage),
topicObj.checkBacklogQuotaExceeded(defaultProducerName,
BacklogQuota.BacklogQuotaType.message_age));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass in message.getProducerName(); here?

if (log.isDebugEnabled()) {
log.debug("Fail to publish single messages to topic {}: {} ",
topicName, e.getCause());
log.debug("Fail to publish single message to topic {}: {}", topic, ex.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you can first use FutureUtil.unwrapCompletionException.

.topic(topicName)
.create();
producer.send("backlog-message");
Thread.sleep(2000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it can be replaced with a "poll until quota exceeded (with timeout)" method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants