Permalink
Browse files

Async consumption of BigOp

  • Loading branch information...
1 parent 0a01d14 commit 88f35cc8abc81722ea2949a9c677dc5158daad0a @ryanbrainard ryanbrainard committed Jun 6, 2012
View
@@ -24,6 +24,17 @@
<artifactId>spring-rabbit</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
@@ -7,18 +7,23 @@
*/
public class BigOperation implements Serializable {
- public static final String QUEUE_NAME = "myqueue";
-
- String name;
+ private String name;
- public String getName() {
- return name;
+ public BigOperation() {
+ }
+
+ public BigOperation(String name) {
+ this.name = name;
}
public void setName(String name) {
this.name = name;
}
+ public String getName() {
+ return name;
+ }
+
@Override
public String toString() {
return "BigOperation{" +
@@ -0,0 +1,62 @@
+package com.heroku.devcenter;
+
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static java.lang.System.getenv;
+
+@Configuration
+public class RabbitConfiguration {
+
+ @Bean
+ public ConnectionFactory connectionFactory() {
+ final URI rabbitMqUrl;
+ try {
+ rabbitMqUrl = new URI(getEnvOrThrow("RABBITMQ_URL"));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ final CachingConnectionFactory factory = new CachingConnectionFactory();
+ factory.setUsername(rabbitMqUrl.getUserInfo().split(":")[0]);
+ factory.setPassword(rabbitMqUrl.getUserInfo().split(":")[1]);
+ factory.setHost(rabbitMqUrl.getHost());
+ factory.setPort(rabbitMqUrl.getPort());
+ factory.setVirtualHost(rabbitMqUrl.getPath().substring(1));
+
+ return factory;
+ }
+
+ @Bean
+ public AmqpAdmin amqpAdmin() {
+ return new RabbitAdmin(connectionFactory());
+ }
+
+ @Bean
+ public RabbitTemplate rabbitTemplate() {
+ return new RabbitTemplate(connectionFactory());
+ }
+
+ @Bean
+ public Queue queue() {
+ return new Queue("testqueue");
+ }
+
+ private static String getEnvOrThrow(String name) {
+ final String env = getenv(name);
+ if (env == null) {
+ throw new IllegalStateException("Environment variable [" + name + "] is not set.");
+ }
+ return env;
+ }
+
+}
@@ -1,43 +0,0 @@
-package com.heroku.devcenter;
-
-import com.rabbitmq.client.ConnectionFactory;
-import org.springframework.amqp.core.AmqpTemplate;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.GenericXmlApplicationContext;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import static java.lang.System.getenv;
-
-public class RabbitUtils {
-
- private RabbitUtils(){}
-
- public static AmqpTemplate getTemplate() {
- ApplicationContext context = new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
- return context.getBean(AmqpTemplate.class);
- }
-
- public static ConnectionFactory getConnectionFactory() throws URISyntaxException {
- ConnectionFactory factory = new ConnectionFactory();
-
- URI uri = new URI(getEnvOrThrow("RABBITMQ_URL"));
- factory.setUsername(uri.getUserInfo().split(":")[0]);
- factory.setPassword(uri.getUserInfo().split(":")[1]);
- factory.setHost(uri.getHost());
- factory.setPort(uri.getPort());
- factory.setVirtualHost(uri.getPath().substring(1));
-
- return factory;
- }
-
- private static String getEnvOrThrow(String name) {
- final String env = getenv(name);
- if (env == null) {
- throw new IllegalStateException("Environment variable [" + name + "] is not set.");
- }
- return env;
- }
-
-}
@@ -1,17 +0,0 @@
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
-
- <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-arg value="#{ T(com.heroku.devcenter.RabbitUtils).getConnectionFactory()}"/>
- </bean>
-
- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
-
- <rabbit:admin connection-factory="connectionFactory"/>
-
-</beans>
@@ -0,0 +1,85 @@
+package com.heroku.devcenter;
+
+import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageListener;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.amqp.support.converter.SimpleMessageConverter;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.util.ErrorHandler;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * @author Ryan Brainard
+ */
+public class RabbitIT {
+
+ private final ApplicationContext rabbitConfig = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
+ private final ConnectionFactory rabbitConnectionFactory = rabbitConfig.getBean(ConnectionFactory.class);
+ private final AmqpTemplate amqpTemplate = rabbitConfig.getBean(AmqpTemplate.class);
+ private final Queue rabbitQueue = rabbitConfig.getBean(Queue.class);
+
+ @BeforeTest
+ public void cleanTheRabbit() {
+ while (amqpTemplate.receive(rabbitQueue.getName()) != null){}
+ }
+
+ @Test
+ public void testSynchronous() throws Exception {
+ amqpTemplate.convertAndSend(rabbitQueue.getName(), new BigOperation("foo"));
+ Assert.assertEquals(((BigOperation) amqpTemplate.receiveAndConvert(rabbitQueue.getName())).getName(), "foo");
+ }
+
+ @Test
+ public void testAsynchronous() throws Exception {
+ final MessageConverter messageConverter = new SimpleMessageConverter();
+ final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
+ container.setConnectionFactory(rabbitConnectionFactory);
+ container.setQueueNames(rabbitQueue.getName());
+
+ final CountDownLatch fooLatch = new CountDownLatch(1);
+ final CountDownLatch barLatch = new CountDownLatch(2);
+ final List<BigOperation> receievedMessageHolder = new ArrayList<BigOperation>(2);
+ container.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ receievedMessageHolder.add((BigOperation) messageConverter.fromMessage(message));
+ fooLatch.countDown();
+ barLatch.countDown();
+ }
+ });
+ container.setErrorHandler(new ErrorHandler() {
+ public void handleError(Throwable t) {
+ t.printStackTrace();
+ }
+ });
+
+ try {
+ container.start();
+
+ amqpTemplate.convertAndSend(rabbitQueue.getName(), new BigOperation("foo"));
+ assertTrue(fooLatch.await(5, TimeUnit.SECONDS));
+ assertEquals(receievedMessageHolder.get(0).getName(), "foo");
+
+ amqpTemplate.convertAndSend(rabbitQueue.getName(), new BigOperation("bar"));
+ assertTrue(barLatch.await(5, TimeUnit.SECONDS));
+ assertEquals(receievedMessageHolder.get(1).getName(), "bar");
+ } finally {
+ container.shutdown();
+ }
+ }
+
+}
@@ -1,18 +1,21 @@
package com.heroku.devcenter;
import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
-import java.util.Map;
-
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
- private final AmqpTemplate rabbit = RabbitUtils.getTemplate();
+ private final ApplicationContext rabbitConfig = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
+ private final AmqpTemplate amqpTemplate = rabbitConfig.getBean(AmqpTemplate.class);
+ private final Queue rabbitQueue = rabbitConfig.getBean(Queue.class);
@ModelAttribute("bigOp")
public BigOperation newBigOp() {
@@ -25,10 +28,9 @@ public String display() {
}
@RequestMapping(method = RequestMethod.POST)
- public String process(@ModelAttribute("bigOp") BigOperation bigOp, Map<String, Object> map) {
-
- rabbit.convertAndSend(BigOperation.QUEUE_NAME, bigOp);
- System.out.println("Sent message to RabbitMQ: " + bigOp.name);
+ public String process(@ModelAttribute("bigOp") BigOperation bigOp) {
+ amqpTemplate.convertAndSend(rabbitQueue.getName(), bigOp);
+ System.out.println("Sent BigOperation to RabbitMQ on queue: " + bigOp.getName());
return "rabbitConfirmation";
}
@@ -1,22 +1,55 @@
package com.heroku.devcenter;
-import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageListener;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.amqp.support.converter.SimpleMessageConverter;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.util.ErrorHandler;
public class RabbitReceiver {
public static void main(String[] args) {
- final AmqpTemplate rabbit = RabbitUtils.getTemplate();
+ final ApplicationContext rabbitConfig = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
+ final ConnectionFactory rabbitConnectionFactory = rabbitConfig.getBean(ConnectionFactory.class);
+ final Queue rabbitQueue = rabbitConfig.getBean(Queue.class);
+ final MessageConverter messageConverter = new SimpleMessageConverter();
- while (true) {
- System.out.println("Checking for message...");
+ // create a listener container, which is required for asynchronous message consumption.
+ // AmqpTemplate cannot be used in this case
+ final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
+ listenerContainer.setConnectionFactory(rabbitConnectionFactory);
+ listenerContainer.setQueueNames(rabbitQueue.getName());
- try {
- BigOperation bigOp = (BigOperation) rabbit.receiveAndConvert(BigOperation.QUEUE_NAME);
- System.out.println("Received Big Operation: " + bigOp);
- } catch (Exception e) {
- System.err.println("ERROR checking message");
- e.printStackTrace(System.err);
+ // set the callback for message handling
+ listenerContainer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ final BigOperation bigOp = (BigOperation) messageConverter.fromMessage(message);
+ System.out.println("Received Big Operation: " + bigOp.getName());
}
- }
+ });
+
+ // set a simple error handler
+ listenerContainer.setErrorHandler(new ErrorHandler() {
+ public void handleError(Throwable t) {
+ t.printStackTrace();
+ }
+ });
+
+ // register a shutdown hook with the JVM
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("Shutting down RabbitReceiver");
+ listenerContainer.shutdown();
+ }
+ });
+
+ // start up the listener. this will block until JVM is killed.
+ listenerContainer.start();
}
}

0 comments on commit 88f35cc

Please sign in to comment.