Permalink
Browse files

Merge pull request #1 from dterror/master

Support default exchange on QueueDeclaration
  • Loading branch information...
2 parents 5a79ea1 + 4adacf4 commit a00f2f426d49329f2044debde496f7d8cb3ac258 @dkincaid committed Nov 10, 2012
View
10 pom.xml
@@ -28,6 +28,16 @@
<version>1.2.16</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -46,8 +46,9 @@ public ExclusiveQueueWithBinding(String exchange, String routingKey) {
}
/**
- * Verifies the exchange exists, creates an exclusive, server-named queue
- * and binds it to the exchange.
+ * Creates an exclusive, server-named queue. Declares and binds
+ * the queue to the specified exchange unless it's the default exchange
+ * (which doesn't need declaring nor binding)
*
* @return the server's response to the successful queue declaration (you
* can use this to discover the name of the queue).
@@ -57,11 +58,12 @@ public ExclusiveQueueWithBinding(String exchange, String routingKey) {
*/
@Override
public Queue.DeclareOk declare(Channel channel) throws IOException {
- channel.exchangeDeclarePassive(exchange);
-
final Queue.DeclareOk queue = channel.queueDeclare();
- channel.queueBind(queue.getQueue(), exchange, routingKey);
+ if (!exchange.isEmpty()) {
+ channel.exchangeDeclarePassive(exchange);
+ channel.queueBind(queue.getQueue(), exchange, routingKey);
+ }
return queue;
}
@@ -62,8 +62,9 @@ public SharedQueueWithBinding(String queueName, String exchange, String routingK
}
/**
- * Verifies the exchange exists, creates the named queue if it does not
- * exist, and binds it to the exchange.
+ * Creates the named queue if it does not exist. Declares and binds
+ * the queue to the specified exchange unless it's the default exchange
+ * (which doesn't need declaring nor binding)
*
* @return the server's response to the successful queue declaration.
*
@@ -72,16 +73,17 @@ public SharedQueueWithBinding(String queueName, String exchange, String routingK
*/
@Override
public Queue.DeclareOk declare(Channel channel) throws IOException {
- channel.exchangeDeclarePassive(exchange);
-
final Queue.DeclareOk queue = channel.queueDeclare(
queueName,
/* durable */ true,
/* non-exclusive */ false,
/* non-auto-delete */ false,
haPolicy == null ? null /* no arguments */ : haPolicy.asQueueProperies());
- channel.queueBind(queue.getQueue(), exchange, routingKey);
+ if (!exchange.isEmpty()) {
+ channel.exchangeDeclarePassive(exchange);
+ channel.queueBind(queue.getQueue(), exchange, routingKey);
+ }
return queue;
}
@@ -0,0 +1,55 @@
+package com.rapportive.storm.amqp;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.rabbitmq.client.AMQP.Queue;
+import com.rabbitmq.client.Channel;
+
+public class ExclusiveQueueWithBindingTest {
+ private Channel channelMock;
+ private Queue.DeclareOk queueOkMock;
+
+ @Before
+ public void setUp() throws Throwable {
+ channelMock = mock(Channel.class);
+ queueOkMock = mock(Queue.DeclareOk.class);
+ when(channelMock.queueDeclare()).thenReturn(queueOkMock);
+ when(queueOkMock.getQueue()).thenReturn("sample_queue");
+ }
+
+ @Test
+ public void declare_WithSampleExchange_DeclaresExchange() throws Throwable {
+ ExclusiveQueueWithBinding qBinding = new ExclusiveQueueWithBinding("sample_exchange", "#");
+
+ qBinding.declare(channelMock);
+ verify(channelMock).exchangeDeclarePassive("sample_exchange");
+ }
+
+ @Test
+ public void declare_WithDefaultExchange_DoesntDeclareExchange() throws Throwable {
+ ExclusiveQueueWithBinding qBinding = new ExclusiveQueueWithBinding("", "#");
+
+ qBinding.declare(channelMock);
+ verify(channelMock, never()).exchangeDeclarePassive("");
+ }
+
+ @Test
+ public void declare_WithSampleExchange_BindsQueueToExchange() throws Throwable {
+ ExclusiveQueueWithBinding qBinding = new ExclusiveQueueWithBinding("sample_exchange", "#");
+
+ qBinding.declare(channelMock);
+ verify(channelMock).queueBind("sample_queue", "sample_exchange", "#");
+ }
+
+ @Test
+ public void declare_WithDefaultExchange_DoesntBindQueueToExchange() throws Throwable {
+ ExclusiveQueueWithBinding qBinding = new ExclusiveQueueWithBinding("", "#");
+
+ qBinding.declare(channelMock);
+ verify(channelMock, never()).queueBind("sample_queue", "", "#");
+ }
+
+}
@@ -0,0 +1,55 @@
+package com.rapportive.storm.amqp;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.AMQP.Queue;
+
+public class SharedQueueWithBindingTest {
+ private Channel channelMock;
+ private Queue.DeclareOk queueOkMock;
+
+ @Before
+ public void setUp() throws Throwable {
+ channelMock = mock(Channel.class);
+ queueOkMock = mock(Queue.DeclareOk.class);
+ when(channelMock.queueDeclare("queue", true, false, false, null)).thenReturn(queueOkMock);
+ when(queueOkMock.getQueue()).thenReturn("queue");
+ }
+
+ @Test
+ public void declare_WithSampleExchange_DeclaresExchange() throws Throwable {
+ SharedQueueWithBinding qBinding = new SharedQueueWithBinding("queue", "sample_exchange", "#");
+
+ qBinding.declare(channelMock);
+ verify(channelMock).exchangeDeclarePassive("sample_exchange");
+ }
+
+ @Test
+ public void declare_WithDefaultExchange_DoesntDeclareExchange() throws Throwable {
+ SharedQueueWithBinding qBinding = new SharedQueueWithBinding("queue", "", "#");
+
+ qBinding.declare(channelMock);
+ verify(channelMock, never()).exchangeDeclarePassive("");
+ }
+
+ @Test
+ public void declare_WithSampleExchange_BindsQueueToExchange() throws Throwable {
+ SharedQueueWithBinding qBinding = new SharedQueueWithBinding("queue", "sample_exchange", "#");
+
+ qBinding.declare(channelMock);
+ verify(channelMock).queueBind("queue", "sample_exchange", "#");
+ }
+
+ @Test
+ public void declare_WithDefaultExchange_DoesntBindQueueToExchange() throws Throwable {
+ SharedQueueWithBinding qBinding = new SharedQueueWithBinding("queue", "", "#");
+
+ qBinding.declare(channelMock);
+ verify(channelMock, never()).queueBind("queue", "", "#");
+ }
+
+}

0 comments on commit a00f2f4

Please sign in to comment.