Skip to content

Commit

Permalink
Support retry letter topic (#359)
Browse files Browse the repository at this point in the history
### Motivation

Follow [pulsar#6449](apache/pulsar#6449) to support retry letter topic in go client

### Modifications

- Add `retryRouter` for sending reconsume messages to retry letter topic
- Add `ReconsumeLater(msg Message, delay time.Duration)` to Consumer interface
- Add configureable retry letter topic name in `DLQPolicy`
    ```go
	type DLQPolicy struct {
		// ...
		// Name of the topic where the retry messages will be sent.
		RetryLetterTopic string
	}
	```
    enable it explicitly while creating consumer, default unenable
   
     ```go
    type ConsumerOptions struct {
	    // ...
		// Auto retry send messages to default filled DLQPolicy topics
		RetryEnable bool
	}
    ```
- Add 2 `TestRLQ*`  test cases
  • Loading branch information
wuYin authored Sep 9, 2020
1 parent c078454 commit 3c523ba
Show file tree
Hide file tree
Showing 12 changed files with 521 additions and 34 deletions.
12 changes: 11 additions & 1 deletion pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ type DLQPolicy struct {
MaxDeliveries uint32

// Name of the topic where the failing messages will be sent.
Topic string
DeadLetterTopic string

// Name of the topic where the retry messages will be sent.
RetryLetterTopic string
}

// ConsumerOptions is used to configure and create instances of Consumer
Expand Down Expand Up @@ -107,6 +110,10 @@ type ConsumerOptions struct {
// By default is nil and there's no DLQ
DLQ *DLQPolicy

// Auto retry send messages to default filled DLQPolicy topics
// Default is false
RetryEnable bool

// Sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ConsumerMessage
Expand Down Expand Up @@ -163,6 +170,9 @@ type Consumer interface {
// AckID the consumption of a single message, identified by its MessageID
AckID(MessageID)

// ReconsumeLater mark a message for redelivery after custom delay
ReconsumeLater(msg Message, delay time.Duration)

// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
Expand Down
104 changes: 98 additions & 6 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -73,6 +74,7 @@ type consumer struct {
messageCh chan ConsumerMessage

dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
Expand Down Expand Up @@ -108,10 +110,50 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
messageCh = make(chan ConsumerMessage, 10)
}

if options.RetryEnable {
usingTopic := ""
if options.Topic != "" {
usingTopic = options.Topic
} else if len(options.Topics) > 0 {
usingTopic = options.Topics[0]
}
tn, err := internal.ParseTopicName(usingTopic)
if err != nil {
return nil, err
}

retryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
if options.DLQ == nil {
options.DLQ = &DLQPolicy{
MaxDeliveries: MaxReconsumeTimes,
DeadLetterTopic: dlqTopic,
RetryLetterTopic: retryTopic,
}
} else {
if options.DLQ.DeadLetterTopic == "" {
options.DLQ.DeadLetterTopic = dlqTopic
}
if options.DLQ.RetryLetterTopic == "" {
options.DLQ.RetryLetterTopic = retryTopic
}
}
if options.Topic != "" && len(options.Topics) == 0 {
options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
options.Topic = ""
} else if options.Topic == "" && len(options.Topics) > 0 {
options.Topics = append(options.Topics, options.DLQ.RetryLetterTopic)
}
}

dlq, err := newDlqRouter(client, options.DLQ)
if err != nil {
return nil, err
}
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable)
if err != nil {
return nil, err
}

// single topic consumer
if options.Topic != "" || len(options.Topics) == 1 {
Expand All @@ -124,15 +166,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}

return topicSubscribe(client, options, topic, messageCh, dlq)
return topicSubscribe(client, options, topic, messageCh, dlq, rlq)
}

if len(options.Topics) > 1 {
if err := validateTopicNames(options.Topics...); err != nil {
return nil, err
}

return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq)
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}

if options.TopicsPattern != "" {
Expand All @@ -145,14 +187,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq)
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
}

return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
}

func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, disableForceTopicCreation bool) (*consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {

consumer := &consumer{
topic: topic,
Expand All @@ -163,6 +205,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
closeCh: make(chan struct{}),
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
log: log.WithField("topic", topic),
consumerName: options.Name,
}
Expand Down Expand Up @@ -306,8 +349,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
}

func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
messageCh chan ConsumerMessage, dlqRouter *dlqRouter, retryRouter *retryRouter) (Consumer, error) {
c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, retryRouter, false)
if err == nil {
consumersOpened.Inc()
}
Expand Down Expand Up @@ -375,6 +418,54 @@ func (c *consumer) AckID(msgID MessageID) {
c.consumers[mid.partitionIdx].AckID(mid)
}

// ReconsumeLater mark a message for redelivery after custom delay
func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
if delay < 0 {
delay = 0
}
msgID, ok := c.messageID(msg.ID())
if !ok {
return
}
props := make(map[string]string)
for k, v := range msg.Properties() {
props[k] = v
}

