Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Checkpoint while I fix the concurrent consumption pools

  • Loading branch information...
commit b583cf8067775f4add7cb5fbb857c8c9f2569cfe 1 parent 776bfed
@deckerego deckerego authored
View
1  .gitignore
@@ -0,0 +1 @@
+/target/
View
4 catalog.xml
@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<catalog xmlns="urn:oasis:names:tc:entity:xmlns:xml:catalog" prefer="system">
+ <nextCatalog catalog="../../.netbeans/7.0/var/cache/mavencachedirs/2057847025/retriever/catalog.xml"/>
+</catalog>
View
10 pom.xml
@@ -67,6 +67,16 @@
<artifactId>slf4j-api</artifactId>
<version>1.6.3</version>
</dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.0</version>
+ </dependency>
<!-- Testing -->
<dependency>
View
46 src/main/java/amqp/spring/camel/component/SpringAMQPConsumer.java
@@ -16,7 +16,10 @@
package amqp.spring.camel.component;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -29,13 +32,14 @@
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
public class SpringAMQPConsumer extends DefaultConsumer {
- private static transient final Logger LOG = LoggerFactory.getLogger(SpringAMQPProducer.class);
+ private static transient final Logger LOG = LoggerFactory.getLogger(SpringAMQPConsumer.class);
protected SpringAMQPEndpoint endpoint;
private ThreadPoolExecutor consumers;
@@ -51,7 +55,7 @@ public SpringAMQPConsumer(SpringAMQPEndpoint endpoint, Processor processor) {
public void start() throws Exception {
super.start();
- org.springframework.amqp.core.Exchange exchange = SpringAMQPProducer.createAMQPExchange(this.endpoint);
+ org.springframework.amqp.core.Exchange exchange = this.endpoint.createAMQPExchange();
this.endpoint.amqpAdministration.declareExchange(exchange);
LOG.info("Declared exchange {}", exchange.getName());
@@ -59,7 +63,12 @@ public void start() throws Exception {
this.endpoint.getAmqpAdministration().declareQueue(queue);
LOG.info("Declared queue {}", this.queue.getName());
- this.binding = BindingBuilder.bind(this.queue).to(exchange).with(this.endpoint.routingKey).noargs();
+ if(exchange instanceof HeadersExchange) { //Is this a header exchange? Bind the key/value pair(s)
+ Map<String, Object> keyValues = parseKeyValues(this.endpoint.routingKey);
+ this.binding = BindingBuilder.bind(this.queue).to((HeadersExchange) exchange).whereAll(keyValues).match();
+ } else {
+ this.binding = BindingBuilder.bind(this.queue).to(exchange).with(this.endpoint.routingKey).noargs();
+ }
this.endpoint.getAmqpAdministration().declareBinding(binding);
LOG.info("Declared binding {}", this.binding.getRoutingKey());
@@ -84,18 +93,29 @@ public void stop() throws Exception {
this.consumers.purge();
this.consumers.getQueue().clear();
}
+
+ super.stop();
+ }
- if(this.endpoint.amqpAdministration != null && this.binding != null) {
- this.endpoint.amqpAdministration.removeBinding(this.binding);
- LOG.info("Removed binding {}", this.binding.getRoutingKey());
- }
+ @Override
+ public void shutdown() throws Exception {
+ stop();
+ super.shutdown();
+ }
+
+ protected static Map<String, Object> parseKeyValues(String routingKey) {
+ if(routingKey.contains("|"))
+ throw new IllegalArgumentException("Sorry, OR boolean not yet supported, only AND.");
- if(this.endpoint.amqpAdministration != null && this.queue != null) {
- this.endpoint.amqpAdministration.deleteQueue(this.queue.getName());
- LOG.info("Deleted queue {}", this.queue.getName());
+ StringTokenizer tokenizer = new StringTokenizer(routingKey, "&|");
+ Map<String, Object> pairs = new HashMap<String, Object>();
+ while(tokenizer.hasMoreTokens()) {
+ String token = tokenizer.nextToken();
+ String[] keyValue = token.split("=");
+ pairs.put(keyValue[0], keyValue[1]);
}
- super.stop();
+ return pairs;
}
//We have to ask the RabbitMQ Template for converters, the interface doesn't have a way to get MessageConverter
@@ -115,7 +135,7 @@ public void run() {
Message message = this.template.receive(endpoint.queueName);
if(message == null) {
- LOG.error("Received invalid message, cannot process!");
+ LOG.debug("Received null message, will not process response");
continue;
}
@@ -131,6 +151,8 @@ public void run() {
getProcessor().process(exchange);
}
+
+ LOG.info("Shutting down consumer thread for {}", endpoint.queueName);
} catch (IOException e) {
LOG.error("Error when attempting to speak with RabbitMQ", e);
} catch (InterruptedException e) {
View
25 src/main/java/amqp/spring/camel/component/SpringAMQPEndpoint.java
@@ -24,6 +24,10 @@
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.FanoutExchange;
+import org.springframework.amqp.core.HeadersExchange;
+import org.springframework.amqp.core.TopicExchange;
/**
* RabbitMQ Consumer URIs are in the format of:<br/>
@@ -77,15 +81,18 @@ public SpringAMQPEndpoint(String remaining, AmqpTemplate template, AmqpAdmin adm
@Override
public Producer createProducer() throws Exception {
+ if(this.exchangeName == null)
+ throw new IllegalStateException("Cannot have null exchange name");
+
return new SpringAMQPProducer(this);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
+ if(this.exchangeName == null)
+ throw new IllegalStateException("Cannot have null exchange name");
if(this.queueName == null)
throw new IllegalStateException("Cannot have null queue name for exchange "+this.exchangeName);
- if(this.routingKey == null)
- throw new IllegalStateException("Cannot have null routing key for exchange "+this.exchangeName);
return new SpringAMQPConsumer(this, processor);
}
@@ -188,4 +195,18 @@ public boolean isSingleton() {
protected String createEndpointUri() {
return "spring-amqp:"+this.exchangeName+":"+this.routingKey;
}
+
+ org.springframework.amqp.core.Exchange createAMQPExchange() {
+ if("direct".equals(this.exchangeType)) {
+ return new DirectExchange(this.exchangeName, this.durable, this.autodelete);
+ } else if("fanout".equals(this.exchangeType)) {
+ return new FanoutExchange(this.exchangeName, this.durable, this.autodelete);
+ } else if("headers".equals(this.exchangeType)) {
+ return new HeadersExchange(this.exchangeName, this.durable, this.autodelete);
+ } else if("topic".equals(this.exchangeType)) {
+ return new TopicExchange(this.exchangeName, this.durable, this.autodelete);
+ } else {
+ return new DirectExchange(this.exchangeName, this.durable, this.autodelete);
+ }
+ }
}
View
20 src/main/java/amqp/spring/camel/component/SpringAMQPProducer.java
@@ -21,12 +21,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
-import org.springframework.amqp.core.DirectExchange;
-import org.springframework.amqp.core.FanoutExchange;
-import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
-import org.springframework.amqp.core.TopicExchange;
public class SpringAMQPProducer extends DefaultProducer {
private static transient final Logger LOG = LoggerFactory.getLogger(SpringAMQPProducer.class);
@@ -51,7 +47,7 @@ public void process(Exchange exchange) throws Exception {
public void start() throws Exception {
super.start();
- this.exchange = createAMQPExchange(this.endpoint);
+ this.exchange = this.endpoint.createAMQPExchange();
this.endpoint.amqpAdministration.declareExchange(this.exchange);
LOG.info("Declared exchange {}", this.exchange.getName());
}
@@ -66,20 +62,6 @@ public void stop() throws Exception {
super.stop();
}
- static org.springframework.amqp.core.Exchange createAMQPExchange(SpringAMQPEndpoint endpoint) {
- if("direct".equals(endpoint.exchangeType)) {
- return new DirectExchange(endpoint.exchangeName, endpoint.durable, endpoint.autodelete);
- } else if("fanout".equals(endpoint.exchangeType)) {
- return new FanoutExchange(endpoint.exchangeName, endpoint.durable, endpoint.autodelete);
- } else if("headers".equals(endpoint.exchangeType)) {
- return new HeadersExchange(endpoint.exchangeName, endpoint.durable, endpoint.autodelete);
- } else if("topic".equals(endpoint.exchangeType)) {
- return new TopicExchange(endpoint.exchangeName, endpoint.durable, endpoint.autodelete);
- } else {
- return new DirectExchange(endpoint.exchangeName, endpoint.durable, endpoint.autodelete);
- }
- }
-
private static class HeadersPostProcessor implements MessagePostProcessor {
public org.apache.camel.Message camelMessage;
View
1  src/test/java/amqp/spring/camel/component/SpringAMQPComponentTest.java
@@ -31,7 +31,6 @@ public void testCreateContext() throws Exception {
Assert.assertNotNull(component);
}
-
@Override
protected CamelContext createCamelContext() throws Exception {
ConnectionFactory factory = new CachingConnectionFactory();
View
58 src/test/java/amqp/spring/camel/component/SpringAMQPConsumerTest.java
@@ -15,6 +15,8 @@
*/
package amqp.spring.camel.component;
+import java.util.HashMap;
+import java.util.Map;
import junit.framework.Assert;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
@@ -26,10 +28,10 @@
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
//TODO Try having unit tests talk to a VM local AMQP broker (like a Qpid broker)
public class SpringAMQPConsumerTest extends CamelTestSupport {
+ private CachingConnectionFactory factory;
@Test
public void testCreateContext() throws Exception {
@@ -44,34 +46,70 @@ public void restartConsumer() throws Exception {
public void process(Exchange exchange) throws Exception { }
};
- Consumer amqpConsumer = context().getEndpoint("spring-amqp:myExchange:myQueue:test.b?durable=false&autodelete=true&exclusive=false").createConsumer(defaultProcessor);
- amqpConsumer.stop();
+ Consumer amqpConsumer = context().getEndpoint("spring-amqp:directExchange:q0:test.a?durable=false&autodelete=true&exclusive=false").createConsumer(defaultProcessor);
amqpConsumer.start();
+ amqpConsumer.stop();
}
@Test
+ public void testKeyValueParsing() throws Exception {
+ Map<String, Object> keyValues = SpringAMQPConsumer.parseKeyValues("cheese=gouda&fromage=jack");
+ Assert.assertEquals("gouda", keyValues.get("cheese"));
+ Assert.assertEquals("jack", keyValues.get("fromage"));
+ }
+
+ @Test
public void sendMessage() throws Exception {
- MockEndpoint mockEndpoint = context().getEndpoint("mock:test.b", MockEndpoint.class);
+ MockEndpoint mockEndpoint = context().getEndpoint("mock:test.a", MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);
- context().createProducerTemplate().sendBody("spring-amqp:myExchange:test.b?durable=false&autodelete=true&exclusive=false", "HELLO WORLD");
+ context().createProducerTemplate().sendBody("spring-amqp:directExchange:test.a?durable=false&autodelete=true&exclusive=false", "sendMessage");
mockEndpoint.assertIsSatisfied();
}
-
+
+// @Test
+ public void testHeaderExchange() throws Exception {
+ MockEndpoint mockEndpointOne = context().getEndpoint("mock:test.b", MockEndpoint.class);
+ mockEndpointOne.expectedMessageCount(1);
+
+ Map<String, Object> headersOne = new HashMap<String, Object>();
+ headersOne.put("cheese", "asiago");
+ headersOne.put("fromage", "cheddar");
+ context().createProducerTemplate().sendBodyAndHeaders("spring-amqp:headerExchange?type=headers", "testHeaderExchange", headersOne);
+
+ MockEndpoint mockEndpointTwo = context().getEndpoint("mock:test.b", MockEndpoint.class);
+ mockEndpointTwo.expectedMessageCount(1);
+
+ Map<String, Object> headersTwo = new HashMap<String, Object>();
+ headersTwo.put("cheese", "gouda");
+ headersTwo.put("fromage", "jack");
+ context().createProducerTemplate().sendBodyAndHeaders("spring-amqp:headerExchange?type=headers", "testHeaderExchange", headersTwo);
+
+ mockEndpointOne.assertIsSatisfied();
+ mockEndpointTwo.assertIsSatisfied();
+ }
+
@Override
protected CamelContext createCamelContext() throws Exception {
- ConnectionFactory factory = new CachingConnectionFactory();
-
+ this.factory = new CachingConnectionFactory();
CamelContext camelContext = super.createCamelContext();
- camelContext.addComponent("spring-amqp", new SpringAMQPComponent(factory));
+ camelContext.addComponent("spring-amqp", new SpringAMQPComponent(this.factory));
return camelContext;
}
@Override
+ protected void stopCamelContext() throws Exception {
+ super.stopCamelContext();
+ this.factory.destroy();
+ }
+
+ @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("spring-amqp:myExchange:myQueue:test.b?durable=false&autodelete=true&exclusive=false").to("mock:test.b");
+ from("spring-amqp:directExchange:q1:test.a?durable=false&autodelete=true&exclusive=false").to("mock:test.a");
+// from("spring-amqp:headerExchange:q2:cheese=asiago&fromage=cheddar?type=headers&durable=false&autodelete=true&exclusive=false").to("mock:test.b");
+// from("spring-amqp:headerExchange:q3:cheese=gouda&fromage=jack?type=headers&durable=false&autodelete=true&exclusive=false").to("mock:test.c");
}
};
}
View
8 src/test/java/amqp/spring/camel/component/SpringAMQPProducerTest.java
@@ -36,14 +36,14 @@ public void testCreateContext() throws Exception {
@Test
public void restartProducer() throws Exception {
- Producer producer = context().getEndpoint("spring-amqp:myExchange:test.a?durable=false&autodelete=true&exclusive=false").createProducer();
- producer.stop();
+ Producer producer = context().getEndpoint("spring-amqp:myExchange:test.z?durable=false&autodelete=true&exclusive=false").createProducer();
producer.start();
+ producer.stop();
}
@Test
public void sendMessage() throws Exception {
- context().createProducerTemplate().sendBody("direct:test.a", "HELLO WORLD");
+ context().createProducerTemplate().sendBody("direct:test.z", "HELLO WORLD");
}
@Override
@@ -60,7 +60,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:test.a").to("spring-amqp:myExchange:test.a?durable=false&autodelete=true&exclusive=false");
+ from("direct:test.z").to("spring-amqp:myExchange:test.z?durable=false&autodelete=true&exclusive=false");
}
};
}
View
4 src/test/resources/amqp/spring/camel/component/SpringXMLTest-context.xml
@@ -32,11 +32,11 @@
<route>
<from uri="direct:stepOne"/>
- <to uri="spring-amqp:myExchange:test.c?type=direct&amp;durable=false&amp;autodelete=true&amp;exclusive=false"/>
+ <to uri="spring-amqp:myExchange:test.m?type=direct&amp;durable=false&amp;autodelete=true&amp;exclusive=false"/>
</route>
<route>
- <from uri="spring-amqp:myExchange:myQueue:test.c?type=direct&amp;durable=false&amp;autodelete=true&amp;exclusive=false"/>
+ <from uri="spring-amqp:myExchange:myQueue:test.m?type=direct&amp;durable=false&amp;autodelete=true&amp;exclusive=false"/>
<to uri="mock:testOne"/>
</route>
</camelContext>
Please sign in to comment.
Something went wrong with that request. Please try again.