Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Rabbitmq exclusive consumer #2403

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -174,7 +174,7 @@ public void start() throws IOException {
if (channel == null) {
throw new IOException("The RabbitMQ channel is not open");
}
tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), this);
tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), "", false, consumer.getEndpoint().isExclusiveConsumer(), null, this);
}

/**
Expand Down
Expand Up @@ -59,6 +59,8 @@ public class RabbitMQComponent extends UriEndpointComponent {
private boolean autoDelete = true;
@Metadata(label = "common", defaultValue = "true")
private boolean durable = true;
@Metadata(label = "consumer")
private boolean exclusiveConsumer;
@Metadata(label = "common")
private boolean exclusive;
@Metadata(label = "common")
Expand Down Expand Up @@ -229,6 +231,7 @@ protected RabbitMQEndpoint createEndpoint(String uri,
endpoint.setAutoDelete(isAutoDelete());
endpoint.setDurable(isDurable());
endpoint.setExclusive(isExclusive());
endpoint.setExclusiveConsumer(isExclusiveConsumer());
endpoint.setPassive(isPassive());
endpoint.setSkipExchangeDeclare(isSkipExchangeDeclare());
endpoint.setSkipQueueBind(isSkipQueueBind());
Expand Down Expand Up @@ -742,6 +745,18 @@ public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}

public boolean isExclusiveConsumer() {
return exclusiveConsumer;
}

/**
* Request exclusive access to the queue (meaning only this consumer can access the queue). This is useful
* when you want a long-lived shared queue to be temporarily accessible by just one consumer.
*/
public void setExclusiveConsumer(boolean exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
}

public boolean isPassive() {
return passive;
}
Expand Down
Expand Up @@ -76,6 +76,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
private boolean autoDelete = true;
@UriParam(label = "common", defaultValue = "true")
private boolean durable = true;
@UriParam(label = "consumer", defaultValue = "false")
private boolean exclusiveConsumer;
@UriParam(label = "common")
private boolean exclusive;
@UriParam(label = "common")
Expand Down Expand Up @@ -985,6 +987,18 @@ public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}

public boolean isExclusiveConsumer() {
return exclusiveConsumer;
}

/**
* Request exclusive access to the queue (meaning only this consumer can access the queue). This is useful
* when you want a long-lived shared queue to be temporarily accessible by just one consumer.
*/
public void setExclusiveConsumer(boolean exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
}

public boolean isPassive() {
return passive;
}
Expand Down
Expand Up @@ -44,6 +44,7 @@ public void testDefaultProperties() throws Exception {
assertEquals(true, endpoint.isAutoAck());
assertEquals(true, endpoint.isAutoDelete());
assertEquals(true, endpoint.isDurable());
assertEquals(false, endpoint.isExclusiveConsumer());
assertEquals("direct", endpoint.getExchangeType());
assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, endpoint.getConnectionTimeout());
assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, endpoint.getRequestedChannelMax());
Expand All @@ -68,6 +69,7 @@ public void testPropertiesSet() throws Exception {
params.put("requestedChannelMax", 456);
params.put("requestedFrameMax", 789);
params.put("requestedHeartbeat", 321);
params.put("exclusiveConsumer", true);

RabbitMQEndpoint endpoint = createEndpoint(params);

Expand All @@ -86,6 +88,7 @@ public void testPropertiesSet() throws Exception {
assertEquals(456, endpoint.getRequestedChannelMax());
assertEquals(789, endpoint.getRequestedFrameMax());
assertEquals(321, endpoint.getRequestedHeartbeat());
assertEquals(true, endpoint.isExclusiveConsumer());
}

private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws Exception {
Expand Down
Expand Up @@ -344,6 +344,12 @@ public void createEndpointWithExclusiveEnabled() throws Exception {
assertTrue(endpoint.isExclusive());
}

@Test
public void createEndpointWithExclusiveConsumerEnabled() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?exclusiveConsumer=true", RabbitMQEndpoint.class);
assertTrue(endpoint.isExclusiveConsumer());
}

@Test
public void createEndpointWithPassiveEnabled() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?passive=true", RabbitMQEndpoint.class);
Expand Down