Skip to content

Commit

Permalink
ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578
Browse files Browse the repository at this point in the history
1 of 2) - Porting of HORNETQ-1575

In a live-backup scenario, when the live server goes down and the backup becomes live, clients
using HA connection factories can fail over automatically. However, if a
client decides to create a new connection by itself (as in the Camel JMS case),
there is a chance that the new connection points to the dead live server
and the connection attempt fails. The reason is that once the old
connection is gone, the backup never gets a chance to announce itself
to the client, so the initial connection fails.

The fix is to let the connection factory (CF) remember the old topology and use it on any
initial connection attempt.
  • Loading branch information
howardgao authored and clebertsuconic committed Jul 8, 2020
1 parent a69b2ae commit 6f8ff55
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,19 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
this(serverLocator, new Pair<>(connectorConfig, null),
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
}

ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
createTrace = new Exception();

this.serverLocator = serverLocator;
Expand All @@ -171,11 +184,11 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,

this.clientProtocolManager.setSessionFactory(this);

this.currentConnectorConfig = connectorConfig;
this.currentConnectorConfig = connectorConfig.getA();

connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
connectorFactory = instantiateConnectorFactory(connectorConfig.getA().getFactoryClassName());

checkTransportKeys(connectorFactory, connectorConfig);
checkTransportKeys(connectorFactory, connectorConfig.getA());

this.callTimeout = locatorConfig.callTimeout;

Expand Down Expand Up @@ -216,6 +229,10 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);

connectionReadyForWrites = true;

