Skip to content

Commit

Permalink
This closes #2417
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Nov 13, 2018
2 parents 699591e + f4396da commit ff2073a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 4 deletions.
Expand Up @@ -1239,7 +1239,7 @@ private synchronized void doBindingAdded(final ClientMessage message) throws Exc
return;
}

RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, queueID, filterString, queue, bridge.getName(), distance + 1);
RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, queueID, filterString, queue, bridge.getName(), distance + 1, messageLoadBalancingType);

if (logger.isTraceEnabled()) {
logger.trace("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this);
Expand Down
Expand Up @@ -62,6 +62,8 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {

private final int distance;

private final MessageLoadBalancingType messageLoadBalancingType;

private boolean connected = true;

public RemoteQueueBindingImpl(final long id,
Expand All @@ -72,7 +74,8 @@ public RemoteQueueBindingImpl(final long id,
final SimpleString filterString,
final Queue storeAndForwardQueue,
final SimpleString bridgeName,
final int distance) throws Exception {
final int distance,
final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
this.id = id;

this.address = address;
Expand All @@ -90,6 +93,8 @@ public RemoteQueueBindingImpl(final long id,
idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(bridgeName);

this.distance = distance;

this.messageLoadBalancingType = messageLoadBalancingType;
}

@Override
Expand Down Expand Up @@ -149,7 +154,7 @@ public int getDistance() {

@Override
public synchronized boolean isHighAcceptPriority(final Message message) {
if (consumerCount == 0) {
if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
return false;
}

Expand Down
Expand Up @@ -16,7 +16,16 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -60,6 +69,37 @@ public void testStackOverflow() throws Exception {
send(1, "queues.testaddress", 1, false, null);
}

@Test
public void testStackOverflowJMS() throws Exception {
final String QUEUE_NAME = "queues.queue0";

setupCluster();

startServers();

ConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://0");
Connection c1 = cf1.createConnection();
c1.start();
Session s1 = c1.createSession();
MessageConsumer mc1 = s1.createConsumer(s1.createQueue(QUEUE_NAME));

waitForBindings(0, QUEUE_NAME, 1, 1, true);
waitForBindings(1, QUEUE_NAME, 1, 1, false);

ConnectionFactory cf2 = new ActiveMQConnectionFactory("vm://1");
Connection c2 = cf2.createConnection();
Session s2 = c2.createSession();
MessageProducer mp2 = s2.createProducer(s2.createQueue(QUEUE_NAME));
mp2.send(s2.createMessage());

waitForBindings(1, QUEUE_NAME, 1, 0, true);

assertTrue(Wait.waitFor(() -> servers[1].locateQueue(SimpleString.toSimpleString(QUEUE_NAME)).getMessageCount() == 1, 2000, 100));

c1.close();
c2.close();
}

protected void setupCluster() throws Exception {
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.OFF, 1, isNetty(), 0, 1);

Expand Down
Expand Up @@ -18,6 +18,7 @@

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
Expand Down Expand Up @@ -48,7 +49,7 @@ public void testAddRemoveConsumer() throws Exception {
final Queue storeAndForwardQueue = new FakeQueue(null);
final SimpleString bridgeName = RandomUtil.randomSimpleString();
final int distance = 0;
RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance);
RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.ON_DEMAND);

for (int i = 0; i < 100; i++) {
binding.addConsumer(new SimpleString("B" + i + "<A"));
Expand Down

0 comments on commit ff2073a

Please sign in to comment.