Skip to content

Commit

Permalink
ARTEMIS-4283 Fail fast CORE client connect on closing
Browse files Browse the repository at this point in the history
ServerLocatorImpl waits for topology after connecting a new session factory.
It should interrupt waiting for topology when it is closed to fail fast.
  • Loading branch information
brusdev authored and clebertsuconic committed May 19, 2023
1 parent 1e8b5cb commit c47d15c
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,7 @@ public interface ActiveMQClientMessageBundle {

@Message(id = 219067, value = "Keystore alias {} not found in {}")
IllegalArgumentException keystoreAliasNotFound(String keystoreAlias, String keystorePath);

@Message(id = 219068, value = "Connection closed while receiving cluster topology. Group:{}")
ActiveMQObjectClosedException connectionClosedOnReceiveTopology(DiscoveryGroup discoveryGroup);
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

private final double retryIntervalMultiplier; // For exponential backoff

private volatile boolean topologyReady = false;

private final CountDownLatch latchFinalTopology = new CountDownLatch(1);

private final long maxRetryInterval;
Expand Down Expand Up @@ -474,6 +476,9 @@ private void interruptConnectAndCloseAllSessions(boolean close) {
closeCleanSessions(close);
closed = true;
}

//release all threads waiting for topology
latchFinalTopology.countDown();
}

/**
Expand Down Expand Up @@ -521,7 +526,9 @@ public void cleanup() {
@Override
public boolean waitForTopology(long timeout, TimeUnit unit) {
try {
return latchFinalTopology.await(timeout, unit);
//latchFinalTopology is decremented on last topology message or on close
//topologyReady is set to true only on last topology message
return latchFinalTopology.await(timeout, unit) && topologyReady;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
ActiveMQClientLogger.LOGGER.unableToReceiveClusterTopology(e);
Expand Down Expand Up @@ -1502,6 +1509,7 @@ public void notifyNodeUp(long uniqueEventID,
serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast);
} finally {
if (isLast) {
topologyReady = true;
latchFinalTopology.countDown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,8 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException {
// We always try to connect here with only one attempt,
// as we will perform the initial retry here, looking for all possible connectors
factory.connect(1, false);

addFactory(factory);
} finally {
removeFromConnecting(factory);
}
Expand Down Expand Up @@ -750,12 +752,17 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException {
// how the sendSubscription happens.
// in case this ever changes.
if (topology != null && !factory.waitForTopology(config.callTimeout, TimeUnit.MILLISECONDS)) {
factoryClosed(factory);

factory.cleanup();

if (isClosed()) {
throw ActiveMQClientMessageBundle.BUNDLE.connectionClosedOnReceiveTopology(discoveryGroup);
}

throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}

addFactory(factory);

return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.uri.ServerLocatorParser;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -47,6 +52,47 @@ public void setUp() throws Exception {
server.start();
}

@Test
public void testFailFastConnectOnClosing() throws Exception {
CountDownLatch connectLatch = new CountDownLatch(1);
CountDownLatch subscribeLatch = new CountDownLatch(1);
AtomicBoolean connectTimedOut = new AtomicBoolean(false);

ServerLocator locator = createNonHALocator(isNetty()).setCallTimeout(30000);
try (ClientSessionFactory csf = locator.createSessionFactory()) {
Assert.assertFalse(csf.isClosed());
}

server.getRemotingService().addIncomingInterceptor((Interceptor) (packet, connection) -> {
if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
subscribeLatch.countDown();
return false;
}
return true;
});

new Thread(() -> {
try {
locator.createSessionFactory();
} catch (Exception e) {
connectTimedOut.set(e.getClass() == ActiveMQObjectClosedException.class);
}
connectLatch.countDown();
}).start();

//wait for locator subscribing
subscribeLatch.await();

//close locator while it is waiting for topology
locator.close();

//check connect fails fast
Assert.assertTrue(connectLatch.await(3000, TimeUnit.MILLISECONDS));

//check connect timed out
Assert.assertTrue(connectTimedOut.get());
}

@Test
public void testURL() throws Exception {
ServerLocatorParser parser = new ServerLocatorParser();
Expand Down

0 comments on commit c47d15c

Please sign in to comment.