diff --git a/dempsy-framework.api/pom.xml b/dempsy-framework.api/pom.xml index a6220d25..dc5ced0a 100644 --- a/dempsy-framework.api/pom.xml +++ b/dempsy-framework.api/pom.xml @@ -4,7 +4,7 @@ net.dempsy - dempsy-parent + dempsy-framework.parent 0.9-SNAPSHOT diff --git a/dempsy-framework.core/pom.xml b/dempsy-framework.core/pom.xml index eb71154a..fc44cea5 100644 --- a/dempsy-framework.core/pom.xml +++ b/dempsy-framework.core/pom.xml @@ -4,7 +4,7 @@ net.dempsy - dempsy-parent + dempsy-framework.parent 0.9-SNAPSHOT diff --git a/dempsy-framework.impl/pom.xml b/dempsy-framework.impl/pom.xml index 72f99e2c..0d2dfe2d 100644 --- a/dempsy-framework.impl/pom.xml +++ b/dempsy-framework.impl/pom.xml @@ -4,7 +4,7 @@ net.dempsy - dempsy-parent + dempsy-framework.parent 0.9-SNAPSHOT diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/router/group/ClusterGroupInbound.java b/dempsy-framework.impl/src/main/java/net/dempsy/router/group/ClusterGroupInbound.java index 88cd3895..cc897eee 100644 --- a/dempsy-framework.impl/src/main/java/net/dempsy/router/group/ClusterGroupInbound.java +++ b/dempsy-framework.impl/src/main/java/net/dempsy/router/group/ClusterGroupInbound.java @@ -34,6 +34,7 @@ public class ClusterGroupInbound { private final List inbounds = new ArrayList<>(); private boolean started = false; private GroupDetails groupDetails = null; + private Map caByCluster = null; private static final Map> current = new HashMap<>(); @@ -140,19 +141,20 @@ private synchronized void maybeStart(final Infrastructure infra, final Proxy ib) throw new IllegalStateException("The group name isn't set on the inbound for " + ib.clusterId + ". This shouldn't be possible. Was the typeId specified correctly?"); - if (groupDetails == null) + if (groupDetails == null) { groupDetails = new GroupDetails(ib.groupName, ib.address.node); - else if (!groupDetails.groupName.equals(ib.groupName)) + caByCluster = new HashMap<>(); + } else if (!groupDetails.groupName.equals(ib.groupName)) throw new IllegalStateException("The group name for " + ib.clusterId + " is " + ib.groupName + " but doesn't match prevous group names supposedly in the same group: " + groupDetails.groupName); else if (!groupDetails.node.equals(ib.address.node)) throw new IllegalStateException("The node address for " + ib.clusterId + " is " + ib.address.node + " but doesn't match prevous group names supposedly in the same group: " + groupDetails.node); - if (groupDetails.caByCluster.containsKey(ib.clusterId.clusterName)) + if (caByCluster.containsKey(ib.clusterId.clusterName)) throw new IllegalStateException("There appears to be two inbounds both configured with the same cluster id:" + ib.clusterId); - groupDetails.caByCluster.put(ib.clusterId.clusterName, ib.address); + caByCluster.put(ib.clusterId.clusterName, ib.address); if (!inbounds.contains(ib)) throw new IllegalStateException( @@ -173,6 +175,8 @@ else if (!groupDetails.node.equals(ib.address.node)) this.mask = totalShards - 1; + groupDetails.fillout(caByCluster); + utils = new Utils(infra, groupDetails.groupName, groupDetails); // subscriber first because it registers as a node. If there's no nodes // there's nothing for the leader to do. diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/router/group/ClusterGroupRouter.java b/dempsy-framework.impl/src/main/java/net/dempsy/router/group/ClusterGroupRouter.java index 59c44dd5..1e7677f3 100644 --- a/dempsy-framework.impl/src/main/java/net/dempsy/router/group/ClusterGroupRouter.java +++ b/dempsy-framework.impl/src/main/java/net/dempsy/router/group/ClusterGroupRouter.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -36,6 +37,7 @@ public class ClusterGroupRouter implements Router, IntConsumer { private final ShardState state; private final Utils utils; private int mask = 0; + private int containerIndex = -1; ClusterGroupRouter(final ClusterGroupRouterFactory mom, final ClusterId clusterId, final Infrastructure infra, final String groupName) { this.mommy = mom; @@ -61,19 +63,29 @@ public ContainerAddress selectDestinationForMessage(final KeyedMessageWithType m throw new DempsyException("It appears the " + ClusterGroupRouter.class.getSimpleName() + " strategy for the message key " + SafeString.objectDescription(message != null ? message.key : null) + " is being used prior to initialization or after a failure."); + if (containerIndex < 0) { + containerIndex = getIndex(destinations, clusterName); + if (containerIndex < 0) + return null; + } final GroupDetails cur = destinations[utils.determineShard(message.key, mask)]; - return cur.caByCluster.get(clusterName); + return cur.containerAddresses[containerIndex]; } @Override public Collection allDesintations() { final GroupDetails[] cur = destinations.get(); + if (containerIndex < 0) { + containerIndex = getIndex(cur, clusterName); + if (containerIndex < 0) + return Collections.emptyList(); + } if (cur == null) return new ArrayList<>(); return new ArrayList<>(Arrays.stream(cur) .filter(gd -> gd != null) - .map(gd -> gd.caByCluster.get(clusterName)) + .map(gd -> gd.containerAddresses[containerIndex]) .filter(ca -> ca != null) .collect(Collectors.toSet())); } @@ -99,7 +111,8 @@ boolean isReady() { for (final GroupDetails d : ds) if (d == null) return false; - final boolean ret = ds.length != 0; // this method is only called in tests and this needs to be true there. + + final boolean ret = ds.length != 0 && getIndex(ds, clusterName) >= 0; // this method is only called in tests and this needs to be true there. if (ret && LOGGER.isDebugEnabled()) LOGGER.debug("at {} to {} is Ready " + shorthand(cvrt(ds, clusterName)), thisNodeId, clusterId); @@ -107,9 +120,22 @@ boolean isReady() { return ret; } + private static int getIndex(final GroupDetails[] destinations, final String clustername) { + final GroupDetails gd = Arrays.stream(destinations).filter(g -> g != null).findAny().orElse(null); + if (gd == null) + return -1; + + final Integer ret = gd.clusterIndicies.get(clustername); + return ret == null ? -1 : ret.intValue(); + } + private static ContainerAddress[] cvrt(final GroupDetails[] gds, final String clusterName) { + final int clusterIndex = getIndex(gds, clusterName); + if (clusterIndex < 0) + return new ContainerAddress[0]; + return Arrays.stream(gds) - .map(gd -> gd == null ? null : gd.caByCluster.get(clusterName)) + .map(gd -> gd == null ? null : gd.containerAddresses[clusterIndex]) .toArray(ContainerAddress[]::new); } diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/router/group/intern/GroupDetails.java b/dempsy-framework.impl/src/main/java/net/dempsy/router/group/intern/GroupDetails.java index 0240a7ee..e59ed875 100644 --- a/dempsy-framework.impl/src/main/java/net/dempsy/router/group/intern/GroupDetails.java +++ b/dempsy-framework.impl/src/main/java/net/dempsy/router/group/intern/GroupDetails.java @@ -1,8 +1,10 @@ package net.dempsy.router.group.intern; import java.io.Serializable; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.stream.IntStream; import net.dempsy.router.RoutingStrategy.ContainerAddress; import net.dempsy.transport.NodeAddress; @@ -12,26 +14,64 @@ public class GroupDetails implements Serializable { public final String groupName; public final NodeAddress node; - public final Map caByCluster; + public final Map clusterIndicies; + public ContainerAddress[] containerAddresses; @SuppressWarnings("unused") // serialization. Yay! private GroupDetails() { groupName = null; node = null; - caByCluster = null; + clusterIndicies = null; + containerAddresses = null; } public GroupDetails(final String groupName, final NodeAddress nodeAddress) { this.groupName = groupName; this.node = nodeAddress; - this.caByCluster = new HashMap<>(); + this.clusterIndicies = new HashMap<>(); + this.containerAddresses = null; + } + + public void fillout(final Map caByCluster) throws IllegalStateException { + final int size = caByCluster.size(); + this.containerAddresses = new ContainerAddress[size]; + caByCluster.entrySet().forEach(e -> { + final ContainerAddress ca = e.getValue(); + final int[] indicies = ca.clusters; + for (final int index : indicies) { + if (containerAddresses[index] != null) + throw new IllegalStateException( + "Two different clusters have the same container index (" + index + "). One is " + e.getKey() + "."); + containerAddresses[index] = ca; + } + }); + + for (int i = 0; i < containerAddresses.length; i++) { + if (containerAddresses[i] == null) + throw new IllegalStateException("Missing container address at index " + i); + } + + // now set clusterIndicies + caByCluster.entrySet().forEach(e -> { + final String cn = e.getKey(); + final ContainerAddress ca = e.getValue(); + IntStream.of(ca.clusters).forEach(i -> { + final Integer index = Integer.valueOf(i); + if (clusterIndicies.containsKey(cn)) { + if (!clusterIndicies.get(cn).equals(index)) + throw new IllegalStateException("cluster " + cn + " seems to corespond to multiple clusters in the group including " + + clusterIndicies.get(cn) + " and " + i); + } else + clusterIndicies.put(cn, index); + }); + }); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((caByCluster == null) ? 0 : caByCluster.hashCode()); + result = prime * result + Arrays.hashCode(containerAddresses); result = prime * result + ((groupName == null) ? 0 : groupName.hashCode()); result = prime * result + ((node == null) ? 0 : node.hashCode()); return result; @@ -46,10 +86,7 @@ public boolean equals(final Object obj) { if (getClass() != obj.getClass()) return false; final GroupDetails other = (GroupDetails) obj; - if (caByCluster == null) { - if (other.caByCluster != null) - return false; - } else if (!caByCluster.equals(other.caByCluster)) + if (!Arrays.equals(containerAddresses, other.containerAddresses)) return false; if (groupName == null) { if (other.groupName != null) @@ -63,4 +100,5 @@ public boolean equals(final Object obj) { return false; return true; } + } \ No newline at end of file diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/nio/NioSender.java b/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/nio/NioSender.java index 8b68769e..8e48f2ee 100644 --- a/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/nio/NioSender.java +++ b/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/nio/NioSender.java @@ -20,7 +20,6 @@ import net.dempsy.transport.tcp.TcpAddress; import net.dempsy.transport.tcp.nio.internal.NioUtils; -// TODO: blocking that creates back-pressure public final class NioSender implements Sender { private final static Logger LOGGER = LoggerFactory.getLogger(NioSender.class); diff --git a/dempsy-framework.impl/src/test/java/net/dempsy/DempsyBaseTest.java b/dempsy-framework.impl/src/test/java/net/dempsy/DempsyBaseTest.java index 94f416dc..835b33c2 100644 --- a/dempsy-framework.impl/src/test/java/net/dempsy/DempsyBaseTest.java +++ b/dempsy-framework.impl/src/test/java/net/dempsy/DempsyBaseTest.java @@ -36,8 +36,16 @@ public abstract class DempsyBaseTest { * Setting 'hardcore' to true causes EVERY SINGLE IMPLEMENTATION COMBINATION to be used in * every runCombos call. This can make tests run for a loooooong time. */ - public static boolean hardcore = false; - public static boolean butRotateSerializer = true; + public static boolean hardcore = Boolean.parseBoolean(System.getProperty("hardcore", "false")); + + /** + * If this is set to true then the serializers will be rotated through but + * not every combination of serializer with the other possibilities will be tried. This is + * because the serializer testing is totally orthogonal to everything else and so setting + * this to false isn't likely to provide any better results than running the test multiple + * times. + */ + public static boolean butRotateSerializer = Boolean.parseBoolean(System.getProperty("butRotateSerializer", "true")); protected Logger LOGGER; @@ -47,19 +55,24 @@ public abstract class DempsyBaseTest { *
    *
  • routing-strategy - set the to the routing strategy id.
  • *
  • container-type - set to the container type id
  • + *
  • routing-group
  • - can be set for group strategies. Shouldn't be set otherwise. Defaults to "group." + *
  • min_nodes
  • - for managed and group strategies this sets the min_nodes to expect. Defaults to "1." + *
  • total_shards
  • - for managed and group strategies this sets the total_shards. It must be a power of "2." + * It's currently defaults to {@link DempsyBaseTest#NUM_MICROSHARDS} in the {@link DempsyBaseTest#runCombos} methods. *
* *

It autowires all of the {@link Cluster}'s that appear in the same context.

* *

Currently it directly uses the {@code BasicStatsCollector}

*/ - public static String nodeCtx = "classpath:/td/node.xml"; + protected static String nodeCtx = "classpath:/td/node.xml"; protected final String routerId; protected final String containerId; protected final String sessionType; protected final String transportType; protected final String serializerType; + protected final String threadingModelDescription; protected final Function threadingModelSource; protected static final String ROUTER_ID_PREFIX = "net.dempsy.router."; @@ -73,7 +86,8 @@ public abstract class DempsyBaseTest { protected static final int NUM_MICROSHARDS = 16; protected DempsyBaseTest(final Logger logger, final String routerId, final String containerId, final String sessionType, - final String transportType, final String serializerType, final Function threadingModelSource) { + final String transportType, final String serializerType, final String threadingModelDescription, + final Function threadingModelSource) { this.LOGGER = logger; this.routerId = routerId; this.containerId = containerId; @@ -81,6 +95,7 @@ protected DempsyBaseTest(final Logger logger, final String routerId, final Strin this.transportType = transportType; this.serializerType = serializerType; this.threadingModelSource = threadingModelSource; + this.threadingModelDescription = threadingModelDescription; } private static class Combos { @@ -102,9 +117,9 @@ public Combos(final String[] routers, final String[] containers, final String[] } } - public static List elasticRouterIds = Arrays.asList("managed"); - public static String[] transportsThatRequireSerializer = { "nio" }; - public static String[] grouRoutingStrategies = { "group" }; + private static List elasticRouterIds = Arrays.asList("managed", "group"); + private static List transportsThatRequireSerializer = Arrays.asList("nio"); + private static List grouRoutingStrategies = Arrays.asList("group"); public static Object[][] threadingModelDetails = { { "blocking-limited", (Function) (testName) -> new DefaultThreadingModel(testName) @@ -168,11 +183,15 @@ public static Combos production() { } public static boolean requiresSerialization(final String transport) { - return Arrays.asList(transportsThatRequireSerializer).contains(transport); + return transportsThatRequireSerializer.contains(transport); } public static boolean isGroupRoutingStrategy(final String routerId) { - return Arrays.asList(grouRoutingStrategies).contains(routerId); + return grouRoutingStrategies.contains(routerId); + } + + public static boolean isElasticRoutingStrategy(final String routerId) { + return elasticRouterIds.contains(routerId); } @Parameters(name = "{index}: routerId={0}, container={1}, cluster={2}, threading={5}, transport={3}/{4}") @@ -200,7 +219,7 @@ public static Collection combos() { ret.add(new Object[] { router, container, sessFact, tp, ser, threading[0], threading[1] }); } } else - ret.add(new Object[] { router, container, sessFact, tp, "none" }); + ret.add(new Object[] { router, container, sessFact, tp, "none", threading[0], threading[1] }); } } } @@ -246,10 +265,15 @@ public boolean filter(final String routerId, final String containerId, final Str final String serializerType); } - private static final String[] frameworkCtx = { "classpath:/td/node.xml" }; + private static final String[] frameworkCtx = { nodeCtx }; - ServiceTracker currentlyTracking = null; - ClusterInfoSessionFactory currentSessionFactory = null; + // ============================================================================== + // Test state information accessible to the subclasses. + // ============================================================================== + protected ServiceTracker currentlyTracking = null; + protected ClusterInfoSessionFactory currentSessionFactory = null; + protected String currentAppName = null; + // ============================================================================== protected void stopSystem() throws Exception { currentSessionFactory = null; @@ -265,7 +289,6 @@ protected void runCombos(final String testName, final String[][] ctxs, final Tes } private static AtomicLong runComboSequence = new AtomicLong(0); - protected static String currentAppName = null; protected void runCombos(final String testName, final ComboFilter filter, final String[][] ctxs, final TestToRun test) throws Exception { runCombos(testName, filter, ctxs, null, test); @@ -276,7 +299,12 @@ protected void runCombos(final String testName, final ComboFilter filter, final if (filter != null && !filter.filter(routerId, containerId, sessionType, transportType, serializerType)) return; - currentAppName = testName + "-" + runComboSequence.getAndIncrement(); + final long comboSequence = runComboSequence.getAndIncrement(); + currentAppName = testName + "-" + comboSequence; + + LOGGER.info("====================================================================================="); + LOGGER.info("======== Running (" + comboSequence + ") " + testName + " with " + routerId + ", " + containerId + ", " + sessionType + ", " + + threadingModelDescription + ", " + transportType + "/" + serializerType); try (final ServiceTracker tr = new ServiceTracker()) { currentlyTracking = tr; @@ -284,7 +312,7 @@ protected void runCombos(final String testName, final ComboFilter filter, final .set("routing-strategy", ROUTER_ID_PREFIX + routerId) .set("container-type", CONTAINER_ID_PREFIX + containerId) .set("test-name", currentAppName) - .set("total_shards", Integer.toString(NUM_MICROSHARDS)); + .setIfAbsent("total_shards", Integer.toString(NUM_MICROSHARDS)); // instantiate session factory final ClusterInfoSessionFactory sessFact = tr diff --git a/dempsy-framework.impl/src/test/java/net/dempsy/TestElasticity.java b/dempsy-framework.impl/src/test/java/net/dempsy/TestElasticity.java index fcf6836f..394143d0 100644 --- a/dempsy-framework.impl/src/test/java/net/dempsy/TestElasticity.java +++ b/dempsy-framework.impl/src/test/java/net/dempsy/TestElasticity.java @@ -58,7 +58,7 @@ public class TestElasticity extends DempsyBaseTest { public TestElasticity(final String routerId, final String containerId, final String sessCtx, final String tpCtx, final String serType, final String threadingModelDescription, final Function threadingModelSource) { - super(LOGGER, routerId, containerId, sessCtx, tpCtx, serType, threadingModelSource); + super(LOGGER, routerId, containerId, sessCtx, tpCtx, serType, threadingModelDescription, threadingModelSource); } // ======================================================================== @@ -278,14 +278,7 @@ public void testForProfiler() throws Throwable { final KeyExtractor ke = new KeyExtractor(); - runCombos("testForProfiler", (r, c, s, t, ser) -> { - final boolean ret = elasticRouterIds.contains(r); - if (ret) { - LOGGER.info("====================================================================================="); - LOGGER.info("======== Running testForProfiler with " + r + ", " + c + ", " + s + ", " + t + "/" + ser); - } - return ret; - }, actxPath, new String[][][] { + runCombos("testForProfiler", (r, c, s, t, ser) -> isElasticRoutingStrategy(r), actxPath, new String[][][] { null, { { "min_nodes", "3" } }, { { "min_nodes", "3" } }, @@ -353,8 +346,6 @@ public void testForProfiler() throws Throwable { assertEquals(profilerTestNumberCount, count.get()); - } finally { - LOGGER.info("====================================================================================="); } }); } catch (final Throwable th) { @@ -380,7 +371,7 @@ private static void runACycle(final AtomicBoolean keepGoing, final int rankIndex @Test public void testNumberCountDropOneAndReAdd() throws Throwable { - runCombos("testNumberCountDropOneAndReAdd", (r, c, s, t, ser) -> elasticRouterIds.contains(r), actxPath, ns -> { + runCombos("testNumberCountDropOneAndReAdd", (r, c, s, t, ser) -> isElasticRoutingStrategy(r), actxPath, ns -> { // keepGoing is for the separate thread that pumps messages into the system. final AtomicBoolean keepGoing = new AtomicBoolean(true); try { @@ -444,7 +435,7 @@ public void testNumberCountDropOneAndReAdd() throws Throwable { @Test public void testNumberCountAddOneThenDrop() throws Throwable { - runCombos("testNumberCountAddOneThenDrop", (r, c, s, t, ser) -> elasticRouterIds.contains(r), actxPath, ns -> { + runCombos("testNumberCountAddOneThenDrop", (r, c, s, t, ser) -> isElasticRoutingStrategy(r), actxPath, ns -> { // keepGoing is for the separate thread that pumps messages into the system. final AtomicBoolean keepGoing = new AtomicBoolean(true); try { @@ -514,7 +505,7 @@ public void testExpansionPassivation() throws Exception { { "elasticity/mp-num-rank.xml", }, }; - runCombos("testExpansionPassivation", (r, c, s, t, ser) -> elasticRouterIds.contains(r), actxPath, ns -> { + runCombos("testExpansionPassivation", (r, c, s, t, ser) -> isElasticRoutingStrategy(r), actxPath, ns -> { final AtomicBoolean keepGoing = new AtomicBoolean(true); try { final List nodes = new ArrayList<>(ns.nodes); diff --git a/dempsy-framework.impl/src/test/java/net/dempsy/TestWordCount.java b/dempsy-framework.impl/src/test/java/net/dempsy/TestWordCount.java index 5d77c89c..57769520 100644 --- a/dempsy-framework.impl/src/test/java/net/dempsy/TestWordCount.java +++ b/dempsy-framework.impl/src/test/java/net/dempsy/TestWordCount.java @@ -2,7 +2,7 @@ import static net.dempsy.utils.test.ConditionPoll.poll; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.BufferedInputStream; @@ -63,7 +63,7 @@ public static String readBible() throws IOException { public TestWordCount(final String routerId, final String containerId, final String sessCtx, final String tpid, final String serType, final String threadingModelDescription, final Function threadingModelSource) { - super(LOGGER, routerId, containerId, sessCtx, tpid, serType, threadingModelSource); + super(LOGGER, routerId, containerId, sessCtx, tpid, serType, threadingModelDescription, threadingModelSource); } @Before @@ -394,7 +394,7 @@ public void testWordCountNoRankMultinode() throws Throwable { }; WordProducer.latch = new CountDownLatch(1); // need to make it wait. - runCombos("testWordCountNoRankMultinode", (r, c, s, t, ser) -> !r.equals("simple"), ctxs, n -> { + runCombos("testWordCountNoRankMultinode", (r, c, s, t, ser) -> isElasticRoutingStrategy(r), ctxs, n -> { final List nodes = n.nodes; final NodeManager[] manager = Arrays.asList(nodes.get(0).manager, nodes.get(1).manager).toArray(new NodeManager[2]); final ClassPathXmlApplicationContext[] ctx = Arrays.asList(nodes.get(0).ctx, nodes.get(1).ctx) @@ -439,7 +439,7 @@ public void testWordCountNoRankAdaptorOnlyNode() throws Throwable { final int NUM_WC = ctxs.length - 1; // the adaptor is the first one. WordProducer.latch = new CountDownLatch(1); // need to make it wait. - runCombos("testWordCountNoRankMultinode", (r, c, s, t, ser) -> !r.equals("simple"), ctxs, n -> { + runCombos("testWordCountNoRankMultinode", (r, c, s, t, ser) -> isElasticRoutingStrategy(r), ctxs, n -> { final List nodes = n.nodes; final NodeManager[] managers = nodes.stream().map(nm -> nm.manager).toArray(NodeManager[]::new); @@ -474,9 +474,6 @@ public void testWordCountHomogeneousProcessing() throws Throwable { { "classpath:/word-count/mp-word-count.xml", "classpath:/word-count/mp-word-rank.xml" }, { "classpath:/word-count/mp-word-count.xml", "classpath:/word-count/mp-word-rank.xml" }, { "classpath:/word-count/mp-word-count.xml", "classpath:/word-count/mp-word-rank.xml" }, - { "classpath:/word-count/mp-word-count.xml", "classpath:/word-count/mp-word-rank.xml" }, - { "classpath:/word-count/mp-word-count.xml", "classpath:/word-count/mp-word-rank.xml" }, - { "classpath:/word-count/mp-word-count.xml", "classpath:/word-count/mp-word-rank.xml" }, }; final int NUM_WC = ctxs.length - 1; // the adaptor is the first one. @@ -486,10 +483,11 @@ public void testWordCountHomogeneousProcessing() throws Throwable { .set("min_nodes", Integer.toString(NUM_WC)) .set("routing-group", ":group") .set("send_threads", "1") - .set("receive_threads", "1")) { + .set("receive_threads", "1") + .set("blocking-queue-size", "500000")) { WordProducer.latch = new CountDownLatch(1); // need to make it wait. - runCombos("testWordCountHomogeneousProcessing", ctxs, n -> { + runCombos("testWordCountHomogeneousProcessing", (r, c, s, t, ser) -> isElasticRoutingStrategy(r), ctxs, n -> { final List nodes = n.nodes; final NodeManager[] managers = nodes.stream().map(nm -> nm.manager).toArray(NodeManager[]::new); @@ -528,19 +526,14 @@ public void testWordCountHomogeneousProcessing() throws Throwable { .collect(Collectors.toList()); // if the routing id isn't a group id then there should be cross talk. + assertEquals(NUM_WC, nodeStats.size()); + for (final NodeMetricGetters mg : nodeStats) + assertEquals(0, mg.getMessagesNotSentCount()); if (isGroupRoutingStrategy(routerId)) { - assertEquals(NUM_WC, nodeStats.size()); - for (final NodeMetricGetters mg : nodeStats) { + for (final NodeMetricGetters mg : nodeStats) assertEquals(0, mg.getMessagesSentCount()); - assertEquals(0, mg.getMessagesNotSentCount()); - } } else { - assertEquals(NUM_WC, nodeStats.size()); - for (final NodeMetricGetters mg : nodeStats) { - assertNotEquals(0, mg.getMessagesSentCount()); - assertEquals(0, mg.getMessagesNotSentCount()); - } - + assertNotNull(nodeStats.stream().filter(mg -> mg.getMessagesSentCount() > 0).findFirst().orElse(null)); } }); } diff --git a/dempsy-framework.impl/src/test/java/net/dempsy/router/group/TestGroupRoutingStrategy.java b/dempsy-framework.impl/src/test/java/net/dempsy/router/group/TestGroupRoutingStrategy.java index ac1be576..17dc106b 100644 --- a/dempsy-framework.impl/src/test/java/net/dempsy/router/group/TestGroupRoutingStrategy.java +++ b/dempsy-framework.impl/src/test/java/net/dempsy/router/group/TestGroupRoutingStrategy.java @@ -196,7 +196,9 @@ public void testInboundWithOutbound() throws Exception { final ContainerAddress oca = new ContainerAddress(na, 0); final Infrastructure infra = makeInfra(session, sched); final GroupDetails ogd = new GroupDetails(groupName, na); - ogd.caByCluster.put(cid.clusterName, oca); + final Map ocaiByCluster = new HashMap<>(); + ocaiByCluster.put(cid.clusterName, oca); + ogd.fillout(ocaiByCluster); final Utils msutils = new Utils<>(infra, groupName, ogd); ib.setContainerDetails(cid, oca, (l, m) -> {}); @@ -231,9 +233,8 @@ public void testInboundWithOutbound() throws Exception { // NO: destination will not necessarily clear. // poll(o -> ob.selectDestinationForMessage(km) == null); - final NodeAddress nna = new DummyNodeAddress("here-again"); - final ContainerAddress nca = new ContainerAddress(nna, 0); - ogd.caByCluster.put(cid.clusterName, oca); + final ContainerAddress nca = new ContainerAddress(new DummyNodeAddress("here-again"), 0); + ogd.fillout(ocaiByCluster); try (ClusterInfoSession ses3 = sessFact.createSession(); RoutingStrategy.Inbound ib2 = new RoutingInboundManager() diff --git a/dempsy-framework.impl/src/test/resources/td/node.xml b/dempsy-framework.impl/src/test/resources/td/node.xml index 68540463..f68dd4c9 100644 --- a/dempsy-framework.impl/src/test/resources/td/node.xml +++ b/dempsy-framework.impl/src/test/resources/td/node.xml @@ -20,7 +20,11 @@ - + + + + + diff --git a/dempsy-framework.impl/src/test/resources/td/transport-bq.xml b/dempsy-framework.impl/src/test/resources/td/transport-bq.xml index 0102893a..20d1fa1f 100644 --- a/dempsy-framework.impl/src/test/resources/td/transport-bq.xml +++ b/dempsy-framework.impl/src/test/resources/td/transport-bq.xml @@ -9,7 +9,7 @@ - + diff --git a/pom.xml b/pom.xml index b01e3b34..fa5a23d4 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 net.dempsy - dempsy-parent + dempsy-framework.parent 0.9-SNAPSHOT pom Distributed Elastic Message Processing - Master Build