Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ single active consumer argument #3437

Merged
merged 5 commits into from
Jun 17, 2024

Conversation

DropSnorz
Copy link
Contributor

@DropSnorz DropSnorz commented Jun 6, 2024

Description

Added a new RabbitMQ subscription argument to enable Single Active Consumer (x-single-active-consumer)

Usage with declarative subscription

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: orders
spec:
  topic: orders
  routes:
    default: /orders
  pubsubname: orderpubsub
  metadata:
    queueName: "foo-queue-quorum"
    queueType: quorum
    singleActiveConsumer: "true"
scopes:
- order-processor-sdk

Issue reference

We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.

Please reference the issue this PR will close: #3120

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • Code compiles correctly
  • Created/updated tests
  • Extended the documentation / Created issue in the https://github.com/dapr/docs/ repo: dapr/docs#[issue number]

Signed-off-by: Arthur Poiret <dropsnorz@gmail.com>
@DropSnorz DropSnorz requested review from a team as code owners June 6, 2024 21:07
@@ -129,6 +129,11 @@ func TestPublishAndSubscribeWithPriorityQueue(t *testing.T) {
<-processed
assert.Equal(t, 4, messageCount)
assert.Equal(t, "foo bar", lastMessage)

// subscribe using single active consumer
err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{reqMetadataSingleActiveConsumerKey: "true"}}, handler)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to actually publish a message after this, then assert that the message was received and the overall messageCount has been increased.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test function updated.

@@ -432,6 +434,18 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub
args[amqp.QueueTypeArg] = amqp.QueueTypeClassic
}

// Applying x-single-active-consumer if defined at subscription level
if val := req.Metadata[reqMetadataSingleActiveConsumerKey]; val != "" {
parsedVal, pErr := strconv.ParseBool(val)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use strconv.ParseBool instead use our preferred method from package dapr/kit/utils called IsTruthy https://github.com/dapr/kit/blob/106329e5839f70f3234b9f49b7dabff0247c0a90/utils/strings.go#L23

That will allow a lot more values to be used to express the "true" / on state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for highlighting this, the code is far more readable this way 👍

@berndverst
Copy link
Member

A few small changes / additions, then we can merge this.

…ument parsing

Signed-off-by: Arthur Poiret <dropsnorz@gmail.com>
@DropSnorz DropSnorz requested a review from berndverst June 6, 2024 23:18
@yaron2 yaron2 merged commit 51e0c79 into dapr:main Jun 17, 2024
86 of 91 checks passed
@yaron2
Copy link
Member

yaron2 commented Jun 17, 2024

@DropSnorz Thanks for this PR! Please open a docs issue in dapr/docs to update the RabbitMQ pub/sub spec with this setting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

RabbitMQ binding enhancements required or am I missing something in configuration?
4 participants