Skip to content
Browse files

Merge pull request #675 from clebertsuconic/master-bridge

Waiting pending ACKs to return before stopping the bridge, also avoiding...
  • Loading branch information...
2 parents 17a7f95 + 51bfc29 commit d0c1b7ac6ac6caa57fa9b2d51a9a7eac16a9b1a6 @jbertram jbertram committed Nov 15, 2012
View
6 hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java
@@ -1247,4 +1247,10 @@
value = "Can't find queue {0} while reloading PAGE_CURSOR_COMPLETE, deleting record now",
format = Message.Format.MESSAGE_FORMAT)
void cantFindQueueOnPageComplete(long queueID);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 224084,
+ value = "Bridge {0} timed out waiting for the completion of {1} messages, we will just shutdown the bridge after 10 seconds wait",
+ format = Message.Format.MESSAGE_FORMAT)
+ void timedOutWaitingCompletions(String bridgeName, long numberOfMessages);
}
View
98 hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java
@@ -49,6 +49,7 @@
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.FutureLatch;
+import org.hornetq.utils.ReusableLatch;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
@@ -78,6 +79,8 @@
protected final ServerLocatorInternal serverLocator;
+ private final ReusableLatch pendingAcks = new ReusableLatch(0);
+
private final UUID nodeUUID;
private final SimpleString name;
@@ -329,7 +332,6 @@ public void stop() throws Exception
{
HornetQServerLogger.LOGGER.debug("Bridge " + this.name + " being stopped");
}
- cleanUpSessionFactory(csf);
if (futureScheduledReconnection != null)
{
@@ -451,22 +453,26 @@ public void setupRetry(final int currentCount, final int maxRetry)
public void sendAcknowledged(final Message message)
{
- try
+ if (active)
{
- final MessageReference ref = refs.poll();
-
- if (ref != null)
+ try
{
- if (isTrace)
+ final MessageReference ref = refs.poll();
+
+ if (ref != null)
{
- HornetQServerLogger.LOGGER.trace(this + " Acking " + ref + " on queue " + ref.getQueue());
+ if (isTrace)
+ {
+ HornetQServerLogger.LOGGER.trace(this + " Acking " + ref + " on queue " + ref.getQueue());
+ }
+ ref.getQueue().acknowledge(ref);
+ pendingAcks.countDown();
}
- ref.getQueue().acknowledge(ref);
}
- }
- catch (Exception e)
- {
- HornetQServerLogger.LOGGER.bridgeFailedToAck(e);
+ catch (Exception e)
+ {
+ HornetQServerLogger.LOGGER.bridgeFailedToAck(e);
+ }
}
}
@@ -563,15 +569,26 @@ public HandleStatus handle(final MessageReference ref) throws Exception
dest = message.getAddress();
}
- if (message.isLargeMessage())
+ pendingAcks.countUp();
+
+ try
{
- deliveringLargeMessage = true;
- deliverLargeMessage(dest, ref, (LargeServerMessage)message);
- return HandleStatus.HANDLED;
+ if (message.isLargeMessage())
+ {
+ deliveringLargeMessage = true;
+ deliverLargeMessage(dest, ref, (LargeServerMessage)message);
+ return HandleStatus.HANDLED;
+ }
+ else
+ {
+ return deliverStandardMessage(dest, ref, message);
+ }
}
- else
+ catch (Exception e)
{
- return deliverStandardMessage(dest, ref, message);
+ // If an exception happened, we must count down immediately
+ pendingAcks.countDown();
+ throw e;
}
}
}
@@ -831,6 +848,7 @@ protected void connect()
producer = session.createProducer();
session.addFailureListener(BridgeImpl.this);
+
session.setSendAcknowledgementHandler(BridgeImpl.this);
afterConnect();
@@ -952,6 +970,26 @@ public void run()
try
{
HornetQServerLogger.LOGGER.debug("stopping bridge " + BridgeImpl.this);
+ queue.removeConsumer(BridgeImpl.this);
+
+ if (!pendingAcks.await(10, TimeUnit.SECONDS))
+ {
+ HornetQServerLogger.LOGGER.timedOutWaitingCompletions(BridgeImpl.this.toString(),
+ pendingAcks.getCount());
+ }
+
+ synchronized (BridgeImpl.this)
+ {
+ HornetQServerLogger.LOGGER.debug("Closing Session for bridge " + BridgeImpl.this.name);
+
+ started = false;
+
+ active = false;
+
+ }
+
+
+ internalCancelReferences();
if (session != null)
{
@@ -972,20 +1010,6 @@ public void run()
csf.cleanup();
}
- queue.removeConsumer(BridgeImpl.this);
-
- internalCancelReferences();
-
- synchronized (BridgeImpl.this)
- {
- HornetQServerLogger.LOGGER.debug("Closing Session for bridge " + BridgeImpl.this.name);
-
- started = false;
-
- active = false;
-
- }
-
if (isTrace)
{
HornetQServerLogger.LOGGER.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
@@ -1005,14 +1029,20 @@ public void run()
{
try
{
+ queue.removeConsumer(BridgeImpl.this);
+
+ if (!pendingAcks.await(60, TimeUnit.SECONDS))
+ {
+ HornetQServerLogger.LOGGER.timedOutWaitingCompletions(BridgeImpl.this.toString(),
+ pendingAcks.getCount());
+ }
+
synchronized (BridgeImpl.this)
{
started = false;
active = false;
}
- queue.removeConsumer(BridgeImpl.this);
-
internalCancelReferences();
HornetQServerLogger.LOGGER.bridgePaused(name);
View
1,304 ...egration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
@@ -113,129 +113,129 @@ public void testSimpleBridgeLargeMessageFiles() throws Exception
public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
+ server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
- Map<String, Object> server1Params = new HashMap<String, Object>();
- addTargetParameters(server1Params);
- server1 = createClusteredServerWithParams(isNetty(), 1, useFiles, server1Params);
-
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
- final String queueName1 = "queue1";
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, useFiles, server1Params);
- // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
- HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- connectors.put(server1tc.getName(), server1tc);
- server0.getConfiguration().setConnectorConfigurations(connectors);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
- final int messageSize = 1024;
+ HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ connectors.put(server1tc.getName(), server1tc);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
- final int numMessages = 10;
+ final int messageSize = 1024;
- ArrayList<String> connectorConfig = new ArrayList<String>();
- connectorConfig.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 1000,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- // Choose confirmation size to make sure acks
- // are sent
- numMessages * messageSize / 2,
- connectorConfig,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ final int numMessages = 10;
- List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
- bridgeConfigs.add(bridgeConfiguration);
- server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ ArrayList<String> connectorConfig = new ArrayList<String>();
+ connectorConfig.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ // Choose confirmation size to make sure acks
+ // are sent
+ numMessages * messageSize / 2,
+ connectorConfig,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
- List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs0.add(queueConfig0);
- server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
- List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs1.add(queueConfig1);
- server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- server1.start();
- server0.start();
- locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
- ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+ server1.start();
+ server0.start();
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
- ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
- ClientSession session1 = sf1.createSession(false, true, true);
+ ClientSession session0 = sf0.createSession(false, true, true);
- ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientSession session1 = sf1.createSession(false, true, true);
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
- session1.start();
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
- final byte[] bytes = new byte[messageSize];
+ session1.start();
- final SimpleString propKey = new SimpleString("testkey");
+ final byte[] bytes = new byte[messageSize];
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(true);
+ final SimpleString propKey = new SimpleString("testkey");
- if (largeMessage)
- {
- message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
- }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
- message.putIntProperty(propKey, i);
+ if (largeMessage)
+ {
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
+ }
- message.getBodyBuffer().writeBytes(bytes);
+ message.putIntProperty(propKey, i);
- producer0.send(message);
- }
+ message.getBodyBuffer().writeBytes(bytes);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(200);
+ producer0.send(message);
+ }
- Assert.assertNotNull(message);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
- Assert.assertEquals(i, message.getObjectProperty(propKey));
+ Assert.assertNotNull(message);
- if (largeMessage)
- {
- readMessages(message);
- }
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
- message.acknowledge();
+ if (largeMessage)
+ {
+ readMessages(message);
}
- Assert.assertNull(consumer1.receiveImmediate());
+ message.acknowledge();
+ }
- session0.close();
+ Assert.assertNull(consumer1.receiveImmediate());
- session1.close();
+ session0.close();
- sf0.close();
+ session1.close();
- sf1.close();
+ sf0.close();
+
+ sf1.close();
closeFields();
assertEquals(0, loadQueues(server0).size());
@@ -254,8 +254,8 @@ public void testLostMessageLargeMessage() throws Exception
}
/** This test will ignore messages
- What will cause the bridge to fail with a timeout
- The bridge should still recover the failure and reconnect on that case */
+ What will cause the bridge to fail with a timeout
+ The bridge should still recover the failure and reconnect on that case */
public void internalTestMessageLoss(final boolean largeMessage) throws Exception
{
class MyInterceptor implements Interceptor
@@ -271,7 +271,7 @@ public void internalTestMessageLoss(final boolean largeMessage) throws Exception
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
if (ignoreSends && packet instanceof SessionSendMessage ||
- ignoreSends && packet instanceof SessionSendContinuationMessage && !((SessionSendContinuationMessage)packet).isContinues())
+ ignoreSends && packet instanceof SessionSendContinuationMessage && !((SessionSendContinuationMessage)packet).isContinues())
{
System.out.println("Ignored");
latch.countDown();
@@ -287,136 +287,136 @@ public boolean intercept(Packet packet, RemotingConnection connection) throws Ho
MyInterceptor myInterceptor = new MyInterceptor(3);
- Map<String, Object> server0Params = new HashMap<String, Object>();
+ Map<String, Object> server0Params = new HashMap<String, Object>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
- Map<String, Object> server1Params = new HashMap<String, Object>();
- addTargetParameters(server1Params);
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
- final String queueName1 = "queue1";
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
- HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- connectors.put(server1tc.getName(), server1tc);
- server0.getConfiguration().setConnectorConfigurations(connectors);
+ HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ connectors.put(server1tc.getName(), server1tc);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
- final int messageSize = 1024;
+ final int messageSize = 1024;
- final int numMessages = 1;
+ final int numMessages = 1;
- ArrayList<String> connectorConfig = new ArrayList<String>();
- connectorConfig.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 1000,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- // Choose confirmation size to make sure acks
- // are sent
- numMessages * messageSize / 2,
- connectorConfig,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
-
- bridgeConfiguration.setCallTimeout(500);
+ ArrayList<String> connectorConfig = new ArrayList<String>();
+ connectorConfig.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ // Choose confirmation size to make sure acks
+ // are sent
+ numMessages * messageSize / 2,
+ connectorConfig,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+
+ bridgeConfiguration.setCallTimeout(500);
- List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
- bridgeConfigs.add(bridgeConfiguration);
- server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
- List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs0.add(queueConfig0);
- server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
- List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs1.add(queueConfig1);
- server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- server1.start();
+ server1.start();
server1.getRemotingService().addIncomingInterceptor(myInterceptor);
- server0.start();
+ server0.start();
locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
- ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSession session0 = sf0.createSession(false, true, true);
- ClientSession session1 = sf1.createSession(false, true, true);
+ ClientSession session1 = sf1.createSession(false, true, true);
- ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
- session1.start();
+ session1.start();
- final byte[] bytes = new byte[messageSize];
+ final byte[] bytes = new byte[messageSize];
- final SimpleString propKey = new SimpleString("testkey");
+ final SimpleString propKey = new SimpleString("testkey");
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+
+ if (largeMessage)
{
- ClientMessage message = session0.createMessage(true);
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
+ }
- if (largeMessage)
- {
- message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
- }
+ message.putIntProperty(propKey, i);
- message.putIntProperty(propKey, i);
+ message.getBodyBuffer().writeBytes(bytes);
- message.getBodyBuffer().writeBytes(bytes);
-
- producer0.send(message);
- }
+ producer0.send(message);
+ }
assertTrue("where is the countDown?", myInterceptor.latch.await(30, TimeUnit.SECONDS));
- myInterceptor.ignoreSends = false;
+ myInterceptor.ignoreSends = false;
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(30000);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(30000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull(message);
- Assert.assertEquals(i, message.getObjectProperty(propKey));
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
- if (largeMessage)
- {
- readMessages(message);
- }
-
- message.acknowledge();
+ if (largeMessage)
+ {
+ readMessages(message);
}
- Assert.assertNull(consumer1.receiveImmediate());
+ message.acknowledge();
+ }
- session0.close();
+ Assert.assertNull(consumer1.receiveImmediate());
- session1.close();
+ session0.close();
- sf0.close();
+ session1.close();
- sf1.close();
+ sf0.close();
+
+ sf1.close();
closeFields();
assertEquals("there should be no queues", 0, loadQueues(server0).size());
}
@@ -474,149 +474,149 @@ public void internalTestWithFilter(final boolean largeMessage, final boolean use
final int numMessages = 10;
- Map<String, Object> server0Params = new HashMap<String, Object>();
- server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
-
- Map<String, Object> server1Params = new HashMap<String, Object>();
- addTargetParameters(server1Params);
- server1 = createClusteredServerWithParams(isNetty(), 1, useFiles, server1Params);
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
- final String queueName1 = "queue1";
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, useFiles, server1Params);
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
- connectors.put(server1tc.getName(), server1tc);
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
- server0.getConfiguration().setConnectorConfigurations(connectors);
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
- final String filterString = "animal='goat'";
+ server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- filterString,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 1000,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- 0,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ final String filterString = "animal='goat'";
- List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
- bridgeConfigs.add(bridgeConfiguration);
- server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ filterString,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ 0,
+ staticConnectors,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
- List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs0.add(queueConfig0);
- server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
- List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs1.add(queueConfig1);
- server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- server1.start();
- server0.start();
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
- ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+ server1.start();
+ server0.start();
- ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
- ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
- ClientSession session1 = sf1.createSession(false, true, true);
+ ClientSession session0 = sf0.createSession(false, true, true);
- ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientSession session1 = sf1.createSession(false, true, true);
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
- session1.start();
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
- final SimpleString propKey = new SimpleString("testkey");
+ session1.start();
- final SimpleString selectorKey = new SimpleString("animal");
+ final SimpleString propKey = new SimpleString("testkey");
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(true);
+ final SimpleString selectorKey = new SimpleString("animal");
- message.putIntProperty(propKey, i);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
- message.putStringProperty(selectorKey, new SimpleString("monkey"));
+ message.putIntProperty(propKey, i);
- if (largeMessage)
- {
- message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
- }
+ message.putStringProperty(selectorKey, new SimpleString("monkey"));
- producer0.send(message);
+ if (largeMessage)
+ {
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
}
- Assert.assertNull(consumer1.receiveImmediate());
+ producer0.send(message);
+ }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(true);
+ Assert.assertNull(consumer1.receiveImmediate());
- message.putIntProperty(propKey, i);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
- message.putStringProperty(selectorKey, new SimpleString("goat"));
+ message.putIntProperty(propKey, i);
- if (largeMessage)
- {
- message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
- }
+ message.putStringProperty(selectorKey, new SimpleString("goat"));
- producer0.send(message);
+ if (largeMessage)
+ {
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
}
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(2000);
+ producer0.send(message);
+ }
- Assert.assertNotNull(message);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(2000);
- Assert.assertEquals("goat", message.getStringProperty(selectorKey));
+ Assert.assertNotNull(message);
- Assert.assertEquals(i, message.getObjectProperty(propKey));
+ Assert.assertEquals("goat", message.getStringProperty(selectorKey));
- message.acknowledge();
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
- if (largeMessage)
- {
- readMessages(message);
- }
+ message.acknowledge();
+
+ if (largeMessage)
+ {
+ readMessages(message);
}
+ }
- session0.commit();
+ session0.commit();
- session1.commit();
+ session1.commit();
- Assert.assertNull(consumer1.receiveImmediate());
+ Assert.assertNull(consumer1.receiveImmediate());
- session0.close();
+ session0.close();
- session1.close();
+ session1.close();
- sf0.close();
+ sf0.close();
- sf1.close();
+ sf1.close();
closeFields();
if (useFiles)
{
@@ -635,128 +635,128 @@ public void internalTestWithFilter(final boolean largeMessage, final boolean use
// Created to verify JBPAPP-6057
public void testStartLater() throws Exception
{
- Map<String, Object> server0Params = new HashMap<String, Object>();
- server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
-
- Map<String, Object> server1Params = new HashMap<String, Object>();
- addTargetParameters(server1Params);
- server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "jms.queue.forwardAddress";
- final String queueName1 = "forwardAddress";
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
- connectors.put(server1tc.getName(), server1tc);
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "jms.queue.forwardAddress";
+ final String queueName1 = "forwardAddress";
- server0.getConfiguration().setConnectorConfigurations(connectors);
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 100,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- 1024,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
- List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
- bridgeConfigs.add(bridgeConfiguration);
- server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 100,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ 1024,
+ staticConnectors,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
- List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs0.add(queueConfig0);
- server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- server0.start();
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
- ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+ server0.start();
- ClientSession session0 = sf0.createSession(false, true, true);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
- ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientSession session0 = sf0.createSession(false, true, true);
- final int numMessages = 100;
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
- final SimpleString propKey = new SimpleString("testkey");
+ final int numMessages = 100;
- final SimpleString selectorKey = new SimpleString("animal");
+ final SimpleString propKey = new SimpleString("testkey");
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(true);
+ final SimpleString selectorKey = new SimpleString("animal");
- message.getBodyBuffer().writeBytes(new byte[1024]);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
- message.putIntProperty(propKey, i);
+ message.getBodyBuffer().writeBytes(new byte[1024]);
- message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
+ message.putIntProperty(propKey, i);
- producer0.send(message);
- }
+ message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
- server1.start();
+ producer0.send(message);
+ }
- Thread.sleep(1000);
+ server1.start();
- ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+ Thread.sleep(1000);
- ClientSession session1 = sf1.createSession(false, true, true);
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
- try
- {
- session1.createQueue(forwardAddress, queueName1);
- }
- catch (Throwable ignored)
- {
- ignored.printStackTrace();
- }
+ ClientSession session1 = sf1.createSession(false, true, true);
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ try
+ {
+ session1.createQueue(forwardAddress, queueName1);
+ }
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
- session1.start();
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(5000);
- assertNotNull(message);
- message.acknowledge();
- }
+ session1.start();
- session1.commit();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
- Assert.assertNull(consumer1.receiveImmediate());
+ session1.commit();
- consumer1.close();
+ Assert.assertNull(consumer1.receiveImmediate());
- session1.deleteQueue(queueName1);
+ consumer1.close();
- session1.close();
+ session1.deleteQueue(queueName1);
- sf1.close();
+ session1.close();
- server1.stop();
+ sf1.close();
- session0.close();
+ server1.stop();
- sf0.close();
+ session0.close();
+
+ sf0.close();
closeFields();
assertEquals(0, loadQueues(server0).size());
@@ -766,155 +766,155 @@ public void testStartLater() throws Exception
public void testWithDuplicates() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
- Map<String, Object> server1Params = new HashMap<String, Object>();
- addTargetParameters(server1Params);
- server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "jms.queue.forwardAddress";
- final String queueName1 = "forwardAddress";
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "jms.queue.forwardAddress";
+ final String queueName1 = "forwardAddress";
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
- connectors.put(server1tc.getName(), server1tc);
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
- server0.getConfiguration().setConnectorConfigurations(connectors);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 100,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- true,
- 0,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 100,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ true,
+ 0,
+ staticConnectors,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
- List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
- bridgeConfigs.add(bridgeConfiguration);
- server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
- List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs0.add(queueConfig0);
- server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- server0.start();
+ server0.start();
- locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
- ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
- ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSession session0 = sf0.createSession(false, true, true);
- ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
- final int numMessages = 1000;
+ final int numMessages = 1000;
- final SimpleString propKey = new SimpleString("testkey");
+ final SimpleString propKey = new SimpleString("testkey");
- final SimpleString selectorKey = new SimpleString("animal");
+ final SimpleString selectorKey = new SimpleString("animal");
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(true);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
- message.getBodyBuffer().writeBytes(new byte[1024]);
+ message.getBodyBuffer().writeBytes(new byte[1024]);
- message.putIntProperty(propKey, i);
+ message.putIntProperty(propKey, i);
- message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
+ message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
- producer0.send(message);
- }
+ producer0.send(message);
+ }
- server1.start();
+ server1.start();
- // Inserting the duplicateIDs so the bridge will fail in a few
- {
- long ids[] = new long[100];
+ // Inserting the duplicateIDs so the bridge will fail in a few
+ {
+ long ids[] = new long[100];
- Queue queue = server0.locateQueue(new SimpleString(queueName0));
- LinkedListIterator<MessageReference> iterator = queue.iterator();
+ Queue queue = server0.locateQueue(new SimpleString(queueName0));
+ LinkedListIterator<MessageReference> iterator = queue.iterator();
- for (int i = 0; i < 100; i++)
- {
- iterator.hasNext();
- ids[i] = iterator.next().getMessage().getMessageID();
- }
+ for (int i = 0; i < 100; i++)
+ {
+ iterator.hasNext();
+ ids[i] = iterator.next().getMessage().getMessageID();
+ }
- iterator.close();
+ iterator.close();
- DuplicateIDCache duplicateTargetCache = server1.getPostOffice()
- .getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
+ DuplicateIDCache duplicateTargetCache = server1.getPostOffice()
+ .getDuplicateIDCache(PostOfficeImpl.BRIDGE_CACHE_STR.concat(forwardAddress));
- TransactionImpl tx = new TransactionImpl(server1.getStorageManager());
- for (long id : ids)
- {
- byte[] duplicateArray = BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
- duplicateTargetCache.addToCache(duplicateArray, tx);
- }
- tx.commit();
+ TransactionImpl tx = new TransactionImpl(server1.getStorageManager());
+ for (long id : ids)
+ {
+ byte[] duplicateArray = BridgeImpl.getDuplicateBytes(server0.getNodeManager().getUUID(), id);
+ duplicateTargetCache.addToCache(duplicateArray, tx);
}
+ tx.commit();
+ }
- Thread.sleep(1000);
+ Thread.sleep(1000);
- ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
- ClientSession session1 = sf1.createSession(false, true, true);
+ ClientSession session1 = sf1.createSession(false, true, true);
- try
- {
- session1.createQueue(forwardAddress, queueName1);
- }
- catch (Throwable ignored)
- {
- ignored.printStackTrace();
- }
+ try
+ {
+ session1.createQueue(forwardAddress, queueName1);
+ }
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
- session1.start();
+ session1.start();
- for (int i = 100; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(5000);
- assertNotNull(message);
- assertEquals(i, message.getIntProperty(propKey).intValue());
- message.acknowledge();
- }
+ for (int i = 100; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty(propKey).intValue());
+ message.acknowledge();
+ }
- session1.commit();
+ session1.commit();
- Assert.assertNull(consumer1.receiveImmediate());
+ Assert.assertNull(consumer1.receiveImmediate());
- consumer1.close();
+ consumer1.close();
- session1.deleteQueue(queueName1);
+ session1.deleteQueue(queueName1);
- session1.close();
+ session1.close();
- sf1.close();
+ sf1.close();
- server1.stop();
+ server1.stop();
- session0.close();
+ session0.close();
- sf0.close();
+ sf0.close();
closeFields();
assertEquals(0, loadQueues(server0).size());
@@ -964,23 +964,23 @@ public void internaltestWithTransformer(final boolean useFiles) throws Exception
staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- null,
- SimpleTransformer.class.getName(),
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 1000,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- 1024,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ queueName0,
+ forwardAddress,
+ null,
+ SimpleTransformer.class.getName(),
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ 1024,
+ staticConnectors,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
@@ -996,66 +996,66 @@ public void internaltestWithTransformer(final boolean useFiles) throws Exception
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- server1.start();
- server0.start();
+ server1.start();
+ server0.start();
- locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
- ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
- ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
- ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSession session0 = sf0.createSession(false, true, true);
- ClientSession session1 = sf1.createSession(false, true, true);
+ ClientSession session1 = sf1.createSession(false, true, true);
- ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
- session1.start();
+ session1.start();
- final int numMessages = 10;
+ final int numMessages = 10;
- final SimpleString propKey = new SimpleString("wibble");
+ final SimpleString propKey = new SimpleString("wibble");
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(true);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
- message.putStringProperty(propKey, new SimpleString("bing"));
+ message.putStringProperty(propKey, new SimpleString("bing"));
- message.getBodyBuffer().writeString("doo be doo be doo be doo");
+ message.getBodyBuffer().writeString("doo be doo be doo be doo");
- producer0.send(message);
- }
+ producer0.send(message);
+ }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(200);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
- Assert.assertNotNull(message);
+ Assert.assertNotNull(message);
- SimpleString val = (SimpleString)message.getObjectProperty(propKey);
+ SimpleString val = (SimpleString)message.getObjectProperty(propKey);
- Assert.assertEquals(new SimpleString("bong"), val);
+ Assert.assertEquals(new SimpleString("bong"), val);
- String sval = message.getBodyBuffer().readString();
+ String sval = message.getBodyBuffer().readString();
- Assert.assertEquals("dee be dee be dee be dee", sval);
+ Assert.assertEquals("dee be dee be dee be dee", sval);
- message.acknowledge();
+ message.acknowledge();
- }
+ }
- Assert.assertNull(consumer1.receiveImmediate());
+ Assert.assertNull(consumer1.receiveImmediate());
- session0.close();
+ session0.close();
- session1.close();
+ session1.close();
- sf0.close();
+ sf0.close();
- sf1.close();
+ sf1.close();
assertEquals(0, loadQueues(server0).size());
@@ -1089,23 +1089,23 @@ public void testSawtoothLoad() throws Exception
staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 1000,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- 0,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ 0,
+ staticConnectors,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
@@ -1337,23 +1337,23 @@ public void testBridgeWithPaging() throws Exception
staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 1000,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- 0,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ 0,
+ staticConnectors,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
@@ -1489,23 +1489,23 @@ public void testBridgeWithLargeMessage() throws Exception
staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- forwardAddress,
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 1000,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- 1024,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ 1024,
+ staticConnectors,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
@@ -1617,122 +1617,122 @@ public void testBridgeWithLargeMessage() throws Exception
public void testNullForwardingAddress() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
- server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
+ server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
- Map<String, Object> server1Params = new HashMap<String, Object>();
- addTargetParameters(server1Params);
- server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String queueName1 = "queue1";
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String queueName1 = "queue1";
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
- connectors.put(server1tc.getName(), server1tc);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
- server0.getConfiguration().setConnectorConfigurations(connectors);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
- final int messageSize = 1024;
+ final int messageSize = 1024;
- final int numMessages = 10;
+ final int numMessages = 10;
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null
- // forwarding
- // address to
- // use messages'
- // original
- // address
- null,
- null,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 1000,
- HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
- 1d,
- -1,
- false,
- // Choose confirmation size to make sure acks
- // are sent
- numMessages * messageSize / 2,
- staticConnectors,
- false,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
- HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null
+ // forwarding
+ // address to
+ // use messages'
+ // original
+ // address
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ // Choose confirmation size to make sure acks
+ // are sent
+ numMessages * messageSize / 2,
+ staticConnectors,
+ false,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_USER,
+ HornetQDefaultConfiguration.DEFAULT_CLUSTER_PASSWORD);
- List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
- bridgeConfigs.add(bridgeConfiguration);
- server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
- List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs0.add(queueConfig0);
- server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- // on server #1, we bind queueName1 to same address testAddress
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(testAddress, queueName1, null, true);
- List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs1.add(queueConfig1);
- server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+ // on server #1, we bind queueName1 to same address testAddress
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(testAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- server1.start();
- server0.start();
+ server1.start();
+ server0.start();
- locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
- ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+ locator = addServerLocator(HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
- ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
- ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSession session0 = sf0.createSession(false, true, true);
- ClientSession session1 = sf1.createSession(false, true, true);
+ ClientSession session1 = sf1.createSession(false, true, true);
- ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
- session1.start();
+ session1.start();
- final byte[] bytes = new byte[messageSize];
+ final byte[] bytes = new byte[messageSize];
- final SimpleString propKey = new SimpleString("testkey");
+ final SimpleString propKey = new SimpleString("testkey");
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(true);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
- message.putIntProperty(propKey, i);
+ message.putIntProperty(propKey, i);
- message.getBodyBuffer().writeBytes(bytes);
+ message.getBodyBuffer().writeBytes(bytes);
- producer0.send(message);
- }
+ producer0.send(message);
+ }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(200);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
- Assert.assertNotNull(message);
+ Assert.assertNotNull(message);
- Assert.assertEquals(i, message.getObjectProperty(propKey));
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
- message.acknowledge();
- }
+ message.acknowledge();
+ }
- Assert.assertNull(consumer1.receiveImmediate());
+ Assert.assertNull(consumer1.receiveImmediate());
- session0.close();
+ session0.close();
- session1.close();
+ session1.close();
- sf0.close();
+ sf0.close();
- sf1.close();
+ sf1.close();
closeFields();
assertEquals(0, loadQueues(server0).size());

0 comments on commit d0c1b7a

Please sign in to comment.
Something went wrong with that request. Please try again.