From 2198ab8c1130133f503273860b8a97cceb7a8084 Mon Sep 17 00:00:00 2001 From: Roddie Kieley Date: Wed, 31 Oct 2018 15:04:07 -0230 Subject: [PATCH] ARTEMIS-2160: Addressed occurance where cluster configuration on server locator was hard coded. Covered with test. --- .../server/cluster/ClusterController.java | 11 ++- .../cluster/ClusterControllerTest.java | 85 ++++++++++++++++++- .../cluster/distribution/ClusterTestBase.java | 74 +++++++++++----- 3 files changed, 146 insertions(+), 24 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index 03eb2432ac2..15cf04efcb5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -185,8 +185,11 @@ private void configAndAdd(SimpleString name, serverLocator.setConnectionTTL(config.getConnectionTTL()); serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod()); //if the cluster isn't available we want to hang around until it is - serverLocator.setReconnectAttempts(-1); - serverLocator.setInitialConnectAttempts(-1); + serverLocator.setReconnectAttempts(config.getReconnectAttempts()); + serverLocator.setInitialConnectAttempts(config.getInitialConnectAttempts()); + serverLocator.setRetryInterval(config.getRetryInterval()); + serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier()); + serverLocator.setMaxRetryInterval(config.getMaxRetryInterval()); //this is used for replication so need to use the server packet decoder serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool()); @@ -438,4 +441,8 @@ public ServerLocator getReplicationLocator() { return this.replicationLocator; } + public ServerLocator getServerLocator(SimpleString name) { + return locators.get(name); + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java index 91857ef0954..f7cbd6200ee 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java @@ -16,12 +16,19 @@ */ package org.apache.activemq.artemis.tests.integration.cluster; +import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; import org.apache.activemq.artemis.core.server.cluster.ClusterControl; import org.apache.activemq.artemis.core.server.cluster.ClusterController; +import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.junit.Before; @@ -29,6 +36,9 @@ public class ClusterControllerTest extends ClusterTestBase { + private ClusterConnectionConfiguration clusterConf0; + private ClusterConnectionConfiguration clusterConf1; + @Override @Before public void setUp() throws Exception { @@ -45,13 +55,74 @@ public void setUp() throws Exception { getServer(1).getConfiguration().setClusterPassword("something different"); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1); + clusterConf0 = new ClusterConnectionConfiguration() + .setName("cluster0") + .setAddress("queues") + .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND) + .setMaxHops(1) + .setInitialConnectAttempts(8) + .setReconnectAttempts(10) + .setRetryInterval(250) + .setMaxRetryInterval(4000) + .setRetryIntervalMultiplier(2.0); + + clusterConf1 = new ClusterConnectionConfiguration() + .setName("cluster0") + .setAddress("queues") + .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND) + .setMaxHops(1) + .setInitialConnectAttempts(8) + .setReconnectAttempts(10) + .setRetryInterval(250) + .setMaxRetryInterval(4000) + .setRetryIntervalMultiplier(2.0); + + setupClusterConnection(clusterConf0, true, 0); + setupClusterConnection(clusterConf1, true, 1); startServers(0); startServers(1); } + private boolean clusterConnectionConfigurationIsSameBeforeAfterStart(ClusterConnectionConfiguration clusterConnectionConfigurationBeforeStart, int node) { + boolean clusterConnectionConfigurationIsSame = false; + + Configuration serverNodeConfiguration = getServer(node).getConfiguration(); + ActiveMQServer serverNode = getServer(node); + ClusterManager clusterManager = serverNode.getClusterManager(); + ClusterController clusterController = clusterManager.getClusterController(); + ServerLocator serverNodeLocator = clusterController.getServerLocator(new SimpleString(clusterConnectionConfigurationBeforeStart.getName())); + List serverNodeClusterConnectionConfigurations = serverNodeConfiguration.getClusterConfigurations(); + + do { + if (serverNodeLocator.getInitialConnectAttempts() != clusterConnectionConfigurationBeforeStart.getInitialConnectAttempts()) { + break; + } + + if (serverNodeLocator.getReconnectAttempts() != clusterConnectionConfigurationBeforeStart.getReconnectAttempts()) { + break; + } + + if (serverNodeLocator.getRetryInterval() != clusterConnectionConfigurationBeforeStart.getRetryInterval()) { + break; + } + if (serverNodeLocator.getMaxRetryInterval() != clusterConnectionConfigurationBeforeStart.getMaxRetryInterval()) { + break; + } + + Double serverNodeClusterConnectionConfigurationRIM = serverNodeLocator.getRetryIntervalMultiplier(); + Double clusterConnectionConfigurationBeforeStartRIM = clusterConnectionConfigurationBeforeStart.getRetryIntervalMultiplier(); + if (0 != serverNodeClusterConnectionConfigurationRIM.compareTo(clusterConnectionConfigurationBeforeStartRIM)) { + break; + } + + clusterConnectionConfigurationIsSame = true; + } + while (false); + + return clusterConnectionConfigurationIsSame; + } + @Test public void controlWithDifferentConnector() throws Exception { try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) { @@ -76,4 +147,14 @@ public void controlWithDifferentPassword() throws Exception { } } } + + @Test + public void verifyServerLocatorsClusterConfiguration() { + if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf0, 0)) { + fail("serverLocator is not configured as per clusterConf0"); + } + if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf1, 1)) { + fail("serverLocator is not configured as per clusterConf1"); + } + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 6e7f9b338ab..0e4c9603977 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -1755,6 +1755,36 @@ protected void setupClusterConnection(final String name, setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo); } + private List getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) { + List pairs = new ArrayList<>(); + for (int element : nodesTo) { + TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); + serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); + pairs.add(serverTotc.getName()); + } + return pairs; + } + + protected void setupClusterConnection(ClusterConnectionConfiguration clusterConf, + final boolean netty, + final int nodeFrom, + final int... nodesTo) { + ActiveMQServer serverFrom = servers[nodeFrom]; + + if (serverFrom == null) { + throw new IllegalStateException("No server at node " + nodeFrom); + } + + TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); + serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom); + + List pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); + Configuration config = serverFrom.getConfiguration(); + clusterConf.setConnectorName(connectorFrom.getName()).setConfirmationWindowSize(1024).setStaticConnectors(pairs); + + config.getClusterConfigurations().add(clusterConf); + } + protected void setupClusterConnection(final String name, final String address, final MessageLoadBalancingType messageLoadBalancingType, @@ -1772,12 +1802,7 @@ protected void setupClusterConnection(final String name, TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom); - List pairs = new ArrayList<>(); - for (int element : nodesTo) { - TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); - serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); - pairs.add(serverTotc.getName()); - } + List pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); Configuration config = serverFrom.getConfiguration(); ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs); @@ -1805,15 +1830,21 @@ protected void setupClusterConnection(final String name, TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom); - List pairs = new ArrayList<>(); - for (int element : nodesTo) { - TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); - serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); - pairs.add(serverTotc.getName()); - } + List pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); Configuration config = serverFrom.getConfiguration(); - ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setCallTimeout(100).setCallFailoverTimeout(100).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs); + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(connectorFrom.getName()) + .setRetryInterval(retryInterval) + .setReconnectAttempts(reconnectAttempts) + .setCallTimeout(100) + .setCallFailoverTimeout(100) + .setMessageLoadBalancingType(messageLoadBalancingType) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs); config.getClusterConfigurations().add(clusterConf); } @@ -1824,7 +1855,15 @@ private ClusterConnectionConfiguration createClusterConfig(final String name, final int maxHops, TransportConfiguration connectorFrom, List pairs) { - return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs); + return new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(connectorFrom.getName()) + .setRetryInterval(250) + .setMessageLoadBalancingType(messageLoadBalancingType) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs); } protected void setupClusterConnectionWithBackups(final String name, @@ -1843,12 +1882,7 @@ protected void setupClusterConnectionWithBackups(final String name, TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom); - List pairs = new ArrayList<>(); - for (int element : nodesTo) { - TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); - serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); - pairs.add(serverTotc.getName()); - } + List pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); Configuration config = serverFrom.getConfiguration(); ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);