Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow CustomProperties when sending message for retry #916

Merged
merged 3 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ type Consumer interface {
// ReconsumeLater mark a message for redelivery after custom delay
ReconsumeLater(msg Message, delay time.Duration)

// ReconsumeLaterWithCustomProperties mark a message for redelivery after custom delay with custom properties
ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string, delay time.Duration)

// Nack acknowledges the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
Expand Down
10 changes: 10 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,12 @@ func (c *consumer) AckID(msgID MessageID) error {

// ReconsumeLater mark a message for redelivery after custom delay
func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}

// ReconsumeLaterWithCustomProperties mark a message for redelivery after custom delay with custom properties
func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
if delay < 0 {
delay = 0
}
Expand All @@ -510,6 +516,10 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
props[k] = v
}

for k, v := range customProperties {
props[k] = v
}

reconsumeTimes := 1
if s, ok := props[SysPropertyReconsumeTimes]; ok {
reconsumeTimes, _ = strconv.Atoi(s)
Expand Down
7 changes: 6 additions & 1 deletion pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
}

func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}

func (c *multiTopicConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
names, err := validateTopicNames(msg.Topic())
if err != nil {
c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err)
Expand All @@ -165,7 +170,7 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
return
}
}
consumer.ReconsumeLater(msg, delay)
consumer.ReconsumeLaterWithCustomProperties(msg, customProperties, delay)
}

func (c *multiTopicConsumer) Nack(msg Message) {
Expand Down
5 changes: 5 additions & 0 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
}

func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.")
}

// AckID the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
Expand Down
4 changes: 4 additions & 0 deletions pulsar/internal/pulsartracing/consumer_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (c *mockConsumer) AckID(msgID pulsar.MessageID) error {

func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {}

func (c *mockConsumer) ReconsumeLaterWithCustomProperties(msg pulsar.Message, customProperties map[string]string,
delay time.Duration) {
}

func (c *mockConsumer) Nack(msg pulsar.Message) {}

func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}
Expand Down