Support Consumers Set Custom Retry Delay #6448

Closed
liudezhi2098 opened this issue Mar 1, 2020 · 3 comments
Labels: area/client, type/feature

Comments

@liudezhi2098
Contributor

Motivation

Online business systems routinely hit exceptions in business logic, and the affected messages then need to be consumed again. Users want flexible control over the delay before that re-consumption. Today the usual workaround is to publish the failed message to a dedicated retry topic, since a producer can specify an arbitrary delivery delay, and to have the consumer subscribe to both the business topic and the retry topic. I think Pulsar itself can support this logic, which would make it easier to use; it looks like a very common requirement.

Proposed changes

This change can be implemented on the client side. It requires adding a set of methods to org.apache.pulsar.client.api.Consumer:

void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit);
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, int delayLevel);
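
As a rough illustration of how a caller might use the synchronous method, here is a minimal sketch of a processing step that acknowledges on success and requests a delayed retry on failure. `RetryableConsumer`, `RecordingConsumer`, and `process` are hypothetical stand-ins written for this example so that it runs without a broker; only the shape of `reconsumeLater(message, delayTime, unit)` is taken from the proposal above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ReconsumeLaterSketch {

    /** Hypothetical stand-in with only the calls this sketch needs; not the Pulsar interface. */
    public interface RetryableConsumer<M> {
        void acknowledge(M message);
        void reconsumeLater(M message, long delayTime, TimeUnit unit);
    }

    /** Runs the business handler; acks on success, asks for a delayed retry on failure. */
    public static <M> void process(RetryableConsumer<M> consumer, M message,
                                   Consumer<M> handler, long delayTime, TimeUnit unit) {
        try {
            handler.accept(message);
            consumer.acknowledge(message);
        } catch (RuntimeException e) {
            // The caller chooses the delay per message, at the point of failure.
            consumer.reconsumeLater(message, delayTime, unit);
        }
    }

    /** In-memory fake that records calls, so the sketch runs without a broker. */
    public static class RecordingConsumer implements RetryableConsumer<String> {
        public final List<String> log = new ArrayList<>();

        @Override
        public void acknowledge(String message) {
            log.add("ack:" + message);
        }

        @Override
        public void reconsumeLater(String message, long delayTime, TimeUnit unit) {
            log.add("retry:" + message + ":" + unit.toMillis(delayTime) + "ms");
        }
    }

    public static void main(String[] args) {
        RecordingConsumer consumer = new RecordingConsumer();
        process(consumer, "ok", m -> { }, 10, TimeUnit.SECONDS);
        process(consumer, "bad", m -> { throw new IllegalStateException("boom"); },
                10, TimeUnit.SECONDS);
        System.out.println(consumer.log);
    }
}
```

The point of the sketch is that the delay is an argument of each `reconsumeLater` call rather than a consumer-wide setting, so different failures can use different delays.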

Add a retry topic to DeadLetterPolicy:

public class DeadLetterPolicy {

    /**
     * Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
     */
    private int maxRedeliverCount;

    /**
     * Name of the retry topic where the failing messages will be sent.
     */
    private String retryLetterTopic;

    /**
     * Name of the dead topic where the failing messages will be sent.
     */
    private String deadLetterTopic;

}
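
One way to read these three fields together is as a routing rule for failed messages. The sketch below is a simplified model, not the client implementation: `Policy`, `destinationFor`, and the topic names are hypothetical, and the exact boundary (dead-lettering once the retry count has reached `maxRedeliverCount`) is an assumption made for illustration.

```java
public class RetryRoutingSketch {

    /** Simplified, hypothetical copy of the three policy fields from the proposal. */
    public static final class Policy {
        final int maxRedeliverCount;
        final String retryLetterTopic;
        final String deadLetterTopic;

        public Policy(int maxRedeliverCount, String retryLetterTopic, String deadLetterTopic) {
            this.maxRedeliverCount = maxRedeliverCount;
            this.retryLetterTopic = retryLetterTopic;
            this.deadLetterTopic = deadLetterTopic;
        }
    }

    /**
     * Picks the destination topic for a failed message.
     * Assumption: reconsumeTimes counts retries already performed, and the message
     * is dead-lettered once that count has reached maxRedeliverCount.
     */
    public static String destinationFor(Policy policy, int reconsumeTimes) {
        return reconsumeTimes < policy.maxRedeliverCount
                ? policy.retryLetterTopic
                : policy.deadLetterTopic;
    }

    public static void main(String[] args) {
        // Placeholder topic names, chosen only for this example.
        Policy policy = new Policy(2, "my-topic-retry", "my-topic-dlq");
        for (int attempt = 0; attempt <= 2; attempt++) {
            System.out.println(attempt + " -> " + destinationFor(policy, attempt));
        }
    }
}
```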

Add a retry producer to org.apache.pulsar.client.impl.ConsumerImpl:

  private volatile Producer<T> deadLetterProducer;

  private volatile Producer<T> retryLetterProducer;

Retry can be enabled when creating a consumer; it is disabled by default:

    @Override
    public ConsumerBuilder<T> enableRetry(boolean retryEnable) {
        conf.setRetryEnable(retryEnable);
        return this;
    }
@jiazhai
Member

jiazhai commented Mar 3, 2020

Thanks for the great work @liudezhi2098

sijie pushed a commit that referenced this issue Apr 6, 2020
Master Issue: #6448

huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020
@devinbost
Contributor

devinbost commented Sep 21, 2020

Hi @liudezhi2098, thanks for your work on this. I'm trying to work out how to advise users on using this feature.
The docs provide this example:

https://pulsar.apache.org/docs/en/concepts-messaging/#retry-letter-topic

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .receiverQueueSize(100)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(maxRedeliveryCount)
                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
                        .build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

However, it's not clear where they'd set the delay between retries. Could you please clarify this?

@devinbost
Contributor

I found some examples in your unit tests. It would be good to also update the docs to reflect this.
