Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public interface ActiveMQRALogger extends BasicLogger {
@Message(id = 151005, value = "awaiting server availability", format = Message.Format.MESSAGE_FORMAT)
void awaitingJMSServerCreation();

@LogMessage(level = Logger.Level.INFO)
@Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections.", format = Message.Format.MESSAGE_FORMAT)
void rebalancingConnections();

@LogMessage(level = Logger.Level.WARN)
@Message(id = 152001, value = "problem resetting xa session after failure", format = Message.Format.MESSAGE_FORMAT)
void problemResettingXASession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import javax.transaction.xa.XAResource;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand All @@ -42,6 +44,8 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
Expand Down Expand Up @@ -111,8 +115,14 @@ public class ActiveMQActivation {

private ActiveMQConnectionFactory factory;

private List<String> nodes = Collections.synchronizedList(new ArrayList<String>());

private Map<String, Long> removedNodes = new ConcurrentHashMap<String, Long>();

private boolean lastReceived = false;

// Whether we are in the failure recovery loop
private final AtomicBoolean inFailure = new AtomicBoolean(false);
private final AtomicBoolean inReconnect = new AtomicBoolean(false);
private XARecoveryConfig resourceRecovery;

static {
Expand Down Expand Up @@ -338,6 +348,9 @@ protected synchronized void setup() throws Exception {
Map<String, String> recoveryConfProps = new HashMap<String, String>();
recoveryConfProps.put(XARecoveryConfig.JNDI_NAME_PROPERTY_KEY, ra.getJndiName());
resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword(), recoveryConfProps);
if (spec.isRebalanceConnections()) {
factory.getServerLocator().addClusterTopologyListener(new RebalancingListener());
}

ActiveMQRALogger.LOGGER.debug("Setup complete " + this);
}
Expand Down Expand Up @@ -431,6 +444,9 @@ public void run() {
factory = null;
}

nodes.clear();
lastReceived = false;

ActiveMQRALogger.LOGGER.debug("Tearing down complete " + this);
}

Expand Down Expand Up @@ -610,27 +626,34 @@ public String toString() {
return buffer.toString();
}

public void rebalance() {
ActiveMQRALogger.LOGGER.rebalancingConnections();
reconnect(null);
}

/**
* Handles any failure by trying to reconnect
* Drops all existing connection-related resources and reconnects
*
* @param failure the reason for the failure
* @param failure if reconnecting in the event of a failure
*/
public void handleFailure(Throwable failure) {
if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
}
else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED) {
ActiveMQRALogger.LOGGER.awaitingJMSServerCreation();
}
else {
ActiveMQRALogger.LOGGER.failureInActivation(failure, spec);
public void reconnect(Throwable failure) {
if (failure != null) {
if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
}
else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED) {
ActiveMQRALogger.LOGGER.awaitingJMSServerCreation();
}
else {
ActiveMQRALogger.LOGGER.failureInActivation(failure, spec);
}
}
int reconnectCount = 0;
int setupAttempts = spec.getSetupAttempts();
long setupInterval = spec.getSetupInterval();

// Only enter the failure loop once
if (inFailure.getAndSet(true))
// Only enter the reconnect loop once
if (inReconnect.getAndSet(true))
return;
try {
Throwable lastException = failure;
Expand Down Expand Up @@ -675,7 +698,7 @@ else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).g
}
finally {
// Leaving failure recovery loop
inFailure.set(false);
inReconnect.set(false);
}
}

Expand All @@ -693,11 +716,55 @@ public void run() {
setup();
}
catch (Throwable t) {
handleFailure(t);
reconnect(t);
}
}

public void release() {
}
}

