
PIP 106: Negative acknowledgment backoff

lipenghui edited this page Nov 3, 2021 · 1 revision

Motivation

Apache Pulsar supports at-least-once message delivery semantics, which tolerate consumer-side failures. Examples include a consumer that writes data to a database that goes offline for a while, a consumer that calls an external HTTP server that is unavailable, or a fault in part of the consumer itself (for example, it cannot connect to the database or HTTP server).

In these cases the consumer cannot process the message successfully. What we can do is redeliver the message after the processing failure, so that it can be delivered to other consumers (for a Shared subscription). This is a frequently used example:

Message msg = consumer.receive();

try {
    process(msg);
    consumer.acknowledge(msg);
} catch (Exception e) {
    consumer.negativeAcknowledge(msg);
}

But you might not want to redeliver the message immediately. Giving the non-working services (HTTP server, database) some time to recover avoids the extra overhead caused by too-frequent retries. Currently, we can specify a delay for message redelivery triggered by a negative acknowledgment:

client.newConsumer()
    ....
    .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
    .subscribe();

But this is not flexible enough, so the proposal is to introduce a redelivery backoff mechanism that applies different redelivery delays according to the number of times the message has been retried, such as 1s, 2s, 4s, 8s, and 16s for the next 5 redeliveries.
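The 1s, 2s, 4s, 8s, 16s schedule doubles the delay on every redelivery. A minimal sketch of that arithmetic follows. The names `BackoffScheduleSketch` and `delayMillis`, the 60-second cap, and the choice of milliseconds as the unit are assumptions made for illustration; none of them are part of the Pulsar client.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the Pulsar client API.
final class BackoffScheduleSketch {
    // Assumed bounds: start at 1 second and never exceed 60 seconds.
    static final long MIN_DELAY_MS = TimeUnit.SECONDS.toMillis(1);
    static final long MAX_DELAY_MS = TimeUnit.SECONDS.toMillis(60);

    // Returns the delay before the next redelivery, given how many times the
    // message has already been redelivered (0 for the first redelivery).
    static long delayMillis(int redeliveryCount) {
        if (redeliveryCount < 0) {
            throw new IllegalArgumentException("redeliveryCount must be >= 0");
        }
        // Clamp the shift so that the doubling cannot overflow a long.
        int shift = Math.min(redeliveryCount, 30);
        return Math.min(MAX_DELAY_MS, MIN_DELAY_MS << shift);
    }

    public static void main(String[] args) {
        // Print the delays for the first five redeliveries.
        for (int count = 0; count < 5; count++) {
            System.out.println("redelivery " + count + " -> " + delayMillis(count) + " ms");
        }
    }
}
```

The cap keeps a message that fails many times from being postponed indefinitely; whether a cap is wanted, and at what value, depends on the workload.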

-> reconsumeLater

Approach

The approach is to introduce a NegativeAckRedeliveryBackoff on the client side. Users can specify a NegativeAckRedeliveryBackoff for a consumer, and the client will provide an implementation, NegativeAckRedeliveryExponentialBackoff.

The NegativeAckBackoff cannot be used together with the redelivery delay, and the default redelivery delay will not change.

Users can also implement their own NegativeAckRedeliveryBackoff. For frequently used backoff strategies, the Pulsar clients should also ship implementations to give users an out-of-the-box experience.

Notice: a consumer crash triggers redelivery of its unacknowledged messages. This case does not respect the NegativeAckRedeliveryBackoff, which means a message might be redelivered earlier than the delay computed by the backoff.

API changes

The new NegativeAckBackoff interface

interface NegativeAckBackoff {

    long next(int redeliveryCount);

}
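As an illustration of a user-defined backoff, the sketch below implements the interface above with a fixed table of delays. The interface is redeclared locally so the example compiles on its own. `StepBackoffSketch` is a hypothetical class, and treating the return value as milliseconds is an assumption, since the proposal does not state the unit.

```java
// Local copy of the interface from the proposal, so the sketch is self-contained.
interface NegativeAckBackoff {
    long next(int redeliveryCount);
}

// Hypothetical implementation: walks through a fixed list of delays and
// stays on the last one once the list is exhausted.
final class StepBackoffSketch implements NegativeAckBackoff {
    // Assumed unit: milliseconds.
    private final long[] delaysMs;

    StepBackoffSketch(long... delaysMs) {
        if (delaysMs.length == 0) {
            throw new IllegalArgumentException("at least one delay is required");
        }
        // Defensive copy so later changes to the caller's array have no effect.
        this.delaysMs = delaysMs.clone();
    }

    @Override
    public long next(int redeliveryCount) {
        // Clamp the count into the valid index range of the table.
        int index = Math.max(0, Math.min(redeliveryCount, delaysMs.length - 1));
        return delaysMs[index];
    }
}
```

With `new StepBackoffSketch(1000, 5000, 30000)`, the first three redeliveries wait 1, 5, and 30 seconds, and every later redelivery waits 30 seconds.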

A new method for building the consumer

client.newConsumer()
    ....
    .negativeAckRedeliveryBackoff(...)
    .subscribe();

Notice: the NegativeAckRedeliveryBackoff will not work with consumer.negativeAcknowledge(MessageId messageId) because we are not able to get the redelivery count from the message ID.

The consumer configuration can also be loaded from a configuration file, so we should also support specifying the NegativeAckRedeliveryBackoff when loading the consumer configuration from a config file. A new method will be added to the ConsumerBuilder:

ConsumerBuilder<T> negativeAckRedeliveryBackoff(String className, String params);
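The proposal does not say how `className` and `params` are turned into a backoff instance. One possible approach, sketched below, is to load the class reflectively and pass the parameter string to its constructor. Everything here is an assumption for illustration: the `Backoff` stand-in interface, the `FixedBackoff` class, the `load` and `parseParams` helpers, the single-`String` constructor convention, and the `key=value,key=value` parameter format.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical loader; not the Pulsar client's actual mechanism.
final class BackoffLoaderSketch {

    // Stand-in for the backoff interface described in the proposal.
    interface Backoff {
        long next(int redeliveryCount);
    }

    // Hypothetical implementation that is configured from a params string.
    static final class FixedBackoff implements Backoff {
        private final long delayMs;

        public FixedBackoff(String params) {
            // Assumed default of 1000 ms when "delayMs" is not supplied.
            this.delayMs = Long.parseLong(parseParams(params).getOrDefault("delayMs", "1000"));
        }

        @Override
        public long next(int redeliveryCount) {
            return delayMs;
        }
    }

    // Parses an assumed "key=value,key=value" format into a map.
    static Map<String, String> parseParams(String params) {
        Map<String, String> result = new HashMap<>();
        if (params == null || params.trim().isEmpty()) {
            return result;
        }
        for (String pair : params.split(",")) {
            String[] keyValue = pair.split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Malformed parameter: " + pair);
            }
            result.put(keyValue[0].trim(), keyValue[1].trim());
        }
        return result;
    }

    // Instantiates the named class through its (String params) constructor.
    static Backoff load(String className, String params) {
        try {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor(String.class).newInstance(params);
            return Backoff.class.cast(instance);
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot create backoff from " + className, e);
        }
    }
}
```

Failing fast with a descriptive exception when the class is missing or does not implement the interface surfaces a configuration mistake when the consumer is built rather than at the first negative acknowledgment.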

Compatibility

The proposal will not introduce any compatibility issues.

Test Plan

Unit tests & integration tests
