Permalink
Browse files

Merge pull request #969 from clebertsuconic/francisco-bridge

Sending francisco's commits after a few minor changes
  • Loading branch information...
2 parents 85797e8 + 8fcc44b commit 7c558ea5b97ca54273c86b0e8435816f43d30183 @clebertsuconic clebertsuconic committed Mar 27, 2013
View
36 hornetq-server/src/main/java/org/hornetq/core/filter/impl/FilterImpl.java
@@ -20,8 +20,8 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.HornetQMessageBundle;
+import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.ServerMessage;
/**
@@ -62,7 +62,7 @@
private final SimpleString sfilterString;
- private final Map<SimpleString, Identifier> identifiers = new HashMap<SimpleString, Identifier>();
+ private final Map<SimpleString, Identifier> identifiers;
private final Object result;
@@ -89,30 +89,30 @@ public static Filter createFilter(final SimpleString filterStr) throws HornetQEx
{
return null;
}
- else
+
+ HashMap<SimpleString, Identifier> identifierMap = new HashMap<SimpleString, Identifier>();
+ Object result0;
+ try
{
- return new FilterImpl(filterStr);
+ result0 = new FilterParser().parse(filterStr, identifierMap);
}
+ catch (Throwable e)
+ {
+ HornetQServerLogger.LOGGER.invalidFilter(e, filterStr);
+ throw HornetQMessageBundle.BUNDLE.invalidFilter(e, filterStr);
+ }
+ return new FilterImpl(filterStr, identifierMap, result0);
}
// Constructors ---------------------------------------------------
- private FilterImpl(final SimpleString str) throws HornetQException
+ private FilterImpl(final SimpleString str, final HashMap<SimpleString, Identifier> identifierMap,
+ final Object result0)
{
sfilterString = str;
-
- try
- {
- result = new FilterParser().parse(sfilterString, identifiers);
-
- resultType = result.getClass();
- }
- catch (Throwable e)
- {
- HornetQServerLogger.LOGGER.invalidFilter(e, str);
-
- throw HornetQMessageBundle.BUNDLE.invalidFilter(e, str);
- }
+ identifiers = identifierMap;
+ this.result = result0;
+ resultType = result.getClass();
}
// Filter implementation ---------------------------------------------------------------------
View
9 hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
@@ -278,6 +278,10 @@ public synchronized void allowInvmSecurityOverride(HornetQPrincipal principal)
public synchronized void freeze(final CoreRemotingConnection connectionToKeepOpen)
{
+ if (!started)
+ return;
+ failureCheckAndFlushThread.close(false);
+
for (Acceptor acceptor : acceptors)
{
try
@@ -321,11 +325,6 @@ public void stop(final boolean criticalError) throws Exception
return;
}
- if (!started)
- {
- return;
- }
-
failureCheckAndFlushThread.close(criticalError);
// We need to stop them accepting first so no new connections are accepted after we send the disconnect message
View
23 hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java
@@ -770,7 +770,8 @@
void unableToStartBridge(@Cause Exception e, SimpleString name);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222119, value = "No connector with name '{0}'. backup cannot be announced.", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 222119, value = "No connector with name {0}. backup cannot be announced.",
+ format = Message.Format.MESSAGE_FORMAT)
void announceBackupNoConnector(String connectorName);
@LogMessage(level = Logger.Level.WARN)
@@ -810,27 +811,35 @@
void clusterConnectionNoForwardAddress();
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222129, value = "No connector with name '{0}'. The cluster connection will not be deployed.", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 222129, value = "No connector with name {0}. The cluster connection will not be deployed.",
+ format = Message.Format.MESSAGE_FORMAT)
void clusterConnectionNoConnector(String connectorName);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222130, value = "Cluster Configuration '{0}' already exists. The cluster connection will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 222130,
+ value = "Cluster Configuration {0} already exists. The cluster connection will not be deployed.",
+ format = Message.Format.MESSAGE_FORMAT)
void clusterConnectionAlreadyExists(String connectorName);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222131, value = "No discovery group with name '{0}'. The cluster connection will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 222131, value = "No discovery group with name {0}. The cluster connection will not be deployed.",
+ format = Message.Format.MESSAGE_FORMAT)
void clusterConnectionNoDiscoveryGroup(String discoveryGroupName);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222132, value = "There is already a broadcast-group with name {0} deployed. This one will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
void broadcastGroupAlreadyExists(String name);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222133, value = "There is no connector deployed with name '{0}'. The broadcast group with name '{1}' will not be deployed." , format = Message.Format.MESSAGE_FORMAT)
+ @Message(
+ id = 222133,
+ value = "There is no connector deployed with name {0}. The broadcast group with name {1} will not be deployed.",
+ format = Message.Format.MESSAGE_FORMAT)
void broadcastGroupNoConnector(String connectorName, String bgName);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222134, value = "No connector defined with name '{0}'. The bridge will not be deployed.", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 222134, value = "No connector defined with name {0}. The bridge will not be deployed.",
+ format = Message.Format.MESSAGE_FORMAT)
void bridgeNoConnector(String name);
@LogMessage(level = Logger.Level.WARN)
@@ -1219,7 +1228,7 @@
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224058,
- value = "Can't find queue {0} while reloading PAGE_CURSOR_COMPLETE, deleting record now",
+ value = "Cannot find queue {0} while reloading PAGE_CURSOR_COMPLETE, deleting record now",
format = Message.Format.MESSAGE_FORMAT)
void cantFindQueueOnPageComplete(long queueID);
View
34 hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java
@@ -39,6 +39,7 @@
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.ConfigurationUtils;
+import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Channel;
@@ -257,12 +258,18 @@ public synchronized void start() throws Exception
}
}
+ deployConfiguredBridges();
+ state = State.STARTED;
+ }
+
+ private final void deployConfiguredBridges() throws Exception
+ {
+ if (backup)
+ return;
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
{
- deployBridge(config, !backup);
+ deployBridge(config);
}
-
- state = State.STARTED;
}
public void stop() throws Exception
@@ -351,13 +358,21 @@ public ClusterConnection getClusterConnection(final String name)
return clusterConnections.get(name);
}
- // backup node becomes live
- public synchronized void activate()
+ /**
+ * Activates several cluster services. Used by backups on failover.
+ * @throws Exception
+ */
+ public synchronized void activate() throws Exception
{
+ if (state != State.STARTED && state != State.DEPLOYED)
+ return;
+
if (backup)
{
backup = false;
+ deployConfiguredBridges();
+
for (BroadcastGroup broadcastGroup : broadcastGroups.values())
{
try
@@ -440,7 +455,7 @@ public void removeClusterLocator(final ServerLocatorInternal serverLocator)
this.clusterLocators.remove(serverLocator);
}
- public synchronized void deployBridge(final BridgeConfiguration config, final boolean start) throws Exception
+ public synchronized void deployBridge(final BridgeConfiguration config) throws Exception
{
if (config.getName() == null)
{
@@ -584,7 +599,7 @@ public synchronized void deployBridge(final BridgeConfiguration config, final bo
new SimpleString(config.getName()),
queue,
executorFactory.getExecutor(),
- SimpleString.toSimpleString(config.getFilterString()),
+ FilterImpl.createFilter(config.getFilterString()),
SimpleString.toSimpleString(config.getForwardingAddress()),
scheduledExecutor,
transformer,
@@ -598,10 +613,7 @@ public synchronized void deployBridge(final BridgeConfiguration config, final bo
managementService.registerBridge(bridge, config);
- if (start)
- {
- bridge.start();
- }
+ bridge.start();
}
View
7 hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java
@@ -35,7 +35,6 @@
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HandleStatus;
@@ -148,15 +147,15 @@ public BridgeImpl(final ServerLocatorInternal serverLocator,
final SimpleString name,
final Queue queue,
final Executor executor,
- final SimpleString filterString,
+ final Filter filter,
final SimpleString forwardingAddress,
final ScheduledExecutorService scheduledExecutor,
final Transformer transformer,
final boolean useDuplicateDetection,
final String user,
final String password,
final boolean activated,
- final StorageManager storageManager) throws Exception
+ final StorageManager storageManager)
{
this.reconnectAttempts = reconnectAttempts;
@@ -181,7 +180,7 @@ public BridgeImpl(final ServerLocatorInternal serverLocator,
this.scheduledExecutor = scheduledExecutor;
- filter = FilterImpl.createFilter(filterString);
+ this.filter = filter;
this.forwardingAddress = forwardingAddress;
View
5 ...tq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -30,6 +30,7 @@
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.BindingType;
@@ -86,7 +87,7 @@ public ClusterConnectionBridge(final ClusterConnection clusterConnection, final
final SimpleString name,
final Queue queue,
final Executor executor,
- final SimpleString filterString,
+ final Filter filterString,
final SimpleString forwardingAddress,
final ScheduledExecutorService scheduledExecutor,
final Transformer transformer,
@@ -98,7 +99,7 @@ public ClusterConnectionBridge(final ClusterConnection clusterConnection, final
final SimpleString managementAddress,
final SimpleString managementNotificationAddress,
final MessageFlowRecord flowRecord,
- final TransportConfiguration connector) throws Exception
+ final TransportConfiguration connector)
{
super(targetLocator,
reconnectAttempts,
View
2 hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
@@ -1288,7 +1288,7 @@ public void deployBridge(BridgeConfiguration config) throws Exception
{
if (clusterManager != null)
{
- clusterManager.deployBridge(config, true);
+ clusterManager.deployBridge(config);
}
}
View
5 ...q-server/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
@@ -24,6 +24,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.NotificationBroadcasterSupport;
import javax.management.ObjectName;
@@ -521,7 +523,8 @@ public synchronized void unregisterFromRegistry(final String resourceName)
// the JMX unregistration is synchronized to avoid race conditions if 2 clients tries to
// unregister the same resource (e.g. a queue) at the same time since unregisterMBean()
// will throw an exception if the MBean has already been unregistered
- public void unregisterFromJMX(final ObjectName objectName) throws Exception
+ public void unregisterFromJMX(final ObjectName objectName) throws MBeanRegistrationException,
+ InstanceNotFoundException
{
if (!jmxManagementEnabled)
{
View
544 ...tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
@@ -46,18 +46,64 @@
/**
* A BridgeReconnectTest
- *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 20 Jan 2009 19:20:41
- *
- *
*/
public class BridgeReconnectTest extends BridgeTestBase
{
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
- private final int NUM_MESSAGES = 100;
+ private static final int NUM_MESSAGES = 100;
+
+ Map<String, Object> server0Params;
+ Map<String, Object> server1Params;
+ Map<String, Object> server2Params;
+
+ HornetQServer server0;
+ HornetQServer server1;
+ HornetQServer server2;
+ ServerLocator locator;
+
+ ClientSession session0;
+ ClientSession session1;
+ ClientSession session2;
+
+ private TransportConfiguration server1tc;
+ private Map<String, TransportConfiguration> connectors;
+ private ArrayList<String> staticConnectors;
+
+ final String bridgeName = "bridge1";
+ final String testAddress = "testAddress";
+ final String queueName = "queue0";
+ final String forwardAddress = "forwardAddress";
+
+ final long retryInterval = 50;
+ final double retryIntervalMultiplier = 1d;
+ final int confirmationWindowSize = 1024;
+ int reconnectAttempts = 3;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ server0Params = new HashMap<String, Object>();
+ server1Params = new HashMap<String, Object>();
+ server2Params = new HashMap<String, Object>();
+ connectors = new HashMap<String, TransportConfiguration>();
+
+ server1 = createHornetQServer(1, isNetty(), server1Params);
+ server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
+ connectors.put(server1tc.getName(), server1tc);
+ staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ locator = null;
+ super.tearDown();
+ }
+
protected boolean isNetty()
{
return false;
@@ -75,108 +121,115 @@ private String getConnector()
return INVM_CONNECTOR_FACTORY;
}
- // Fail bridge and reconnecting immediately
- public void testFailoverAndReconnectImmediately() throws Exception
+ /**
+ * Backups must successfully deploy its bridges on fail-over.
+ * @see https://bugzilla.redhat.com/show_bug.cgi?id=900764
+ */
+ public void testFailoverDeploysBridge() throws Exception
{
NodeManager nodeManager = new InVMNodeManager(false);
- Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, server0Params, isNetty(), nodeManager);
+ server0 = createHornetQServer(0, server0Params, isNetty(), nodeManager);
+ server2 = createBackupHornetQServer(2, server2Params, isNetty(), 0, nodeManager);
+
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
+ TransportConfiguration server2tc = new TransportConfiguration(getConnector(), server2Params, "server2tc");
- Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
+ connectors.put(server2tc.getName(), server2tc);
- Map<String, Object> server2Params = new HashMap<String, Object>();
- HornetQServer service2 = createBackupHornetQServer(2, server2Params, isNetty(), 0, nodeManager);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+ server1.getConfiguration().setConnectorConfigurations(connectors);
+ server2.getConfiguration().setConnectorConfigurations(connectors);
+ reconnectAttempts = -1;
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
+ BridgeConfiguration bridgeConfiguration = createBridgeConfig();
+ bridgeConfiguration.setQueueName(queueName);
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ server2.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs1);
+ server2.getConfiguration().setQueueConfigurations(queueConfigs1);
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
+ startServers();
- TransportConfiguration server2tc = new TransportConfiguration(getConnector(), server2Params, "server2tc");
+ waitForServerStart(server0);
+ server0.stop(true);
- connectors.put(server1tc.getName(), server1tc);
+ waitForServerStart(server2);
+
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server2tc));
+
+ ClientSessionFactory csf0 = addSessionFactory(locator.createSessionFactory(server2tc));
+
+ session0 = csf0.createSession(false, true, true);
+ Map<String, Bridge> bridges = server2.getClusterManager().getBridges();
+ assertTrue("backup must deploy bridge on failover", !bridges.isEmpty());
+ }
+
+ // Fail bridge and reconnecting immediately
+ public void testFailoverAndReconnectImmediately() throws Exception
+ {
+ NodeManager nodeManager = new InVMNodeManager(false);
+ server0 = createHornetQServer(0, server0Params, isNetty(), nodeManager);
+ server2 = createBackupHornetQServer(2, server2Params, isNetty(), 0, nodeManager);
+
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
+ TransportConfiguration server2tc = new TransportConfiguration(getConnector(), server2Params, "server2tc");
connectors.put(server2tc.getName(), server2tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
server1.getConfiguration().setConnectorConfigurations(connectors);
- final String bridgeName = "bridge1";
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
-
- final long retryInterval = 50;
- final double retryIntervalMultiplier = 1d;
- final int reconnectAttempts = 1;
- final int confirmationWindowSize = 1024;
-
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ reconnectAttempts = 1;
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- retryInterval,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- retryIntervalMultiplier,
- reconnectAttempts,
- true,
- confirmationWindowSize,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.getDefaultClusterUser(),
- CLUSTER_PASSWORD);
+ BridgeConfiguration bridgeConfiguration = createBridgeConfig();
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- service2.getConfiguration().setQueueConfigurations(queueConfigs1);
-
- ServerLocator locator = null;
+ server2.getConfiguration().setQueueConfigurations(queueConfigs1);
- try
- {
- service2.start();
- server1.start();
- server0.start();
+ startServers();
BridgeReconnectTest.log.info("** failing connection");
// Now we will simulate a failure of the bridge connection between server0 and server1
server0.stop(true);
- waitForServerStart(service2);
+ waitForServerStart(server2);
locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server2tc));
- ClientSessionFactory csf0 = addSessionFactory(locator.createSessionFactory(server2tc));
+ ClientSessionFactory csf0 = addSessionFactory(locator.createSessionFactory(server2tc));
- ClientSession session0 = csf0.createSession(false, true, true);
+ session0 = csf0.createSession(false, true, true);
- ClientProducer prod0 = session0.createProducer(testAddress);
+ ClientProducer prod0 = session0.createProducer(testAddress);
- ClientSessionFactory csf2 = addSessionFactory(locator.createSessionFactory(server2tc));
+ ClientSessionFactory csf2 = addSessionFactory(locator.createSessionFactory(server2tc));
- ClientSession session2 = csf2.createSession(false, true, true);
+ session2 = csf2.createSession(false, true, true);
- ClientConsumer cons2 = session2.createConsumer(queueName0);
+ ClientConsumer cons2 = session2.createConsumer(queueName);
session2.start();
@@ -198,121 +251,70 @@ public void testFailoverAndReconnectImmediately() throws Exception
Assert.assertNotNull(r1);
Assert.assertEquals(i, r1.getObjectProperty(propKey));
}
+ closeServers();
- session0.close();
- session2.close();
- }
- finally
- {
- if (locator != null)
- {
- locator.close();
- }
- server0.stop();
- server1.stop();
- service2.stop();
- }
+ assertNoMoreConnections();
+ }
- Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
- Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
- Assert.assertEquals(0, service2.getRemotingService().getConnections().size());
+ private BridgeConfiguration createBridgeConfig()
+ {
+ return new BridgeConfiguration(bridgeName, queueName, forwardAddress, null, null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL, retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, retryIntervalMultiplier,
+ reconnectAttempts, true, confirmationWindowSize, staticConnectors, false,
+ HornetQDefaultConfiguration.getDefaultClusterUser(), CLUSTER_PASSWORD);
}
// Fail bridge and attempt failover a few times before succeeding
public void testFailoverAndReconnectAfterAFewTries() throws Exception
{
NodeManager nodeManager = new InVMNodeManager(false);
- Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, server0Params, isNetty(), nodeManager);
-
- Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
-
- Map<String, Object> server2Params = new HashMap<String, Object>();
- HornetQServer service2 = createBackupHornetQServer(2, server2Params, isNetty(), 0, nodeManager);
-
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
+ server0 = createHornetQServer(0, server0Params, isNetty(), nodeManager);
+ server2 = createBackupHornetQServer(2, server2Params, isNetty(), 0, nodeManager);
TransportConfiguration server2tc = new TransportConfiguration(getConnector(), server2Params, "server2tc");
- connectors.put(server1tc.getName(), server1tc);
-
connectors.put(server2tc.getName(), server2tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
server1.getConfiguration().setConnectorConfigurations(connectors);
- final String bridgeName = "bridge1";
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
-
- final long retryInterval = 50;
- final double retryIntervalMultiplier = 1d;
- final int reconnectAttempts = 3;
- final int confirmationWindowSize = 1024;
-
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
-
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- retryInterval,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- retryIntervalMultiplier,
- reconnectAttempts,
- true,
- confirmationWindowSize,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.getDefaultClusterUser(),
- CLUSTER_PASSWORD);
+ BridgeConfiguration bridgeConfiguration = createBridgeConfig();
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- service2.getConfiguration().setQueueConfigurations(queueConfigs1);
+ server2.getConfiguration().setQueueConfigurations(queueConfigs1);
- ServerLocator locator = null;
- try
- {
- service2.start();
- server1.start();
- server0.start();
+ startServers();
// Now we will simulate a failure of the bridge connection between server0 and server1
server0.stop(true);
locator = addServerLocator(HornetQClient.createServerLocatorWithHA(server2tc));
locator.setReconnectAttempts(100);
ClientSessionFactory csf0 = addSessionFactory(locator.createSessionFactory(server2tc));
- ClientSession session0 = csf0.createSession(false, true, true);
+ session0 = csf0.createSession(false, true, true);
ClientSessionFactory csf2 = addSessionFactory(locator.createSessionFactory(server2tc));
- ClientSession session2 = csf2.createSession(false, true, true);
+ session2 = csf2.createSession(false, true, true);
ClientProducer prod0 = session0.createProducer(testAddress);
- ClientConsumer cons2 = session2.createConsumer(queueName0);
+ ClientConsumer cons2 = session2.createConsumer(queueName);
session2.start();
@@ -334,109 +336,49 @@ public void testFailoverAndReconnectAfterAFewTries() throws Exception
Assert.assertNotNull(r1);
Assert.assertEquals(i, r1.getObjectProperty(propKey));
}
+ closeServers();
- session0.close();
- session2.close();
- }
- finally
- {
- if (locator != null)
- {
- locator.close();
- }
-
- server0.stop();
- server1.stop();
- service2.stop();
- }
-
- Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
- Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
- Assert.assertEquals(0, service2.getRemotingService().getConnections().size());
+ assertNoMoreConnections();
}
// Fail bridge and reconnect same node, no backup specified
public void testReconnectSameNode() throws Exception
{
- Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
-
- Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
+ server0 = createHornetQServer(0, isNetty(), server0Params);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
-
- connectors.put(server1tc.getName(), server1tc);
-
server0.getConfiguration().setConnectorConfigurations(connectors);
server1.getConfiguration().setConnectorConfigurations(connectors);
- final String bridgeName = "bridge1";
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
-
- final long retryInterval = 50;
- final double retryIntervalMultiplier = 1d;
- final int reconnectAttempts = 3;
- final int confirmationWindowSize = 1024;
-
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
-
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- retryInterval,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- retryIntervalMultiplier,
- reconnectAttempts,
- true,
- confirmationWindowSize,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.getDefaultClusterUser(),
- CLUSTER_PASSWORD);
+ BridgeConfiguration bridgeConfiguration = createBridgeConfig();
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- ServerLocator locator = null;
- try
- {
- server1.start();
- server0.start();
+ startServers();
locator = addServerLocator(HornetQClient.createServerLocatorWithHA(server0tc, server1tc));
ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
- ClientSession session0 = csf0.createSession(false, true, true);
+ session0 = csf0.createSession(false, true, true);
ClientSessionFactory csf1 = locator.createSessionFactory(server1tc);
- ClientSession session1 = csf1.createSession(false, true, true);
+ session1 = csf1.createSession(false, true, true);
ClientProducer prod0 = session0.createProducer(testAddress);
- ClientConsumer cons1 = session1.createConsumer(queueName0);
+ ClientConsumer cons1 = session1.createConsumer(queueName);
session1.start();
@@ -469,23 +411,9 @@ public void testReconnectSameNode() throws Exception
Assert.assertNotNull(r1);
Assert.assertEquals(i, r1.getObjectProperty(propKey));
}
+ closeServers();
- session0.close();
- session1.close();
- }
- finally
- {
- if (locator != null)
- {
- locator.close();
- }
-
- server0.stop();
- server1.stop();
- }
-
- Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
- Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
+ assertNoMoreConnections();
}
// We test that we can pause more than client failure check period (to prompt the pinger to failing)
@@ -502,39 +430,16 @@ public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception
private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep) throws Exception
{
- Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
-
- Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
-
+ server0 = createHornetQServer(0, isNetty(), server0Params);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
-
- connectors.put(server1tc.getName(), server1tc);
-
server0.getConfiguration().setConnectorConfigurations(connectors);
server1.getConfiguration().setConnectorConfigurations(connectors);
-
- final String bridgeName = "bridge1";
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
-
- final long retryInterval = 50;
- final double retryIntervalMultiplier = 1d;
- final int reconnectAttempts = -1;
- final int confirmationWindowSize = 1024;
+ reconnectAttempts = -1;
final long clientFailureCheckPeriod = 1000;
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
-
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
- queueName0,
+ queueName,
forwardAddress,
null,
null,
@@ -556,28 +461,23 @@ private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep)
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
-
- ServerLocator locator = null;
- try
- {
- server1.start();
- server0.start();
+ startServers();
waitForServerStart(server0);
waitForServerStart(server1);
locator = addServerLocator(HornetQClient.createServerLocatorWithHA(server0tc, server1tc));
ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
- ClientSession session0 = csf0.createSession(false, true, true);
+ session0 = csf0.createSession(false, true, true);
ClientProducer prod0 = session0.createProducer(testAddress);
@@ -594,9 +494,9 @@ private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep)
BridgeReconnectTest.log.info("server 1 restarted");
ClientSessionFactory csf1 = locator.createSessionFactory(server1tc);
- ClientSession session1 = csf1.createSession(false, true, true);
+ session1 = csf1.createSession(false, true, true);
- ClientConsumer cons1 = session1.createConsumer(queueName0);
+ ClientConsumer cons1 = session1.createConsumer(queueName);
session1.start();
@@ -623,105 +523,77 @@ private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep)
}
BridgeReconnectTest.log.info("got messages");
+ closeServers();
+ assertNoMoreConnections();
+ }
+ /**
+ * @throws Exception
+ */
+ private void closeServers() throws Exception
+ {
+ if (session0 != null)
session0.close();
+ if (session1 != null)
session1.close();
- }
- finally
- {
- if (locator != null)
- {
- locator.close();
- }
+ if (session2 != null)
+ session2.close();
- server0.stop();
- server1.stop();
+ if (locator != null)
+ {
+ locator.close();
}
+ server0.stop();
+ server1.stop();
+ if (server2 != null)
+ server2.stop();
+ }
+
+ private void assertNoMoreConnections()
+ {
Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
+ if (server2 != null)
+ Assert.assertEquals(0, server2.getRemotingService().getConnections().size());
}
public void testFailoverThenFailAgainAndReconnect() throws Exception
{
- Map<String, Object> server0Params = new HashMap<String, Object>();
- HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
-
- Map<String, Object> server1Params = new HashMap<String, Object>();
- HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
+ server0 = createHornetQServer(0, isNetty(), server0Params);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
-
- connectors.put(server1tc.getName(), server1tc);
-
server0.getConfiguration().setConnectorConfigurations(connectors);
- final String bridgeName = "bridge1";
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
-
- final long retryInterval = 50;
- final double retryIntervalMultiplier = 1d;
- final int reconnectAttempts = 3;
- final int confirmationWindowSize = 1024;
-
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
-
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- retryInterval,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- retryIntervalMultiplier,
- reconnectAttempts,
- true,
- confirmationWindowSize,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.getDefaultClusterUser(),
- CLUSTER_PASSWORD);
+ BridgeConfiguration bridgeConfiguration = createBridgeConfig();
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName0, null, true);
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName, null, true);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- ServerLocator locator = null;
- try
- {
- server1.start();
- server0.start();
+ startServers();
locator = addServerLocator(HornetQClient.createServerLocatorWithHA(server0tc, server1tc));
ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
- ClientSession session0 = csf0.createSession(false, true, true);
+ session0 = csf0.createSession(false, true, true);
ClientSessionFactory csf1 = locator.createSessionFactory(server1tc);
- ClientSession session1 = csf1.createSession(false, true, true);
+ session1 = csf1.createSession(false, true, true);
ClientProducer prod0 = session0.createProducer(testAddress);
- ClientConsumer cons1 = session1.createConsumer(queueName0);
+ ClientConsumer cons1 = session1.createConsumer(queueName);
session1.start();
@@ -789,27 +661,21 @@ public void testFailoverThenFailAgainAndReconnect() throws Exception
}
- session0.close();
- session1.close();
-
- if (outOfOrder != -1)
+ if (outOfOrder != -1)
{
fail("Message " + outOfOrder + " was received out of order, it was supposed to be " + supposed);
}
- }
- finally
- {
- if (locator != null)
- {
- locator.close();
- }
+ closeServers();
- server0.stop();
- server1.stop();
- }
+ assertNoMoreConnections();
+ }
- Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
- Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
+ private void startServers() throws Exception
+ {
+ if (server2 != null)
+ server2.start();
+ server1.start();
+ server0.start();
}
private RemotingConnection getForwardingConnection(final Bridge bridge) throws Exception

0 comments on commit 7c558ea

Please sign in to comment.