Skip to content

Commit

Permalink
ARTEMIS-4523 Openwire leaving consumers isolated after reconnects
Browse files Browse the repository at this point in the history
co-authored with Gary Tully
  • Loading branch information
clebertsuconic committed Dec 4, 2023
1 parent 2efc496 commit 3ec0274
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
// Request of forced shutdown
private volatile boolean requestedForcedShutdown = false;
// Request of educated shutdown:
private volatile boolean requestedShutdown = false;
protected volatile boolean requestedShutdown = false;
// Request to yield to another thread
private volatile boolean yielded = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class ThresholdActor<T> extends ProcessorBase<Object> {
private final ActorListener<T> listener;
private final Runnable overThreshold;
private final Runnable clearThreshold;
private volatile Runnable shutdownTask;

public ThresholdActor(Executor parent, ActorListener<T> listener, int maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable clearThreshold) {
super(parent);
Expand All @@ -54,10 +53,6 @@ public ThresholdActor(Executor parent, ActorListener<T> listener, int maxSize, T

@Override
protected final void doTask(Object task) {
if (task == shutdownTask) {
shutdownTask.run();
return;
}
if (task == FLUSH) {
clearThreshold.run();
// should set to 0 no matter the value. There's a single thread setting this value back to zero
Expand Down Expand Up @@ -100,14 +95,8 @@ public void flush() {
}
}

public void shutdown(Runnable runnable) {
// do no more pending work
public void requestShutdown() {
requestedShutdown = true;
tasks.clear();
// run this task next
shutdownTask = runnable;
tasks.add(runnable);
// wait for shutdown task to complete
flush();
shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,53 +182,4 @@ public void testFlow(boolean respectSemaphore) throws Exception {
executorService.shutdown();
}
}


@Test
public void testShutdownTask() throws Exception {
AtomicInteger lastAcquireFailed = new AtomicInteger(0);
lastProcessed.set(0);

Semaphore allowedTasks = new Semaphore(10);
CountDownLatch completedTasks = new CountDownLatch(11);
CountDownLatch pendingTasks = new CountDownLatch(11);

final ExecutorService executorService = Executors.newSingleThreadExecutor();

ThresholdActor<Integer> actor = new ThresholdActor<>(executorService, (i) -> {
try {
pendingTasks.countDown();
if (allowedTasks.tryAcquire(1, 200, TimeUnit.MILLISECONDS)) {
lastProcessed.set(i);
} else {
lastAcquireFailed.set(i);
}
completedTasks.countDown();
} catch (InterruptedException ignored) {
}

}, 1000, (e) -> {
return 1;
}, () -> {
}, () -> {
});

// expect allowedTasks tasks to complete
for (int i = 1; i < 100; i++) {
actor.act(i);
}
// wait for task processing
Assert.assertTrue(pendingTasks.await(4, TimeUnit.SECONDS));

actor.shutdown(() -> {
lastProcessed.set(lastProcessed.get() * 1000);
});

Assert.assertTrue(completedTasks.await(4, TimeUnit.SECONDS));

// assert processing terminated at block point
Assert.assertEquals(10000, lastProcessed.get());
// pending task executed as expected
Assert.assertEquals(11, lastAcquireFailed.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.openwire;

import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
import javax.transaction.xa.XAException;
Expand Down Expand Up @@ -761,7 +760,11 @@ public void fail(ActiveMQException me, String message) {

final ThresholdActor<Command> localVisibleActor = openWireActor;
if (localVisibleActor != null) {
localVisibleActor.shutdown(() -> doFail(me, message));
localVisibleActor.requestShutdown();
}

if (executor != null) {
executor.execute(() -> doFail(me, message));
} else {
doFail(me, message);
}
Expand All @@ -779,11 +782,16 @@ private void doFail(ActiveMQException me, String message) {
}
try {
if (this.getConnectionInfo() != null) {
protocolManager.removeConnection(this.getConnectionInfo(), me);
protocolManager.removeConnection(getClientID(), this);
}
} catch (InvalidClientIDException e) {
logger.warn("Couldn't close connection because invalid clientID", e);
} finally {
try {
disconnect(false);
} catch (Throwable e) {
// it should never happen, but never say never
logger.debug("OpenWireConnection::disconnect failure", e);
}

// there may be some transactions not associated with sessions
// deal with them after sessions are removed via connection removal
operationContext.executeOnCompletion(new IOCallback() {
Expand Down Expand Up @@ -876,29 +884,6 @@ private void createInternalSession(ConnectionInfo info) throws Exception {
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser, false);
}

//raise the refCount of context
public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) throws Exception {
this.context = existingContext;
WireFormatInfo wireFormatInfo = inWireFormat.getPreferedWireFormatInfo();
// Older clients should have been defaulting this field to true.. but
// they were not.
if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
info.setClientMaster(true);
}
if (info.getClientIp() == null) {
info.setClientIp(getRemoteAddress());
}
createInternalSession(info);
state = new ConnectionState(info);
state.reset(info);

context.setConnection(this);
context.setConnectionState(state);
context.setClientMaster(info.isClientMaster());
context.setFaultTolerant(info.isFaultTolerant());
context.setReconnect(true);
}

/**
* This will answer with commands to the client
*/
Expand Down Expand Up @@ -1817,13 +1802,13 @@ public Response processRecoverTransactions(TransactionInfo info) throws Exceptio
@Override
public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
//we let protocol manager to handle connection add/remove
for (SessionState sessionState : state.getSessionStates()) {
propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
}
try {
for (SessionState sessionState : state.getSessionStates()) {
propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
}
protocolManager.removeConnection(state.getInfo(), null);
} catch (Throwable e) {
// log
protocolManager.removeConnection(getClientID(), OpenWireConnection.this);
} finally {
OpenWireConnection.this.disconnect(false);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O

private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();

private final Map<String, AMQConnectionContext> clientIdSet = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, OpenWireConnection> clientIdSet = new ConcurrentHashMap<>();

private String brokerName;

Expand Down Expand Up @@ -244,18 +244,9 @@ public void nodeDown(long eventUID, String nodeID) {
}
}

public void removeConnection(ConnectionInfo info, Throwable error) throws InvalidClientIDException {
String clientId = info.getClientId();
if (clientId != null) {
AMQConnectionContext context = this.clientIdSet.remove(clientId);
if (context != null) {
//connection is still there and need to close
context.getConnection().disconnect(error != null);
this.connections.remove(context.getConnection());
}
} else {
throw new InvalidClientIDException("No clientID specified for connection disconnect request");
}
public void removeConnection(String clientID, OpenWireConnection connection) {
clientIdSet.remove(clientID, connection);
connections.remove(connection);
}

/*** if set, the OpenWire connection will bypass the tcpReadBuferSize and use this value instead.
Expand Down Expand Up @@ -421,22 +412,16 @@ public void addConnection(OpenWireConnection connection, ConnectionInfo info) th
}

AMQConnectionContext context;
context = clientIdSet.get(clientId);
if (context != null) {
if (info.isFailoverReconnect()) {
OpenWireConnection oldConnection = context.getConnection();
oldConnection.disconnect(true);
connections.remove(oldConnection);
connection.reconnect(context, info);
} else {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
OpenWireConnection oldConnection = clientIdSet.get(clientId);
if (oldConnection != null) {
if (!info.isFailoverReconnect()) {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + oldConnection.getRemoteAddress());
}
} else {
//new connection
context = connection.initContext(info);
clientIdSet.put(clientId, context);
}

context = connection.initContext(info);

clientIdSet.put(clientId, connection);
connections.add(connection);

ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
Expand Down Expand Up @@ -498,4 +500,60 @@ public void beforeCreateSession(String name,

}


@Test(timeout = 10_000)
public void testForceDropOpenWire() throws Throwable {
ActiveMQServer server = createServer(true, createDefaultConfig(true));
server.start();

Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));

CountDownLatch beforeCreateCalled = new CountDownLatch(1);
CountDownLatch goCreateConsumer = new CountDownLatch(1);
server.registerBrokerPlugin(new ActiveMQServerConsumerPlugin() {
@Override
public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
if (consumer.getQueue() == serverQueue) {
logger.info("Creating a consumer at {}", consumer.getQueue());
beforeCreateCalled.countDown();
try {
goCreateConsumer.await(5, TimeUnit.MINUTES);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
});

ExecutorService executorService = Executors.newFixedThreadPool(1);
runAfter(executorService::shutdownNow);

ConnectionFactory factory = CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616");

CountDownLatch done = new CountDownLatch(1);

executorService.execute(() -> {
try (Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue("test-queue"))) {
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
done.countDown();
}
});

Assert.assertTrue(beforeCreateCalled.await(5, TimeUnit.MINUTES));

server.getRemotingService().getConnections().forEach(r -> {
r.fail(new ActiveMQException("this is a simulation"));
});

goCreateConsumer.countDown();

Wait.assertEquals(0, serverQueue::getConsumerCount);
}



}

0 comments on commit 3ec0274

Please sign in to comment.