Skip to content

Commit

Permalink
ARTEMIS-4251 Support CORE client failover to other live servers
Browse files Browse the repository at this point in the history
Improve the CORE client failover connecting to other live servers when all
reconnect attempts fails, i.e. in a cluster composed of 2 live servers,
when the server to which the CORE client is connected goes down the CORE
client should reconnect its sessions to the other liver broker.
  • Loading branch information
brusdev authored and clebertsuconic committed Jun 13, 2023
1 parent 2f5463c commit bd3c057
Show file tree
Hide file tree
Showing 16 changed files with 540 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ServerLocatorConfig {
public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
Expand Down Expand Up @@ -80,6 +81,7 @@ public ServerLocatorConfig(final ServerLocatorConfig locator) {
maxRetryInterval = locator.maxRetryInterval;
reconnectAttempts = locator.reconnectAttempts;
initialConnectAttempts = locator.initialConnectAttempts;
failoverAttempts = locator.failoverAttempts;
initialMessagePacketSize = locator.initialMessagePacketSize;
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public final class ActiveMQClient {

public static final int INITIAL_CONNECT_ATTEMPTS = 1;

public static final int DEFAULT_FAILOVER_ATTEMPTS = 0;

@Deprecated
public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,21 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
*/
int getInitialConnectAttempts();

/**
* Sets the maximum number of failover attempts to establish a connection to other live servers after a connection failure.
* <p>
* Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater than 0.
*
* @param attempts maximum number of failover attempts after a connection failure
* @return this ServerLocator
*/
ServerLocator setFailoverAttempts(int attempts);

/**
* @return the number of failover attempts after a connection failure.
*/
int getFailoverAttempts();

/**
* Returns true if the client will automatically attempt to connect to the backup server if the initial
* connection to the live server fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.BiPredicate;

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

Expand Down Expand Up @@ -132,6 +133,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

private int reconnectAttempts;

private int failoverAttempts;

private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<>();

private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<>();
Expand Down Expand Up @@ -239,6 +242,8 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,

this.reconnectAttempts = reconnectAttempts;

this.failoverAttempts = locatorConfig.failoverAttempts;

this.scheduledThreadPool = scheduledThreadPool;

this.threadPool = threadPool;
Expand Down Expand Up @@ -640,7 +645,7 @@ private void failoverOrReconnect(final Object connectionID,
// failoverLock
// until failover is complete

if (reconnectAttempts != 0) {
if (reconnectAttempts != 0 || failoverAttempts != 0) {

if (clientProtocolManager.cleanupBeforeFailover(me)) {

Expand Down Expand Up @@ -673,33 +678,96 @@ private void failoverOrReconnect(final Object connectionID,
sessionsToFailover = new HashSet<>(sessions);
}

// Notify sessions before failover.
for (ClientSessionInternal session : sessionsToFailover) {
session.preHandleFailover(connection);
}

boolean allSessionReconnected = false;
int failedReconnectSessionsCounter = 0;
do {
allSessionReconnected = reconnectSessions(sessionsToFailover, oldConnection, reconnectAttempts, me);
if (oldConnection != null) {
oldConnection.destroy();

// Try to reconnect to the current connector pair.
// Before ARTEMIS-4251 ClientSessionFactoryImpl only tries to reconnect to the current connector pair.
int reconnectRetries = 0;
boolean sessionsReconnected = false;
BiPredicate<Boolean, Integer> reconnectRetryPredicate =
(reconnected, retries) -> clientProtocolManager.isAlive() &&
!reconnected && (reconnectAttempts == -1 || retries < reconnectAttempts);
while (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {

int remainingReconnectRetries = reconnectAttempts == -1 ? -1 : reconnectAttempts - reconnectRetries;
reconnectRetries += getConnectionWithRetry(remainingReconnectRetries, oldConnection);

if (connection != null) {
sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);

if (!sessionsReconnected) {
if (oldConnection != null) {
oldConnection.destroy();
}

oldConnection = connection;
connection = null;
}
}

reconnectRetries++;
if (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {
waitForRetry(retryInterval);
}
}


// Try to connect to other connector pairs.
// After ARTEMIS-4251 ClientSessionFactoryImpl tries to connect to
// other connector pairs when reconnection to the current connector pair fails.
int connectorsCount = 0;
int failoverRetries = 0;
long failoverRetryInterval = retryInterval;
Pair<TransportConfiguration, TransportConfiguration> connectorPair;
BiPredicate<Boolean, Integer> failoverRetryPredicate =
(reconnected, retries) -> clientProtocolManager.isAlive() &&
!reconnected && (failoverAttempts == -1 || retries < failoverAttempts);
while (failoverRetryPredicate.test(sessionsReconnected, failoverRetries)) {

connectorsCount++;
connectorPair = serverLocator.selectNextConnectorPair();

if (connectorPair != null) {
connectorConfig = connectorPair.getA();
currentConnectorConfig = connectorPair.getA();
if (connectorPair.getB() != null) {
backupConnectorConfig = connectorPair.getB();
}

getConnection();
}

if (!allSessionReconnected) {
failedReconnectSessionsCounter++;
oldConnection = connection;
connection = null;
if (connection != null) {
sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);

// Wait for retry when the connection is established but not all session are reconnected.
if ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && oldConnection != null) {
waitForRetry(retryInterval);
if (!sessionsReconnected) {
if (oldConnection != null) {
oldConnection.destroy();
}

oldConnection = connection;
connection = null;
}
}

if (connectorsCount >= serverLocator.getConnectorsSize()) {
connectorsCount = 0;
failoverRetries++;
if (failoverRetryPredicate.test(false, failoverRetries)) {
waitForRetry(failoverRetryInterval);
failoverRetryInterval = getNextRetryInterval(failoverRetryInterval);
}
}
}
while ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && !allSessionReconnected);


// Notify sessions after failover.
for (ClientSessionInternal session : sessionsToFailover) {
session.postHandleFailover(connection, allSessionReconnected);
session.postHandleFailover(connection, sessionsReconnected);
}

if (oldConnection != null) {
Expand Down Expand Up @@ -830,15 +898,12 @@ private void callFailoverListeners(FailoverEventType type) {
*/
private boolean reconnectSessions(final Set<ClientSessionInternal> sessionsToFailover,
final RemotingConnection oldConnection,
final int reconnectAttempts,
final ActiveMQException cause) {
getConnectionWithRetry(reconnectAttempts, oldConnection);

if (connection == null) {
if (!clientProtocolManager.isAlive())
ActiveMQClientLogger.LOGGER.failedToConnectToServer();

return true;
return false;
}

List<FailureListener> oldListeners = oldConnection.getFailureListeners();
Expand Down Expand Up @@ -874,9 +939,9 @@ private boolean reconnectSessions(final Set<ClientSessionInternal> sessionsToFai
return !sessionFailoverError;
}

private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
private int getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
if (!clientProtocolManager.isAlive())
return;
return 0;
if (logger.isTraceEnabled()) {
logger.trace("getConnectionWithRetry::{} with retryInterval = {} multiplier = {}",
reconnectAttempts, retryInterval, retryIntervalMultiplier, new Exception("trace"));
Expand All @@ -897,7 +962,7 @@ private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnect
((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
}
logger.debug("Reconnection successful");
return;
return count;
} else {
// Failed to get connection

Expand All @@ -909,30 +974,36 @@ private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnect
ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
}

return;
return count;
}

if (logger.isTraceEnabled()) {
logger.trace("Waiting {} milliseconds before next retry. RetryInterval={} and multiplier={}", interval, retryInterval, retryIntervalMultiplier);
}

if (waitForRetry(interval))
return;
return count;

// Exponential back-off
long newInterval = (long) (interval * retryIntervalMultiplier);

if (newInterval > maxRetryInterval) {
newInterval = maxRetryInterval;
}

interval = newInterval;
interval = getNextRetryInterval(interval);
} else {
logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
return;
return count;
}
}
}

return count;
}

