diff --git a/src/main/org/hornetq/core/server/cluster/Bridge.java b/src/main/org/hornetq/core/server/cluster/Bridge.java index 6a62a3ef863..273c7ca5e6c 100644 --- a/src/main/org/hornetq/core/server/cluster/Bridge.java +++ b/src/main/org/hornetq/core/server/cluster/Bridge.java @@ -44,8 +44,6 @@ public interface Bridge extends Consumer, HornetQComponent void activate(); - void setQueue(Queue queue); - void setNotificationService(NotificationService notificationService); RemotingConnection getForwardingConnection(); diff --git a/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java b/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java index 994c4cfa286..fca2ab7eb7b 100644 --- a/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java +++ b/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java @@ -64,6 +64,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled // Constants ----------------------------------------------------- private static final Logger log = Logger.getLogger(BridgeImpl.class); + + private static final boolean isTrace = log.isTraceEnabled(); // Attributes ---------------------------------------------------- @@ -77,7 +79,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private final SimpleString name; - private Queue queue; + private final Queue queue; protected final Executor executor; @@ -222,6 +224,8 @@ public void stop() throws Exception } } + log.info("Bridge " + this.name + " being stopped"); + stopping = true; executor.execute(new StopRunnable()); @@ -266,11 +270,6 @@ public Queue getQueue() return queue; } - public void setQueue(final Queue queue) - { - this.queue = queue; - } - public Filter getFilter() { return filter; @@ -367,20 +366,25 @@ public HandleStatus handle(final MessageReference ref) throws Exception { return HandleStatus.NO_MATCH; } - + synchronized (this) { if (!active) { + log.debug(name + "::Ignoring reference on bridge as it is set to iniactive ref=" + ref); return HandleStatus.BUSY; } + if (isTrace) + { + log.trace("Bridge " + name + " is handling reference=" + ref); + } ref.handled(); ServerMessage message = ref.getMessage(); refs.add(ref); - + message = beforeForward(message); SimpleString dest; @@ -419,11 +423,13 @@ public HandleStatus handle(final MessageReference ref) throws Exception public void connectionFailed(final HornetQException me, boolean failedOver) { + log.warn(name + "::Connection failed with failedOver=" + failedOver, me); fail(false); } public void beforeReconnect(final HornetQException exception) { + log.warn(name + "::Connection failed before reconnect ", exception); fail(true); } @@ -454,8 +460,11 @@ private void fail(final boolean beforeReconnect) // we want to cancel all unacked refs so they get resent // duplicate detection will ensure no dups are routed on the other side + log.debug(name + "::BridgeImpl::fail being called, beforeReconnect=" + beforeReconnect); + if (session.getConnection().isDestroyed()) { + log.debug(name + "::Connection is destroyed, active = false now"); active = false; } @@ -467,7 +476,7 @@ private void fail(final boolean beforeReconnect) { synchronized (this) { - active = false; + log.debug(name + "::Connection is destroyed, active = false now"); } cancelRefs(); @@ -476,6 +485,7 @@ private void fail(final boolean beforeReconnect) { afterConnect(); + log.debug(name + "::After reconnect, setting active=true now"); active = true; if (queue != null) @@ -650,6 +660,8 @@ public void run() { return; } + + log.debug("Closing Session for bridge " + BridgeImpl.this.name); if (session != null) { diff --git a/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java b/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java index 840c384b266..acd675be387 100644 --- a/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java @@ -16,8 +16,12 @@ import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED; import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import org.hornetq.api.core.DiscoveryGroupConfiguration; @@ -29,7 +33,6 @@ import org.hornetq.api.core.management.ManagementHelper; import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.client.impl.ServerLocatorInternal; -import org.hornetq.core.client.impl.TopologyMember; import org.hornetq.core.logging.Logger; import org.hornetq.core.postoffice.Binding; import org.hornetq.core.postoffice.Bindings; @@ -272,6 +275,8 @@ public void stop() throws Exception { serverLocator.removeClusterTopologyListener(this); } + + log.debug("Cluster connection being stopped for node" + nodeUUID); synchronized (this) { @@ -357,6 +362,8 @@ public synchronized void activate() throws Exception serverLocator.setBackup(server.getConfiguration().isBackup()); serverLocator.setInitialConnectAttempts(-1); serverLocator.setConfirmationWindowSize(0); + serverLocator.setBlockOnDurableSend(false); + serverLocator.setBlockOnNonDurableSend(false); if(retryInterval > 0) { @@ -388,6 +395,7 @@ public TransportConfiguration getConnector() public synchronized void nodeDown(final String nodeID) { + log.debug("node " + nodeID + " being considered down on cluster connection for nodeID=" + nodeUUID); if (nodeID.equals(nodeUUID.toString())) { return; diff --git a/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java b/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java index 4fdc721ea62..124319a548d 100644 --- a/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java +++ b/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java @@ -22,7 +22,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; - import org.hornetq.api.core.DiscoveryGroupConfiguration; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; @@ -93,7 +92,7 @@ public class ClusterManagerImpl implements ClusterManager // regular client listeners to be notified of cluster topology changes. // they correspond to regular clients using a HA ServerLocator private Set clientListeners = new ConcurrentHashSet(); - + // cluster connections listeners to be notified of cluster topology changes // they correspond to cluster connections on *other nodes connected to this one* private Set clusterConnectionListeners = new ConcurrentHashSet(); @@ -103,6 +102,7 @@ public class ClusterManagerImpl implements ClusterManager private volatile ServerLocatorInternal backupServerLocator; private final List clusterLocators = new ArrayList(); + private Executor executor; public ClusterManagerImpl(final ExecutorFactory executorFactory, @@ -173,11 +173,10 @@ public synchronized void start() throws Exception { announceNode(); } - + started = true; } - public synchronized void stop() throws Exception { if (!started) @@ -200,7 +199,7 @@ public synchronized void stop() throws Exception clusterConnection.stop(); managementService.unregisterCluster(clusterConnection.getName().toString()); } - + clusterConnectionListeners.clear(); clientListeners.clear(); clusterConnections.clear(); @@ -216,7 +215,7 @@ public synchronized void stop() throws Exception bridges.clear(); - if(backupServerLocator != null) + if (backupServerLocator != null) { backupServerLocator.close(); backupServerLocator = null; @@ -238,7 +237,7 @@ public void notifyNodeDown(String nodeID) } boolean removed = topology.removeMember(nodeID); - + if (removed) { @@ -262,23 +261,23 @@ public void notifyNodeUp(final String nodeID, TopologyMember member = new TopologyMember(connectorPair); boolean updated = topology.addMember(nodeID, member); - if(!updated) + if (!updated) { return; } - + for (ClusterTopologyListener listener : clientListeners) { listener.nodeUP(nodeID, member.getConnector(), last); } - for (ClusterTopologyListener listener : clusterConnectionListeners) { listener.nodeUP(nodeID, member.getConnector(), last); } - //if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster connections. + // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster + // connections. if (nodeAnnounce) { for (ClusterConnection clusterConnection : clusterConnections.values()) @@ -287,7 +286,7 @@ public void notifyNodeUp(final String nodeID, } } } - + public boolean isStarted() { return started; @@ -313,8 +312,7 @@ public ClusterConnection getClusterConnection(final SimpleString name) return clusterConnections.get(name.toString()); } - public void addClusterTopologyListener(final ClusterTopologyListener listener, - final boolean clusterConnection) + public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection) { synchronized (this) { @@ -333,7 +331,7 @@ public void addClusterTopologyListener(final ClusterTopologyListener listener, } public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener, - final boolean clusterConnection) + final boolean clusterConnection) { if (clusterConnection) { @@ -349,9 +347,9 @@ public Topology getTopology() { return topology; } - + // backup node becomes live - public synchronized void activate() + public synchronized void activate() { if (backup) { @@ -360,7 +358,7 @@ public synchronized void activate() String nodeID = server.getNodeID().toString(); TopologyMember member = topology.getMember(nodeID); - //we swap the topology backup now = live + // we swap the topology backup now = live if (member != null) { member.getConnector().a = member.getConnector().b; @@ -368,9 +366,9 @@ public synchronized void activate() member.getConnector().b = null; } - if(backupServerLocator != null) + if (backupServerLocator != null) { - //todo we could use the topology of this to preempt it arriving from the cc + // todo we could use the topology of this to preempt it arriving from the cc try { backupServerLocator.close(); @@ -434,7 +432,7 @@ public synchronized void activate() public void announceBackup() throws Exception { List configs = this.configuration.getClusterConfigurations(); - if(!configs.isEmpty()) + if (!configs.isEmpty()) { ClusterConnectionConfiguration config = configs.get(0); @@ -442,8 +440,7 @@ public void announceBackup() throws Exception if (connector == null) { - log.warn("No connecor with name '" + config.getConnectorName() + - "'. backup cannot be announced."); + log.warn("No connecor with name '" + config.getConnectorName() + "'. backup cannot be announced."); return; } announceBackup(config, connector); @@ -469,11 +466,13 @@ private synchronized void announceNode() { if (backup) { - member = new TopologyMember(new Pair(null, cc.getConnector())); + member = new TopologyMember(new Pair(null, + cc.getConnector())); } else { - member = new TopologyMember(new Pair(cc.getConnector(), null)); + member = new TopologyMember(new Pair(cc.getConnector(), + null)); } topology.addMember(nodeID, member); @@ -482,11 +481,11 @@ private synchronized void announceNode() { if (backup) { - // pair.b = cc.getConnector(); + // pair.b = cc.getConnector(); } else { - // pair.a = cc.getConnector(); + // pair.a = cc.getConnector(); } } @@ -496,7 +495,7 @@ private synchronized void announceNode() { listener.nodeUP(nodeID, member.getConnector(), false); } - + for (ClusterTopologyListener listener : clusterConnectionListeners) { listener.nodeUP(nodeID, member.getConnector(), false); @@ -685,6 +684,8 @@ public synchronized void deployBridge(final BridgeConfiguration config) throws E serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier()); serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod()); serverLocator.setInitialConnectAttempts(config.getReconnectAttempts()); + serverLocator.setBlockOnDurableSend(false); + serverLocator.setBlockOnNonDurableSend(false); clusterLocators.add(serverLocator); Bridge bridge = new BridgeImpl(serverLocator, nodeUUID, @@ -720,7 +721,7 @@ public synchronized void destroyBridge(final String name) throws Exception managementService.unregisterBridge(name); } } - + private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception { if (config.getName() == null) @@ -746,10 +747,10 @@ private synchronized void deployClusterConnection(final ClusterConnectionConfigu return; } - - if(clusterConnections.containsKey(config.getName())) + if (clusterConnections.containsKey(config.getName())) { - log.warn("Cluster Configuration '" + config.getConnectorName() + "' already exists. The cluster connection will not be deployed.", new Exception ("trace")); + log.warn("Cluster Configuration '" + config.getConnectorName() + + "' already exists. The cluster connection will not be deployed.", new Exception("trace")); return; } @@ -788,7 +789,8 @@ private synchronized void deployClusterConnection(final ClusterConnectionConfigu } else { - TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null? connectorNameListToArray(config.getStaticConnectors()):null; + TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors()) + : null; clusterConnection = new ClusterConnectionImpl(tcConfigs, connector, @@ -816,8 +818,8 @@ private synchronized void deployClusterConnection(final ClusterConnectionConfigu clusterConnections.put(config.getName(), clusterConnection); clusterConnection.start(); - - if(backup) + + if (backup) { announceBackup(config, connector); } @@ -860,13 +862,15 @@ public void run() ClientSessionFactory backupSessionFactory = backupServerLocator.connect(); if (backupSessionFactory != null) { - backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector)); + backupSessionFactory.getConnection() + .getChannel(0, -1) + .send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector)); log.info("backup announced"); } } catch (Exception e) { - log.warn("Unable to announce backup", e); + log.warn("Unable to announce backup", e); } } }); @@ -892,7 +896,8 @@ private Transformer instantiateTransformer(final String transformerClassName) } return transformer; } - //for testing + + // for testing public void clear() { bridges.clear(); @@ -904,7 +909,7 @@ public void clear() } catch (Exception e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } clusterConnections.clear(); diff --git a/src/main/org/hornetq/core/server/impl/QueueImpl.java b/src/main/org/hornetq/core/server/impl/QueueImpl.java index 08c523cc36d..5f3611a1ef2 100644 --- a/src/main/org/hornetq/core/server/impl/QueueImpl.java +++ b/src/main/org/hornetq/core/server/impl/QueueImpl.java @@ -393,7 +393,7 @@ public void addTail(final MessageReference ref, final boolean direct) { return; } - + queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate()); concurrentQueue.add(ref); @@ -2055,7 +2055,7 @@ private void postAcknowledge(final MessageReference ref) } catch (Exception e) { - QueueImpl.log.warn("Unable to decrement reference counting", e); + QueueImpl.log.warn("Unable to decrement reference counting", e); } } diff --git a/src/main/org/hornetq/utils/PriorityLinkedListImpl.java b/src/main/org/hornetq/utils/PriorityLinkedListImpl.java index 8b5fb1f8cef..6a80afee50b 100644 --- a/src/main/org/hornetq/utils/PriorityLinkedListImpl.java +++ b/src/main/org/hornetq/utils/PriorityLinkedListImpl.java @@ -43,6 +43,8 @@ public class PriorityLinkedListImpl implements PriorityLinkedList private int highestPriority = -1; + private int lastPriority = -1; + public PriorityLinkedListImpl(final int priorities) { this.priorities = priorities; @@ -54,14 +56,25 @@ public PriorityLinkedListImpl(final int priorities) levels[i] = new LinkedListImpl(); } } - - private void checkHighest(int priority) + + private void checkHighest(final int priority) { + if (lastPriority != priority || priority > highestPriority) + { + lastPriority = priority; + if (lastReset == Integer.MAX_VALUE) + { + lastReset = 0; + } + else + { + lastReset++; + } + } + if (priority > highestPriority) { highestPriority = priority; - - lastReset++; } } @@ -150,19 +163,20 @@ private class PriorityLinkedListIterator implements LinkedListIterator { private int index; - private LinkedListIterator[] cachedIters = new LinkedListIterator[levels.length]; + private final LinkedListIterator[] cachedIters = new LinkedListIterator[levels.length]; private LinkedListIterator lastIter; - + private int resetCount = lastReset; - + volatile boolean closed = false; PriorityLinkedListIterator() { index = levels.length - 1; } - + + @Override protected void finalize() { close(); @@ -184,7 +198,7 @@ public void close() { closed = true; lastIter = null; - + for (LinkedListIterator iter : cachedIters) { if (iter != null) @@ -194,13 +208,13 @@ public void close() } } } - + private void checkReset() { - if (lastReset > resetCount) + if (lastReset != resetCount) { index = highestPriority; - + resetCount = lastReset; } } @@ -208,7 +222,7 @@ private void checkReset() public boolean hasNext() { checkReset(); - + while (index >= 0) { lastIter = cachedIters[index]; @@ -255,10 +269,17 @@ public void remove() } lastIter.remove(); - - if (index == highestPriority && levels[index].size() == 0) + + // This next statement would be the equivalent of: + // if (index == highestPriority && levels[index].size() == 0) + // However we have to keep checking all the previous levels + // otherwise we would cache a max that will not exist + // what would make us eventually having hasNext() returning false + // as a bug + // Part of the fix for HORNETQ-705 + for (int i = index; i >= 0 && levels[index].size() == 0; i--) { - highestPriority--; + highestPriority = i; } size--; diff --git a/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java b/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java index b60306fcc98..e9e3bf32f96 100644 --- a/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java @@ -17,6 +17,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; @@ -42,6 +45,7 @@ import org.hornetq.core.server.Queue; import org.hornetq.core.server.cluster.impl.BridgeImpl; import org.hornetq.core.transaction.impl.TransactionImpl; +import org.hornetq.tests.util.RandomUtil; import org.hornetq.tests.util.ServiceTestBase; import org.hornetq.tests.util.UnitTestCase; import org.hornetq.utils.LinkedListIterator; @@ -947,20 +951,252 @@ public void internaltestWithTransformer(final boolean useFiles) throws Exception try { - server0.stop(); + server0.stop(); + } + catch (Exception ignored) + { + + } + + try + { + server1.stop(); + } + catch (Exception ignored) + { + + } + } + + } + + public void testSawtoothLoad() throws Exception + { + Map server0Params = new HashMap(); + HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params); + server0.getConfiguration().setThreadPoolMaxSize(10); + + Map server1Params = new HashMap(); + addTargetParameters(server1Params); + HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + server1.getConfiguration().setThreadPoolMaxSize(10); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + + Map connectors = new HashMap(); + final TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params); + final TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + connectors.put(server1tc.getName(), server1tc); + + server0.getConfiguration().setConnectorConfigurations(connectors); + + ArrayList staticConnectors = new ArrayList(); + staticConnectors.add(server1tc.getName()); + + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", + queueName0, + forwardAddress, + null, + null, + 1000, + 1d, + -1, + false, + 0, + HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, + staticConnectors, + false, + ConfigurationImpl.DEFAULT_CLUSTER_USER, + ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD); + + List bridgeConfigs = new ArrayList(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true); + List queueConfigs0 = new ArrayList(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true); + List queueConfigs1 = new ArrayList(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigurations(queueConfigs1); + + try + { + server1.start(); + server0.start(); + + final int numMessages = 10000; + + final int totalrepeats = 10; + + final AtomicInteger errors = new AtomicInteger(0); + + // We shouldn't have more than 10K messages pending + final Semaphore semop = new Semaphore(10000); + + class ConsumerThread extends Thread + { + public void run() + { + try + { + ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server1tc); + + ClientSessionFactory sf = locator.createSessionFactory(); + + ClientSession session = sf.createSession(false, false); + + session.start(); + + ClientConsumer consumer = session.createConsumer(queueName1); + + for (int i = 0; i < numMessages; i++) + { + ClientMessage message = consumer.receive(5000); + + Assert.assertNotNull(message); + + message.acknowledge(); + semop.release(); + if (i % 1000 == 0) + { + session.commit(); + } + } + + session.commit(); + + session.close(); + sf.close(); + locator.close(); + + } + catch (Throwable e) + { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + + class ProducerThread extends Thread + { + final int nmsg; + ProducerThread(int nmsg) + { + this.nmsg = nmsg; + } + public void run() + { + ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(server0tc); + + locator.setBlockOnDurableSend(false); + locator.setBlockOnNonDurableSend(false); + + ClientSessionFactory sf = null; + + ClientSession session = null; + + ClientProducer producer = null; + + try + { + sf = locator.createSessionFactory(); + + session = sf.createSession(false, true, true); + + producer = session.createProducer(new SimpleString(testAddress)); + + for (int i = 0; i < nmsg; i++) + { + assertEquals(0, errors.get()); + ClientMessage message = session.createMessage(true); + + message.putIntProperty("seq", i); + + + if (i % 100 == 0) + { + message.setPriority((byte)(RandomUtil.randomPositiveInt() % 9)); + } + else + { + message.setPriority((byte)5); + } + + message.getBodyBuffer().writeBytes(new byte[50]); + + producer.send(message); + assertTrue(semop.tryAcquire(1, 10, TimeUnit.SECONDS)); + } + } + catch (Throwable e) + { + e.printStackTrace(System.out); + errors.incrementAndGet(); + } + finally + { + try + { + session.close(); + sf.close(); + locator.close(); + } + catch (Exception ignored) + { + errors.incrementAndGet(); + } + } + } } - catch(Exception ignored) + + for (int repeat = 0 ; repeat < totalrepeats; repeat++) { - + System.out.println("Repeat " + repeat); + ArrayList threads = new ArrayList(); + + threads.add(new ConsumerThread()); + threads.add(new ProducerThread(numMessages / 2)); + threads.add(new ProducerThread(numMessages / 2)); + + for (Thread t : threads) + { + t.start(); + } + + for (Thread t : threads) + { + t.join(); + } + + assertEquals(0, errors.get()); + } + } + finally + { + try + { + server0.stop(); + } + catch (Exception ignored) + { + } try { - server1.stop(); + server1.stop(); } - catch(Exception ignored) + catch (Exception ignored) { - + } } @@ -1142,11 +1378,11 @@ public void testNullForwardingAddress() throws Exception ArrayList staticConnectors = new ArrayList(); staticConnectors.add(server1tc.getName()); BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null - // forwarding - // address to - // use messages' - // original - // address + // forwarding + // address to + // use messages' + // original + // address null, null, 1000, diff --git a/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java index fb1e6dd9128..f1d507b25da 100644 --- a/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java @@ -513,7 +513,7 @@ protected void sendInRange(final int node, throw new IllegalArgumentException("No sf at " + node); } - ClientSession session = sf.createSession(false, true, true); + ClientSession session = sf.createSession(false, false, false); try { @@ -531,7 +531,14 @@ protected void sendInRange(final int node, message.putIntProperty(ClusterTestBase.COUNT_PROP, i); producer.send(message); + + if (i % 100 == 0) + { + session.commit(); + } } + + session.commit(); } finally { @@ -1328,6 +1335,7 @@ protected void setupLiveServer(final int node, configuration.setJournalFileSize(100 * 1024); configuration.setJournalType(getDefaultJournalType()); configuration.setSharedStore(sharedStorage); + configuration.setThreadPoolMaxSize(10); if (sharedStorage) { // Shared storage will share the node between the backup and live node diff --git a/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java b/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java index ab1bc9ee420..e39bf3b253e 100644 --- a/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java +++ b/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java @@ -210,6 +210,56 @@ public void testBasicRoundRobin() throws Exception verifyNotReceive(0, 1, 2, 3, 4); } + + public void testBasicRoundRobinManyMessages() throws Exception + { + setupCluster(); + + startServers(); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + setupSessionFactory(3, isNetty()); + setupSessionFactory(4, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + createQueue(2, "queues.testaddress", "queue0", null, false); + createQueue(3, "queues.testaddress", "queue0", null, false); + createQueue(4, "queues.testaddress", "queue0", null, false); + + addConsumer(0, 0, "queue0", null); + addConsumer(1, 1, "queue0", null); + addConsumer(2, 2, "queue0", null); + addConsumer(3, 3, "queue0", null); + addConsumer(4, 4, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + waitForBindings(2, "queues.testaddress", 1, 1, true); + waitForBindings(3, "queues.testaddress", 1, 1, true); + waitForBindings(4, "queues.testaddress", 1, 1, true); + + System.out.println(clusterDescription(servers[0])); + System.out.println(clusterDescription(servers[1])); + System.out.println(clusterDescription(servers[2])); + System.out.println(clusterDescription(servers[3])); + System.out.println(clusterDescription(servers[4])); + + waitForBindings(0, "queues.testaddress", 4, 4, false); + waitForBindings(1, "queues.testaddress", 4, 4, false); + waitForBindings(2, "queues.testaddress", 4, 4, false); + waitForBindings(3, "queues.testaddress", 4, 4, false); + waitForBindings(4, "queues.testaddress", 4, 4, false); + + send(0, "queues.testaddress", 1000, true, null); + + verifyReceiveRoundRobinInSomeOrder(1000, 0, 1, 2, 3, 4); + + verifyNotReceive(0, 1, 2, 3, 4); + } + public void testRoundRobinMultipleQueues() throws Exception { SymmetricClusterTest.log.info("starting"); diff --git a/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java b/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java index bbac3214ef5..f9be2576f53 100644 --- a/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java +++ b/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java @@ -16,6 +16,7 @@ import junit.framework.Assert; import junit.framework.TestCase; +import org.hornetq.tests.util.RandomUtil; import org.hornetq.utils.LinkedListIterator; import org.hornetq.utils.PriorityLinkedListImpl; @@ -77,9 +78,9 @@ public abstract class PriorityLinkedListTestBase extends TestCase protected Wibble y; protected Wibble z; - + private PriorityLinkedListImpl list; - + protected abstract PriorityLinkedListImpl getList(); public void setUp() throws Exception @@ -133,7 +134,7 @@ public void testEmpty() throws Exception Wibble w = list.poll(); Assert.assertEquals(a, w); Assert.assertTrue(list.isEmpty()); - + assertEquals(0, list.size()); } @@ -144,7 +145,7 @@ public void testaddHead() throws Exception list.addHead(c, 0); list.addHead(d, 0); list.addHead(e, 0); - + assertEquals(5, list.size()); Assert.assertEquals(e, list.poll()); @@ -153,7 +154,7 @@ public void testaddHead() throws Exception Assert.assertEquals(b, list.poll()); Assert.assertEquals(a, list.poll()); Assert.assertNull(list.poll()); - + assertEquals(0, list.size()); } @@ -172,11 +173,11 @@ public void testaddTail() throws Exception Assert.assertEquals(d, list.poll()); Assert.assertEquals(e, list.poll()); Assert.assertNull(list.poll()); - + assertEquals(0, list.size()); } - + public void testAddLastAndFirst() throws Exception { list.addTail(a, 0); @@ -189,7 +190,7 @@ public void testAddLastAndFirst() throws Exception list.addTail(h, 0); list.addTail(i, 0); list.addTail(j, 0); - + list.addHead(k, 0); list.addHead(l, 0); list.addHead(m, 0); @@ -200,7 +201,7 @@ public void testAddLastAndFirst() throws Exception list.addHead(r, 0); list.addHead(s, 0); list.addHead(t, 0); - + assertEquals(t, list.poll()); assertEquals(s, list.poll()); assertEquals(r, list.poll()); @@ -211,7 +212,7 @@ public void testAddLastAndFirst() throws Exception assertEquals(m, list.poll()); assertEquals(l, list.poll()); assertEquals(k, list.poll()); - + assertEquals(a, list.poll()); assertEquals(b, list.poll()); assertEquals(c, list.poll()); @@ -223,7 +224,7 @@ public void testAddLastAndFirst() throws Exception assertEquals(i, list.poll()); assertEquals(j, list.poll()); } - + public void testAddLastAndFirstWithIterator() throws Exception { list.addTail(a, 0); @@ -236,7 +237,7 @@ public void testAddLastAndFirstWithIterator() throws Exception list.addTail(h, 0); list.addTail(i, 0); list.addTail(j, 0); - + list.addHead(k, 0); list.addHead(l, 0); list.addHead(m, 0); @@ -247,9 +248,9 @@ public void testAddLastAndFirstWithIterator() throws Exception list.addHead(r, 0); list.addHead(s, 0); list.addHead(t, 0); - + LinkedListIterator iter = list.iterator(); - + assertTrue(iter.hasNext()); assertEquals(t, iter.next()); assertTrue(iter.hasNext()); @@ -270,7 +271,7 @@ public void testAddLastAndFirstWithIterator() throws Exception assertEquals(l, iter.next()); assertTrue(iter.hasNext()); assertEquals(k, iter.next()); - + assertTrue(iter.hasNext()); assertEquals(a, iter.next()); assertTrue(iter.hasNext()); @@ -438,7 +439,7 @@ public void testPoll() throws Exception Assert.assertEquals(a, list.poll()); Assert.assertNull(list.poll()); - + assertEquals(0, list.size()); } @@ -567,8 +568,7 @@ public void testIterator() w = (Wibble)iter.next(); Assert.assertEquals("z", w.s); assertFalse(iter.hasNext()); - - + iter = list.iterator(); assertTrue(iter.hasNext()); w = (Wibble)iter.next(); @@ -617,7 +617,7 @@ public void testIterator() iter.remove(); Assert.assertEquals(23, list.size()); - + assertTrue(iter.hasNext()); w = (Wibble)iter.next(); Assert.assertEquals("i", w.s); @@ -671,7 +671,6 @@ public void testIterator() Assert.assertEquals("z", w.s); iter.remove(); - iter = list.iterator(); assertTrue(iter.hasNext()); w = (Wibble)iter.next(); @@ -739,78 +738,76 @@ public void testIterator() assertTrue(iter.hasNext()); w = (Wibble)iter.next(); Assert.assertEquals("y", w.s); - + assertFalse(iter.hasNext()); assertFalse(iter.hasNext()); - //Test the elements added after iter created are seen - + // Test the elements added after iter created are seen + list.addTail(a, 4); list.addTail(b, 4); - + assertTrue(iter.hasNext()); w = (Wibble)iter.next(); Assert.assertEquals("a", w.s); - + assertTrue(iter.hasNext()); w = (Wibble)iter.next(); Assert.assertEquals("b", w.s); - - assertFalse(iter.hasNext()); - + + assertFalse(iter.hasNext()); + list.addTail(c, 4); list.addTail(d, 4); - + assertTrue(iter.hasNext()); w = (Wibble)iter.next(); Assert.assertEquals("c", w.s); - + assertTrue(iter.hasNext()); w = (Wibble)iter.next(); Assert.assertEquals("d", w.s); - + assertFalse(iter.hasNext()); - } - + public void testIteratorPicksUpHigherPriorities() { list.addTail(a, 4); list.addTail(b, 4); list.addTail(c, 4); - + LinkedListIterator iter = list.iterator(); - + assertTrue(iter.hasNext()); assertEquals(a, iter.next()); - + assertTrue(iter.hasNext()); assertEquals(b, iter.next()); - + list.addTail(d, 5); list.addTail(e, 5); - + assertTrue(iter.hasNext()); assertEquals(d, iter.next()); - + assertTrue(iter.hasNext()); assertEquals(e, iter.next()); - + assertTrue(iter.hasNext()); assertEquals(c, iter.next()); - + list.addTail(f, 1); list.addTail(g, 9); - + assertTrue(iter.hasNext()); assertEquals(g, iter.next()); - + assertTrue(iter.hasNext()); assertEquals(f, iter.next()); } - public void testClear() { list.addTail(a, 0); @@ -829,6 +826,59 @@ public void testClear() Assert.assertNull(list.poll()); } + public void testMixupIterator() + { + list.addTail(c, 5); + list.addTail(a, 4); + list.addTail(b, 4); + + LinkedListIterator iter = list.iterator(); + + assertTrue(iter.hasNext()); + assertEquals(c, iter.next()); + assertTrue(iter.hasNext()); + assertEquals(a, iter.next()); + assertTrue(iter.hasNext()); + assertEquals(b, iter.next()); + list.addTail(d, 5); + assertTrue(iter.hasNext()); + assertEquals(d, iter.next()); + } + + public void testMixupIterator2() + { + list.addTail(c, 5); + + list.addTail(k, 0); + + list.addTail(a, 2); + list.addTail(b, 2); + + LinkedListIterator iter = list.iterator(); + + assertTrue(iter.hasNext()); + assertEquals(c, iter.next()); + iter.remove(); + + assertTrue(iter.hasNext()); + assertEquals(a, iter.next()); + iter.remove(); + + assertTrue(iter.hasNext()); + assertEquals(b, iter.next()); + iter.remove(); + + assertTrue(iter.hasNext()); + assertEquals(k, iter.next()); + iter.remove(); + + list.addTail(d, 2); + + assertTrue(iter.hasNext()); + assertEquals(d, iter.next()); + iter.remove(); + } + class Wibble { String s;