if (connectorConfig.getB() != null) {
this.backupConfig = connectorConfig.getB();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ private enum STATE {

private TransportConfiguration clusterTransportConfiguration;

private boolean useTopologyForLoadBalancing;

/** For tests only */
public DiscoveryGroup getDiscoveryGroup() {
return discoveryGroup;
Expand Down Expand Up @@ -422,7 +420,7 @@ private ServerLocatorImpl(ServerLocatorImpl locator) {
clusterTransportConfiguration = locator.clusterTransportConfiguration;
}

private TransportConfiguration selectConnector() {
private synchronized Pair<TransportConfiguration, TransportConfiguration> selectConnector(boolean useInitConnector) {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;

flushTopology();
Expand All @@ -432,22 +430,22 @@ private TransportConfiguration selectConnector() {
}

synchronized (this) {
if (usedTopology != null && config.useTopologyForLoadBalancing) {
if (usedTopology != null && config.useTopologyForLoadBalancing && !useInitConnector) {
if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from topology.");
}
int pos = loadBalancingPolicy.select(usedTopology.length);
Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];

return pair.getA();
return pair;
} else {
if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from initial connectors.");
}

int pos = loadBalancingPolicy.select(initialConnectors.length);

return initialConnectors[pos];
return new Pair(initialConnectors[pos], null);
}
}
}
Expand Down Expand Up @@ -658,10 +656,19 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException {
synchronized (this) {
boolean retry = true;
int attempts = 0;
boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0;
boolean staticTried = false;
boolean shouldTryStatic = !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0;

while (retry && !isClosed()) {
retry = false;

TransportConfiguration tc = selectConnector();
/*
* The logic is: If receivedTopology is false, try static first.
* if receivedTopology is true, try topologyArray first
*/
Pair<TransportConfiguration, TransportConfiguration> tc = selectConnector(shouldTryStatic);

if (tc == null) {
throw ActiveMQClientMessageBundle.BUNDLE.noTCForSessionFactory();
}
Expand All @@ -682,12 +689,32 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException {
try {
if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
attempts++;

int connectorsSize = getConnectorsSize();
int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts;

if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
if (shouldTryStatic) {
//we know static is used
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * this.getNumInitialConnectors()) {
if (topologyArrayTried) {
//stop retry and throw exception
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
} else {
//lets try topologyArray
staticTried = true;
shouldTryStatic = false;
attempts = 0;
}
}
} else {
//we know topologyArray is used
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * getConnectorsSize()) {
if (staticTried) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
} else {
topologyArrayTried = true;
shouldTryStatic = true;
attempts = 0;
}
}
}
if (factory.waitForRetry(config.retryInterval)) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
Expand Down Expand Up @@ -1414,7 +1441,6 @@ public void notifyNodeDown(final long eventTime, final String nodeID) {
if (topology.isEmpty()) {
// Resetting the topology to its original condition as it was brand new
receivedTopology = false;
topologyArray = null;
} else {
updateArraysAndPairs(eventTime);

Expand Down Expand Up @@ -1492,6 +1518,12 @@ private void internalUpdateArray(long time) {
synchronized (topologyArrayGuard) {
Collection<TopologyMemberImpl> membersCopy = topology.getMembers();

if (membersCopy.size() == 0) {
//it could happen when live is down, in that case we keeps the old copy
//and don't update
return;
}

Pair<TransportConfiguration, TransportConfiguration>[] topologyArrayLocal = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class, membersCopy.size());

int count = 0;
Expand Down Expand Up @@ -1557,7 +1589,6 @@ public void factoryClosed(final ClientSessionFactory factory) {

if (!clusterConnection && isEmpty) {
receivedTopology = false;
topologyArray = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
Expand All @@ -34,13 +36,17 @@

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
Expand Down Expand Up @@ -254,6 +260,7 @@ public void testManualFailover() throws Exception {
jbcfLive.setBlockOnDurableSend(true);

ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams));

jbcfBackup.setBlockOnNonDurableSend(true);
jbcfBackup.setBlockOnDurableSend(true);
jbcfBackup.setInitialConnectAttempts(-1);
Expand Down Expand Up @@ -437,6 +444,74 @@ public void run() {

}

/**
 * Checks that an HA connection factory can still hand out a brand-new connection after the
 * live server has crashed and the backup server has started.
 *
 * <p>Steps: open two connections through the factory, record the live/backup transport
 * configurations from the locator's topology, crash the live server, wait for the backup to
 * start, rewrite the locator's cached topology entry with the recorded pair (as if the
 * "live down" event never reached the client), and finally create a third connection.
 *
 * @throws Exception if setup, the crash, or the clean-up fails
 */
@Test
public void testCreateNewConnectionAfterFailover() throws Exception {
    ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc);
    jbcf.setInitialConnectAttempts(5);
    jbcf.setRetryInterval(1000);
    jbcf.setReconnectAttempts(-1);

    Connection conn1 = null, conn2 = null, conn3 = null;

    try {
        conn1 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);

        conn2 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);

        Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ClientSession coreSession1 = ((ActiveMQSession)sess1).getCoreSession();
        ClientSession coreSession2 = ((ActiveMQSession)sess2).getCoreSession();

        // Capture the live/backup pair while the topology still describes both servers.
        Topology fullTopology = jbcf.getServerLocator().getTopology();
        Collection<TopologyMemberImpl> members = fullTopology.getMembers();
        assertEquals(1, members.size());
        TopologyMemberImpl member = members.iterator().next();
        TransportConfiguration tcLive = member.getLive();
        TransportConfiguration tcBackup = member.getBackup();

        System.out.println("live tc: " + tcLive);
        System.out.println("Backup tc: " + tcBackup);

        JMSUtil.crash(liveServer, coreSession1, coreSession2);

        waitForServerToStart(backupServer);

        //now pretending that the live down event hasn't been propagated to client
        simulateLiveDownHasNotReachClient((ServerLocatorImpl) jbcf.getServerLocator(), tcLive, tcBackup);

        //now create a new connection after live is down
        try {
            conn3 = jbcf.createConnection();
        } catch (Exception e) {
            // Keep the original exception as the cause so a failing run shows why the
            // connection could not be created instead of only the generic message.
            throw new AssertionError("The new connection should be established successfully after failover", e);
        }
    } finally {
        if (conn1 != null) {
            conn1.close();
        }
        if (conn2 != null) {
            conn2.close();
        }
        if (conn3 != null) {
            conn3.close();
        }
    }
}

/**
 * Rewrites the single entry of the locator's private {@code topologyArray} field so that it
 * holds the given live/backup pair, imitating a client that never received the "live down"
 * notification.
 *
 * @param locator  the locator whose cached topology array is modified via reflection
 * @param tcLive   transport configuration stored as the first element of the pair
 * @param tcBackup transport configuration stored as the second element of the pair
 * @throws NoSuchFieldException   if {@code ServerLocatorImpl} has no {@code topologyArray} field
 * @throws IllegalAccessException if the field cannot be read or written reflectively
 */
@SuppressWarnings("unchecked") // the field is read reflectively, so the generic array type cannot be checked
private void simulateLiveDownHasNotReachClient(ServerLocatorImpl locator, TransportConfiguration tcLive, TransportConfiguration tcBackup) throws NoSuchFieldException, IllegalAccessException {
    // Resolve the field on ServerLocatorImpl itself: getDeclaredField does not search
    // superclasses, so locator.getClass() would fail for an instance of any subclass.
    Field f = ServerLocatorImpl.class.getDeclaredField("topologyArray");
    f.setAccessible(true);

    Pair<TransportConfiguration, TransportConfiguration>[] value = (Pair<TransportConfiguration, TransportConfiguration>[]) f.get(locator);
    assertEquals(1, value.length);
    Pair<TransportConfiguration, TransportConfiguration> member = value[0];
    member.setA(tcLive);
    member.setB(tcBackup);
    f.set(locator, value);
}

// Package protected ---------------------------------------------

// Protected -----------------------------------------------------
Expand All @@ -463,7 +538,10 @@ protected void startServers() throws Exception {

backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);

backupConf = createBasicConfig().addAcceptorConfiguration(backupAcceptortc).addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName()));
backuptc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupAcceptortc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1);

backupConf = createBasicConfig().addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName()));

backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager));

Expand All @@ -484,7 +562,7 @@ protected void startServers() throws Exception {
liveJMSServer.setRegistry(new JndiBindingRegistry(ctx1));

liveJMSServer.getActiveMQServer().setIdentity("JMSLive");
log.debug("Starting life");
log.debug("Starting live");

liveJMSServer.start();

Expand Down

0 comments on commit 6f8ff55

Please sign in to comment.