private long getNextRetryInterval(long retryInterval) {
// Exponential back-off
long nextRetryInterval = (long) (retryInterval * retryIntervalMultiplier);

if (nextRetryInterval > maxRetryInterval) {
nextRetryInterval = maxRetryInterval;
}

return nextRetryInterval;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,15 @@ private ServerLocatorImpl(ServerLocatorImpl locator) {
clusterTransportConfiguration = locator.clusterTransportConfiguration;
}

private boolean useInitConnector() {
return !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0;
}

@Override
public Pair<TransportConfiguration, TransportConfiguration> selectNextConnectorPair() {
return selectConnector(useInitConnector());
}

private synchronized Pair<TransportConfiguration, TransportConfiguration> selectConnector(boolean useInitConnector) {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;

Expand Down Expand Up @@ -470,7 +479,8 @@ private synchronized Pair<TransportConfiguration, TransportConfiguration> select
}
}

private int getConnectorsSize() {
@Override
public int getConnectorsSize() {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;

flushTopology();
Expand Down Expand Up @@ -673,7 +683,7 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException {
int attempts = 0;
boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0;
boolean staticTried = false;
boolean shouldTryStatic = !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0;
boolean shouldTryStatic = useInitConnector();

while (retry && !isClosed()) {
retry = false;
Expand Down Expand Up @@ -1177,6 +1187,18 @@ public int getInitialConnectAttempts() {
return config.initialConnectAttempts;
}

@Override
public ServerLocatorImpl setFailoverAttempts(int attempts) {
checkWrite();
this.config.failoverAttempts = attempts;
return this;
}

@Override
public int getFailoverAttempts() {
return config.failoverAttempts;
}

@Deprecated
@Override
public boolean isFailoverOnInitialConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ void notifyNodeUp(long uniqueEventID,
ClientProtocolManager newProtocolManager();

boolean isConnectable();

int getConnectorsSize();

Pair<TransportConfiguration, TransportConfiguration> selectNextConnectorPair();
}
2 changes: 1 addition & 1 deletion docs/user-manual/en/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
* [Broker Plugins](broker-plugins.md)
* [Resource Limits](resource-limits.md)
* [The JMS Bridge](jms-bridge.md)
* [Client Reconnection and Session Reattachment](client-reconnection.md)
* [Client Failover](client-failover.md)
* [Diverting and Splitting Message Flows](diverts.md)
* [Core Bridges](core-bridges.md)
* [Transformers](transformers.md)
Expand Down

0 comments on commit bd3c057

Please sign in to comment.