private class RebalancingListener implements ClusterTopologyListener {
@Override
public void nodeUP(TopologyMember member, boolean last) {
boolean newNode = false;

String id = member.getNodeId();
if (!nodes.contains(id)) {
if (removedNodes.get(id) == null || (removedNodes.get(id) != null && removedNodes.get(id) < member.getUniqueEventID())) {
nodes.add(id);
newNode = true;
}
}

if (lastReceived && newNode) {
Runnable runnable = new Runnable() {
@Override
public void run() {
rebalance();
}
};
Thread t = new Thread(runnable, "NodeUP Connection Rebalancer");
t.start();
}
else if (last) {
lastReceived = true;
}
}

@Override
public void nodeDown(long eventUID, String nodeID) {
if (nodes.remove(nodeID)) {
removedNodes.put(nodeID, eventUID);
Runnable runnable = new Runnable() {
@Override
public void run() {
rebalance();
}
};
Thread t = new Thread(runnable, "NodeDOWN Connection Rebalancer");
t.start();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
// undefined by default, default is specified at the RA level in ActiveMQRAProperties
private Long setupInterval;

private Boolean rebalanceConnections = false;

/**
* Constructor
*/
Expand Down Expand Up @@ -626,6 +628,14 @@ public void setUseLocalTx(final Boolean localTx) {
this.localTx = localTx;
}

public boolean isRebalanceConnections() {
return rebalanceConnections;
}

public void setRebalanceConnections(boolean rebalanceConnections) {
this.rebalanceConnections = rebalanceConnections;
}

public int getSetupAttempts() {
if (ActiveMQActivationSpec.trace) {
ActiveMQRALogger.LOGGER.trace("getSetupAttempts()");
Expand Down Expand Up @@ -846,6 +856,7 @@ public boolean equals(Object o) {
if (parsedJndiParams != null ? !parsedJndiParams.equals(that.parsedJndiParams) : that.parsedJndiParams != null)
return false;
if (localTx != null ? !localTx.equals(that.localTx) : that.localTx != null) return false;
if (rebalanceConnections != null ? !rebalanceConnections.equals(that.rebalanceConnections) : that.rebalanceConnections != null) return false;
if (setupAttempts != null ? !setupAttempts.equals(that.setupAttempts) : that.setupAttempts != null) return false;
return !(setupInterval != null ? !setupInterval.equals(that.setupInterval) : that.setupInterval != null);

Expand Down Expand Up @@ -873,6 +884,7 @@ public int hashCode() {
result = 31 * result + (jndiParams != null ? jndiParams.hashCode() : 0);
result = 31 * result + (parsedJndiParams != null ? parsedJndiParams.hashCode() : 0);
result = 31 * result + (localTx != null ? localTx.hashCode() : 0);
result = 31 * result + (rebalanceConnections != null ? rebalanceConnections.hashCode() : 0);
result = 31 * result + (setupAttempts != null ? setupAttempts.hashCode() : 0);
result = 31 * result + (setupInterval != null ? setupInterval.hashCode() : 0);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,10 @@ protected void clearDataRecreateServerDirs(final String testDir1, int index, boo
deleteDirectory(file);
file.mkdirs();

recreateDataDirectories(testDir1, index, backup);
}

protected void recreateDataDirectories(String testDir1, int index, boolean backup) {
recreateDirectory(getJournalDir(testDir1, index, backup));
recreateDirectory(getBindingsDir(testDir1, index, backup));
recreateDirectory(getPageDir(testDir1, index, backup));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,76 @@ public void testOutboundLoadBalancing() throws Exception {
}
}
}

@Test
public void testRebalance() throws Exception {
final int CONSUMER_COUNT = 10;
secondaryJmsServer.createQueue(true, MDBQUEUE, null, true, "/jms/" + MDBQUEUE);

ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setRebalanceConnections(true);
spec.setMaxSession(CONSUMER_COUNT);
spec.setSetupAttempts(5);
spec.setSetupInterval(200);
spec.setHA(true); // if this isn't true then the toplogy listener won't get nodeDown notifications
spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown to occur on the MDB connection
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(1);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);

Queue primaryQueue = server.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
Queue secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);

assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);

ClientSession session = addClientSession(locator.createSessionFactory().createSession());
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("test");
clientProducer.send(message);

latch.await(5, TimeUnit.SECONDS);

assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test");

for (int i = 0; i < 10; i++) {
secondaryServer.stop();

long mark = System.currentTimeMillis();
long timeout = 5000;
while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis() - mark) < timeout) {
Thread.sleep(100);
}

assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT);

secondaryServer.start();
waitForServerToStart(secondaryServer);
secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);

mark = System.currentTimeMillis();
while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) < (CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis() - mark) <= timeout) {
Thread.sleep(100);
}

assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);
}

qResourceAdapter.endpointDeactivation(endpointFactory, spec);
qResourceAdapter.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,49 @@ public void setUp() throws Exception {
params.put(TransportConstants.SERVER_ID_PROP_NAME, "1");
secondaryConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);

secondaryServer = addServer(ActiveMQServers.newActiveMQServer(createSecondaryDefaultConfig(true, true), mbeanServer, usePersistence()));
secondaryServer = addServer(ActiveMQServers.newActiveMQServer(createSecondaryDefaultConfig(true), mbeanServer, usePersistence()));
addServer(secondaryServer);
secondaryJmsServer = new JMSServerManagerImpl(secondaryServer);
secondaryJmsServer.start();
waitForTopology(secondaryServer, 2);

}

protected Configuration createDefaultConfig(boolean netty) throws Exception {
return createSecondaryDefaultConfig(netty, false);
return createSecondaryDefaultConfig(false);
}

protected Configuration createSecondaryDefaultConfig(boolean netty, boolean secondary) throws Exception {
protected Configuration createSecondaryDefaultConfig(boolean secondary) throws Exception {
HashMap invmMap = new HashMap();
HashMap nettyMap = new HashMap();
String primaryConnectorName = "invm2";
String secondaryConnectorName = "invm";
String directoryPrefix = "first";
int index = 0;

if (secondary) {
invmMap.put(TransportConstants.SERVER_ID_PROP_NAME, "1");
nettyMap.put("port", "5545");
primaryConnectorName = "invm";
secondaryConnectorName = "invm2";
directoryPrefix = "second";
index = 1;
}

ConfigurationImpl configuration = createBasicConfig().setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).setJournalDirectory(getTestDir() + "/" + directoryPrefix + "Journal/").setBindingsDirectory(getTestDir() + "/" + directoryPrefix + "Bind/").setLargeMessagesDirectory(getTestDir() + "/" + directoryPrefix + "Large/").setPagingDirectory(getTestDir() + "/" + directoryPrefix + "Page/").addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName));
ConfigurationImpl configuration = createBasicConfig(index)
.setJMXManagementEnabled(false)
.clearAcceptorConfigurations()
.addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap))
.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap))
.addConnectorConfiguration(secondaryConnectorName, secondaryConnector)
.addConnectorConfiguration(primaryConnectorName, primaryConnector)
.addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0));

recreateDataDirectories(getTestDir(), index, false);

return configuration;
}

@Override
protected boolean usePersistence() {
return true;
}
}