Skip to content

Commit

Permalink
ARTEMIS-1995 Client fail over fails when live shut down too soon
Browse files Browse the repository at this point in the history
In a live-backup scenario, if the live is restarted and then shut down too soon,
the client may fail to fail over because its internal topology
is inconsistent with the final status. The client keeps connecting to the live
that has already shut down, never trying to connect to the backup.

This is a port of HORNETQ-1572.

(cherry picked from commit 983232d)
  • Loading branch information
howardgao authored and clebertsuconic committed Aug 1, 2018
1 parent 73b3ebf commit 1282beb
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 11 deletions.
Expand Up @@ -77,11 +77,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

private final ClientProtocolManager clientProtocolManager;

private final TransportConfiguration connectorConfig;
private TransportConfiguration connectorConfig;

private TransportConfiguration currentConnectorConfig;

private TransportConfiguration backupConfig;
private volatile TransportConfiguration backupConfig;

private ConnectorFactory connectorFactory;

Expand Down Expand Up @@ -175,8 +175,6 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,

this.clientProtocolManager.setSessionFactory(this);

this.connectorConfig = connectorConfig;

this.currentConnectorConfig = connectorConfig;

connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
Expand Down Expand Up @@ -881,6 +879,10 @@ private void checkCloseConnection() {
}
}

