Skip to content

Commit

Permalink
ARTEMIS-4332 Add management method to close stuck server sessions
Browse files Browse the repository at this point in the history
In rare cases a store operation could silently fails or starves, blocking the
related server session and all delivering messages. Those server sessions can
be closed adding a management method that cleans their operation context
before closing them.
  • Loading branch information
brusdev authored and clebertsuconic committed Jul 13, 2023
1 parent b4230c6 commit 451d03f
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,14 @@ void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy
boolean closeSessionWithID(@Parameter(desc = "The connection ID", name = "connectionID") String connectionID,
@Parameter(desc = "The session ID", name = "ID") String ID) throws Exception;

/**
* Closes the session with the given id.
*/
@Operation(desc = "Closes the session with the id", impact = MBeanOperationInfo.INFO)
boolean closeSessionWithID(@Parameter(desc = "The connection ID", name = "connectionID") String connectionID,
@Parameter(desc = "The session ID", name = "ID") String ID,
@Parameter(desc = "Force session close cancelling pending tasks", name = "force") boolean force) throws Exception;

/**
* Closes the consumer with the given id.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,11 @@ public boolean closeConnectionWithID(final String ID) {

@Override
public boolean closeSessionWithID(final String connectionID, final String ID) throws Exception {
return closeSessionWithID(connectionID, ID, false);
}

@Override
public boolean closeSessionWithID(final String connectionID, final String ID, final boolean force) throws Exception {
// possibly a long running task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
Expand All @@ -2393,7 +2398,7 @@ public boolean closeSessionWithID(final String connectionID, final String ID) th
List<ServerSession> sessions = server.getSessions(connectionID);
for (ServerSession session : sessions) {
if (session.getName().equals(ID.toString())) {
session.close(true);
session.close(true, force);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,8 @@ public interface OperationContext extends IOCompletion {
* @throws Exception
*/
boolean waitCompletion(long timeout) throws Exception;

default void clear() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,27 @@ public String toString() {
executorsPendingField +
"]";
}

@Override
public synchronized void clear() {
stored = 0;
storeLineUpField = 0;
minimalReplicated = 0;
replicated = 0;
replicationLineUpField = 0;
paged = 0;
minimalPage = 0;
pageLineUpField = 0;
errorCode = -1;
errorMessage = null;
executorsPendingField = 0;

if (tasks != null) {
tasks.clear();
}

if (storeOnlyTasks != null) {
storeOnlyTasks.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ RoutingStatus doSend(Transaction tx,

void close(boolean failed) throws Exception;

void close(boolean failed, boolean force) throws Exception;

void setTransferring(boolean transferring);

Set<ServerConsumer> getServerConsumers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1707,9 +1707,18 @@ public void stop() {

@Override
public void close(final boolean failed) {
close(failed, false);
}

@Override
public void close(final boolean failed, final boolean force) {
if (closed)
return;

if (force) {
context.clear();
}

context.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5805,6 +5805,60 @@ public void testCloseJMSclient() throws Exception {
Assert.assertTrue(((org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer)JMSclient).isClosed());
}

@Test
public void testForceCloseSession() throws Exception {
testForceCloseSession(false, false);
}

@Test
public void testForceCloseSessionWithError() throws Exception {
testForceCloseSession(true, false);
}

@Test
public void testForceCloseSessionWithPendingStoreOperation() throws Exception {
testForceCloseSession(false, true);
}

private void testForceCloseSession(boolean error, boolean pendingStoreOperation) throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString name = RandomUtil.randomSimpleString();
boolean durable = true;

ActiveMQServerControl serverControl = createManagementControl();

checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
if (legacyCreateQueue) {
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
} else {
serverControl.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.ANYCAST).setDurable(durable).setAutoCreateAddress(false).toJSON());
}

ServerLocator receiveLocator = createInVMNonHALocator().setCallTimeout(500);
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
final ClientConsumer clientConsumer = receiveClientSession.createConsumer(name);

Assert.assertEquals(1, server.getSessions().size());

ServerSession serverSession = server.getSessions().iterator().next();
Assert.assertEquals(((ClientSessionImpl)receiveClientSession).getName(), serverSession.getName());

if (error) {
serverSession.getSessionContext().onError(0, "error");
}

if (pendingStoreOperation) {
serverSession.getSessionContext().storeLineUp();
}

serverControl.closeSessionWithID(serverSession.getConnectionID().toString(), serverSession.getName(), true);

Wait.assertTrue(() -> serverSession.getServerConsumers().size() == 0, 500);
Wait.assertTrue(() -> server.getSessions().size() == 0, 500);
}

@Test
public void testAddUser() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public boolean closeSessionWithID(String connectionID, String ID) throws Excepti
return (Boolean) proxy.invokeOperation("closeSessionWithID", connectionID, ID);
}

@Override
public boolean closeSessionWithID(String connectionID, String ID, boolean force) throws Exception {
return (Boolean) proxy.invokeOperation("closeSessionWithID", connectionID, ID, force);
}

@Override
public boolean closeConsumerWithID(String sessionID, String ID) throws Exception {
return (Boolean) proxy.invokeOperation("closeConsumerWithID", sessionID, ID);
Expand Down

0 comments on commit 451d03f

Please sign in to comment.