reconsumeTimes := 1
if s, ok := props[SysPropertyReconsumeTimes]; ok {
reconsumeTimes, _ = strconv.Atoi(s)
reconsumeTimes++
} else {
props[SysPropertyRealTopic] = msg.Topic()
props[SysPropertyOriginMessageID] = msgID.messageID.String()
}
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)

consumerMsg := ConsumerMessage{
Consumer: c,
Message: &message{
payLoad: msg.Payload(),
properties: props,
msgID: msgID,
},
}
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
c.dlq.Chan() <- consumerMsg
} else {
c.rlq.Chan() <- RetryMessage{
consumerMsg: consumerMsg,
producerMsg: ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
Properties: props,
DeliverAfter: delay,
},
}
}
}

func (c *consumer) Nack(msg Message) {
c.NackID(msg.ID())
}
Expand Down Expand Up @@ -411,6 +502,7 @@ func (c *consumer) Close() {
c.ticker.Stop()
c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
consumersPartitions.Sub(float64(len(c.consumers)))
})
Expand Down
16 changes: 14 additions & 2 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,28 @@ type multiTopicConsumer struct {
consumers map[string]Consumer

dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}

log *log.Entry
}

func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
mtc := &multiTopicConsumer{
options: options,
messageCh: messageCh,
consumers: make(map[string]Consumer, len(topics)),
closeCh: make(chan struct{}),
dlq: dlq,
rlq: rlq,
log: &log.Entry{},
consumerName: options.Name,
}

var errs error
for ce := range subscriber(client, topics, options, messageCh, dlq) {
for ce := range subscriber(client, topics, options, messageCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
Expand Down Expand Up @@ -134,6 +136,15 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) {
mid.Ack()
}

func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
consumer, ok := c.consumers[msg.Topic()]
if !ok {
c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
return
}
consumer.ReconsumeLater(msg, delay)
}

func (c *multiTopicConsumer) Nack(msg Message) {
c.NackID(msg.ID())
}
Expand Down Expand Up @@ -166,6 +177,7 @@ func (c *multiTopicConsumer) Close() {
wg.Wait()
close(c.closeCh)
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
})
}
Expand Down
21 changes: 14 additions & 7 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
type regexConsumer struct {
client *client
dlq *dlqRouter
rlq *retryRouter

options ConsumerOptions

Expand All @@ -64,10 +65,11 @@ type regexConsumer struct {
}

func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
msgCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
rc := &regexConsumer{
client: c,
dlq: dlq,
rlq: rlq,
options: opts,
messageCh: msgCh,

Expand All @@ -90,7 +92,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
}

var errs error
for ce := range subscriber(c, topics, opts, msgCh, dlq) {
for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
Expand Down Expand Up @@ -163,6 +165,10 @@ func (c *regexConsumer) Ack(msg Message) {
c.AckID(msg.ID())
}

func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
}

// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
mid, ok := toTrackingMessageID(msgID)
Expand Down Expand Up @@ -215,6 +221,7 @@ func (c *regexConsumer) Close() {
}
wg.Wait()
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
})
}
Expand Down Expand Up @@ -253,7 +260,7 @@ func (c *regexConsumer) monitor() {
}
case topics := <-c.subscribeCh:
if len(topics) > 0 && !c.closed() {
c.subscribe(topics, c.dlq)
c.subscribe(topics, c.dlq, c.rlq)
}
case topics := <-c.unsubscribeCh:
if len(topics) > 0 && !c.closed() {
Expand Down Expand Up @@ -298,12 +305,12 @@ func (c *regexConsumer) knownTopics() []string {
return topics
}

func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter) {
func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
if log.GetLevel() == log.DebugLevel {
c.log.WithField("topics", topics).Debug("subscribe")
}
consumers := make(map[string]Consumer, len(topics))
for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq) {
for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
if ce.err != nil {
c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
} else {
Expand Down Expand Up @@ -359,7 +366,7 @@ type consumerError struct {
}

func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage,
dlq *dlqRouter) <-chan consumerError {
dlq *dlqRouter, rlq *retryRouter) <-chan consumerError {
consumerErrorCh := make(chan consumerError, len(topics))
var wg sync.WaitGroup
wg.Add(len(topics))
Expand All @@ -371,7 +378,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
for _, t := range topics {
go func(topic string) {
defer wg.Done()
c, err := newInternalConsumer(c, opts, topic, ch, dlq, true)
c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true)
consumerErrorCh <- consumerError{
err: err,
topic: topic,
Expand Down
6 changes: 4 additions & 2 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
}

dlq, _ := newDlqRouter(c.(*client), nil)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq)
rlq, _ := newRetryRouter(c.(*client), nil, false)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -202,7 +203,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
}

dlq, _ := newDlqRouter(c.(*client), nil)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq)
rlq, _ := newRetryRouter(c.(*client), nil, false)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 3c523ba

Please sign in to comment.