//The order in which connector configs are tried when getting a connection:
//currentConnectorConfig, backupConfig and then the last-used config
//(held in connectorConfig). On each successful connect, the current and
//last configs are updated accordingly.
@Override
public RemotingConnection getConnection() {
if (closed)
Expand Down Expand Up @@ -1101,8 +1103,8 @@ protected Connection createTransportConnection() {

// Switching backup as live
connector = backupConnector;
connectorConfig = currentConnectorConfig;
currentConnectorConfig = backupConfig;
backupConfig = null;
connectorFactory = backupConnectorFactory;
return transportConnection;
}
Expand All @@ -1113,23 +1115,24 @@ protected Connection createTransportConnection() {
}


if (currentConnectorConfig.equals(connectorConfig)) {
if (currentConnectorConfig.equals(connectorConfig) || connectorConfig == null) {

// There were no changes between the current and original connectors, so just return null here and let the retry happen in the first portion of this method on the next attempt
return null;
}

ConnectorFactory originalConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
ConnectorFactory lastConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());

Connector originalConnector = createConnector(originalConnectorFactory, connectorConfig);
Connector lastConnector = createConnector(lastConnectorFactory, connectorConfig);

transportConnection = openTransportConnection(originalConnector);
transportConnection = openTransportConnection(lastConnector);

if (transportConnection != null) {
logger.debug("Returning into original connector");
connector = originalConnector;
backupConfig = null;
connector = lastConnector;
TransportConfiguration temp = currentConnectorConfig;
currentConnectorConfig = connectorConfig;
connectorConfig = temp;
return transportConnection;
} else {
logger.debug("no connection been made, returning null");
Expand Down
Expand Up @@ -19,6 +19,7 @@
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -45,6 +46,7 @@
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
Expand Down Expand Up @@ -651,6 +653,110 @@ public void testFailBothRestartLive() throws Exception {
Assert.assertEquals(0, sf.numConnections());
}

/**
 * Regression test for the case where the live server is restarted and then
 * goes down again before the client has a usable backup entry: the session
 * factory must still be able to create a session after the second crash.
 * <p>
 * The factory's private connector-config fields are read and written through
 * reflection (see getFieldFromSF / setSFFieldValue) so the test can observe
 * which config is current and can null out backupConfig on purpose.
 * <p>
 * NOTE(review): the JUnit timeout is 10 s, yet the body waits up to 40 s for
 * live activation and up to 10 s for the backup to start — confirm the
 * timeout is intended.
 */
@Test(timeout = 10000)
public void testFailLiveTooSoon() throws Exception {
ServerLocator locator = getServerLocator();

// -1 reconnect attempts is presumably "retry forever" — confirm against ServerLocator docs
locator.setReconnectAttempts(-1);
locator.setRetryInterval(10);

sf = (ClientSessionFactoryInternal)locator.createSessionFactory();

// give the factory time to have its backupConfig field populated
waitForBackupConfig(sf);

// snapshot the configs the factory starts with
TransportConfiguration initialLive = getFieldFromSF(sf, "currentConnectorConfig");
TransportConfiguration initialBackup = getFieldFromSF(sf, "backupConfig");

System.out.println("initlive: " + initialLive);
System.out.println("initback: " + initialBackup);

// "last" is the factory's connectorConfig field, "current" its currentConnectorConfig field
TransportConfiguration last = getFieldFromSF(sf, "connectorConfig");
TransportConfiguration current = getFieldFromSF(sf, "currentConnectorConfig");

System.out.println("now last: " + last);
System.out.println("now current: " + current);
assertTrue(current.equals(initialLive));

ClientSession session = createSession(sf, true, true);

session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, true);

//crash 1
crash();

//make sure failover is ok
createSession(sf, true, true).close();

last = getFieldFromSF(sf, "connectorConfig");
current = getFieldFromSF(sf, "currentConnectorConfig");

System.out.println("now after live crashed last: " + last);
System.out.println("now current: " + current);

// after the first crash the factory must be using what was initially the backup
assertTrue(current.equals(initialBackup));

//fail back
beforeRestart(liveServer);
adaptLiveConfigForReplicatedFailBack(liveServer);
liveServer.getServer().start();

Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
// poll up to 100 x 100 ms for the backup server to report started
int i = 0;
while (!backupServer.isStarted() && i++ < 100) {
Thread.sleep(100);
}
liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
Assert.assertTrue(backupServer.isStarted());

//make sure failover is ok
createSession(sf, true, true).close();

last = getFieldFromSF(sf, "connectorConfig");
current = getFieldFromSF(sf, "currentConnectorConfig");

System.out.println("now after live back again last: " + last);
System.out.println("now current: " + current);

//cannot use equals here because the config's name (uuid) changes
//after failover
assertTrue(current.isSameParams(initialLive));

//now manually corrupt the backup in sf
// (nulls the backupConfig field so the factory has no backup entry to fall back on)
setSFFieldValue(sf, "backupConfig", null);

//crash 2
crash();

beforeRestart(backupServer);
// a session must still be obtainable even though backupConfig was cleared above
createSession(sf, true, true).close();

// after closing, the factory must report no sessions and no connections
sf.close();
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
}

/**
 * Polls the session factory's private {@code backupConfig} field until it is
 * non-null or the retry budget (50 sleeps of 200 ms) is exhausted.
 * Returns silently in either case; callers must check the field themselves.
 *
 * @param sf the session factory whose {@code backupConfig} field is polled
 * @throws NoSuchFieldException   if the field cannot be found reflectively
 * @throws IllegalAccessException if the field cannot be read reflectively
 * @throws InterruptedException   if the thread is interrupted while sleeping
 */
protected void waitForBackupConfig(ClientSessionFactoryInternal sf) throws NoSuchFieldException, IllegalAccessException, InterruptedException {
   int sleepsLeft = 50;
   // the field is re-read before every budget check, including once after the final sleep
   while (getFieldFromSF(sf, "backupConfig") == null && sleepsLeft-- > 0) {
      Thread.sleep(200);
   }
}

/**
 * Overwrites a declared (possibly private) field of
 * {@code ClientSessionFactoryImpl} on the given factory via reflection.
 *
 * @param sf     the session factory instance to modify
 * @param tcName the name of the field declared on {@code ClientSessionFactoryImpl}
 * @param value  the new value to store in the field
 * @throws NoSuchFieldException   if no field with that name is declared
 * @throws IllegalAccessException if the field cannot be written
 */
protected void setSFFieldValue(ClientSessionFactoryInternal sf, String tcName, Object value) throws NoSuchFieldException, IllegalAccessException {
   final Field target = ClientSessionFactoryImpl.class.getDeclaredField(tcName);
   // lift the access check so non-public fields can be written
   target.setAccessible(true);
   target.set(sf, value);
}

/**
 * Reads a declared (possibly private) field of
 * {@code ClientSessionFactoryImpl} from the given factory via reflection
 * and returns it as a {@code TransportConfiguration}.
 *
 * @param sf     the session factory instance to inspect
 * @param tcName the name of the field declared on {@code ClientSessionFactoryImpl}
 * @return the field's current value, which may be {@code null}
 * @throws NoSuchFieldException   if no field with that name is declared
 * @throws IllegalAccessException if the field cannot be read
 */
protected TransportConfiguration getFieldFromSF(ClientSessionFactoryInternal sf, String tcName) throws NoSuchFieldException, IllegalAccessException {
   final Field source = ClientSessionFactoryImpl.class.getDeclaredField(tcName);
   // lift the access check so non-public fields can be read
   source.setAccessible(true);
   final Object rawValue = source.get(sf);
   return (TransportConfiguration) rawValue;
}

/**
* Basic fail-back test.
*
Expand Down
Expand Up @@ -362,6 +362,11 @@ public void testSimpleSendAfterFailoverDurableNonTemporary() throws Exception {
@Ignore
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception {
}

// Overrides the inherited testFailLiveTooSoon with an empty body and marks it
// @Ignore, so the scenario does nothing in this subclass.
// NOTE(review): the reason for disabling it here is not visible in this view — confirm.
@Override
@Ignore
public void testFailLiveTooSoon() throws Exception {
}
}


0 comments on commit 1282beb

Please sign in to comment.