Permalink
Browse files

How to implement Scatter-Gather

  • Loading branch information...
1 parent 5232b83 commit dcd0a42290eea408a920fc76d2be7cf68d92c920 @artembilan committed Jun 12, 2012
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
+ xmlns="http://www.springframework.org/schema/integration"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
+
+ <channel id="output">
+ <queue/>
+ </channel>
+
+
+ <aggregator input-channel="gatherChannel" output-channel="output"
+ release-strategy-expression="size() == 2"
+ expression="^[payload gt 5] ?: -1"/>
+
+
+ <!--Auction scenario-->
+ <header-enricher input-channel="inputAuctionChannel" output-channel="scatterAuctionChannel">
+ <reply-channel ref="gatherChannel"/>
+ </header-enricher>
+
+ <publish-subscribe-channel id="scatterAuctionChannel" apply-sequence="true"/>
+
+ <bridge input-channel="scatterAuctionChannel" output-channel="serviceChannel"/>
+
+ <bridge input-channel="scatterAuctionChannel" output-channel="serviceChannel"/>
+
+ <bridge input-channel="scatterAuctionChannel" output-channel="serviceChannel"/>
+
+
+ <!--Distribution scenario-->
+ <header-enricher input-channel="inputDistributionChannel" output-channel="scatterDistributionChannel">
+ <reply-channel ref="gatherChannel"/>
+ </header-enricher>
+
+ <recipient-list-router input-channel="scatterDistributionChannel" apply-sequence="true">
+ <recipient channel="distribution1Channel"/>
+ <recipient channel="distribution2Channel"/>
+ <recipient channel="distribution3Channel"/>
+ </recipient-list-router>
+
+ <bridge input-channel="distribution1Channel" output-channel="serviceChannel"/>
+
+ <bridge input-channel="distribution2Channel" output-channel="serviceChannel"/>
+
+ <bridge input-channel="distribution3Channel" output-channel="serviceChannel"/>
+
+
+
+ <service-activator input-channel="serviceChannel" expression="T(java.lang.Math).random() * 10"/>
+
+
+</beans:beans>
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.springframework.integration.scattergather;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.integration.Message;
+import org.springframework.integration.MessageChannel;
+import org.springframework.integration.core.PollableChannel;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.test.annotation.Repeat;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+/**
+ * @author Artem Bilan
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration
+public class ScatterGatherTests {
+
+ @Autowired
+ private MessageChannel inputAuctionChannel;
+
+ @Autowired
+ private MessageChannel inputDistributionChannel;
+
+ @Autowired
+ private PollableChannel output;
+
+ @Test
+ @Repeat(10)
+ public void testAuction() throws Exception {
+ Message<String> quoteMessage = MessageBuilder.withPayload("testQuote").build();
+ inputAuctionChannel.send(quoteMessage);
+ Message<?> bestQuoteMessage = output.receive();
+ System.out.println(bestQuoteMessage);
+ }
+
+ @Test
+ @Repeat(10)
+ public void testDistribution() throws Exception {
+ Message<String> quoteMessage = MessageBuilder.withPayload("testQuote").build();
+ inputDistributionChannel.send(quoteMessage);
+ Message<?> bestQuoteMessage = output.receive();
+ System.out.println(bestQuoteMessage);
+ }
+
+}

0 comments on commit dcd0a42

Please sign in to comment.