Skip to content

Commit

Permalink
Allow CustomProperties when sending message for retry (#916)
Browse files Browse the repository at this point in the history
* Allow CustomProperties when sending message for retry

* Lint issue fixed

* TestRLQWithCustomProperties test case added

Signed-off-by: Nitin Goyal <nitingoyal.dev@gmail.com>
  • Loading branch information
ngoyal16 committed Jan 3, 2023
1 parent 9fc96d4 commit 1f3747e
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ type Consumer interface {
// ReconsumeLater mark a message for redelivery after custom delay
ReconsumeLater(msg Message, delay time.Duration)

// ReconsumeLaterWithCustomProperties mark a message for redelivery after custom delay with custom properties
ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string, delay time.Duration)

// Nack acknowledges the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
Expand Down
10 changes: 10 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,12 @@ func (c *consumer) AckIDCumulative(msgID MessageID) error {

// ReconsumeLater mark a message for redelivery after custom delay
func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}

// ReconsumeLaterWithCustomProperties mark a message for redelivery after custom delay with custom properties
func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
if delay < 0 {
delay = 0
}
Expand All @@ -532,6 +538,10 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
props[k] = v
}

for k, v := range customProperties {
props[k] = v
}

reconsumeTimes := 1
if s, ok := props[SysPropertyReconsumeTimes]; ok {
reconsumeTimes, _ = strconv.Atoi(s)
Expand Down
7 changes: 6 additions & 1 deletion pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ func (c *multiTopicConsumer) AckIDCumulative(msgID MessageID) error {
}

func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}

func (c *multiTopicConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
names, err := validateTopicNames(msg.Topic())
if err != nil {
c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err)
Expand All @@ -192,7 +197,7 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
return
}
}
consumer.ReconsumeLater(msg, delay)
consumer.ReconsumeLaterWithCustomProperties(msg, customProperties, delay)
}

func (c *multiTopicConsumer) Nack(msg Message) {
Expand Down
5 changes: 5 additions & 0 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
}

func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.")
}

// AckID the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
Expand Down
114 changes: 114 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,120 @@ func TestRLQ(t *testing.T) {
assert.Nil(t, checkMsg)
}

func TestRLQWithCustomProperties(t *testing.T) {
topic := newTopicName()
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "3")

subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100
ctx := context.Background()

client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()

// 1. Pre-produce N messages
producer, err := client.CreateProducer(ProducerOptions{Topic: topic})
assert.Nil(t, err)
defer producer.Close()

for i := 0; i < N; i++ {
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))})
assert.Nil(t, err)
}

// 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{
MaxDeliveries: uint32(maxRedeliveries),
},
RetryEnable: true,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
defer rlqConsumer.Close()

rlqReceived := 0
for rlqReceived < N*(maxRedeliveries+1) {
msg, err := rlqConsumer.Receive(ctx)
assert.Nil(t, err)

if msg.RedeliveryCount() > 0 {
msgProps := msg.Properties()

value, ok := msgProps["custom-key-1"]
assert.True(t, ok)
if ok {
assert.Equal(t, value, "custom-value-1")
}

rlqConsumer.ReconsumeLater(msg, 1*time.Second)
} else {
customProps := map[string]string{
"custom-key-1": "custom-val-1",
}
rlqConsumer.ReconsumeLaterWithCustomProperties(msg, customProps, 1*time.Second)
}

rlqReceived++
}
fmt.Println("retry consumed:", rlqReceived) // 300

// No more messages on the Retry Topic
rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer rlqCancel()
msg, err := rlqConsumer.Receive(rlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)

// 3. Create consumer on the DLQ topic to verify the routing
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: "persistent://public/default/" + topic + "-" + subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer dlqConsumer.Close()

dlqReceived := 0
for dlqReceived < N {
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
}
fmt.Println("dlq received:", dlqReceived) // 100

// No more messages on the DLQ Topic
dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer dlqCancel()
msg, err = dlqConsumer.Receive(dlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)

// 4. No more messages for same subscription
checkConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer checkConsumer.Close()

checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer checkCancel()
checkMsg, err := checkConsumer.Receive(checkCtx)
assert.Error(t, err)
assert.Nil(t, checkMsg)
}

func TestAckWithResponse(t *testing.T) {
now := time.Now().Unix()
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now)
Expand Down
4 changes: 4 additions & 0 deletions pulsar/internal/pulsartracing/consumer_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (c *mockConsumer) AckIDCumulative(msgID pulsar.MessageID) error {

func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {}

func (c *mockConsumer) ReconsumeLaterWithCustomProperties(msg pulsar.Message, customProperties map[string]string,
delay time.Duration) {
}

func (c *mockConsumer) Nack(msg pulsar.Message) {}

func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}
Expand Down

0 comments on commit 1f3747e

Please sign in to comment.