Skip to content

Commit

Permalink
ARTEMIS-1526 race condition between listConsumers() and closing a Ses…
Browse files Browse the repository at this point in the history
…sion. When session not found, ignore that consumer and continue.
  • Loading branch information
pgfox authored and jbertram committed Dec 5, 2017
1 parent 8f9bab6 commit 7d61969
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,23 @@ public Class getClassT() {
@Override
public JsonObjectBuilder toJson(ServerConsumer consumer) {
ServerSession session = server.getSessionByID(consumer.getSessionID());
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID())).add("sessionName", toString(consumer.getSessionName())).add("connectionClientID", toString(consumer.getConnectionClientID())).add("user", toString(session.getUsername())).add("connectionProtocolName", toString(consumer.getConnectionProtocolName())).add("queueName", toString(consumer.getQueueName())).add("queueType", toString(consumer.getQueueType()).toLowerCase()).add("queueAddress", toString(consumer.getQueueAddress().toString())).add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress())).add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress())).add("creationTime", new Date(consumer.getCreationTime()).toString());

//if session is not available then consumer is not in valid state - ignore
if (session == null) {
return null;
}

JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID()))
.add("sessionName", toString(consumer.getSessionName()))
.add("connectionClientID", toString(consumer.getConnectionClientID()))
.add("user", toString(session.getUsername()))
.add("connectionProtocolName", toString(consumer.getConnectionProtocolName()))
.add("queueName", toString(consumer.getQueueName()))
.add("queueType", toString(consumer.getQueueType()).toLowerCase())
.add("queueAddress", toString(consumer.getQueueAddress().toString()))
.add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress()))
.add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress()))
.add("creationTime", new Date(consumer.getCreationTime()).toString());
return obj;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@

import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
Expand Down Expand Up @@ -57,8 +63,7 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase {
*/

@Test
@BMRules(rules = {@BMRule(
name = "Delay listAddress() by 2 secs ",
@BMRules(rules = {@BMRule(name = "Delay listAddress() by 2 secs ",
targetClass = "org.apache.activemq.artemis.core.management.impl.view.AddressView ",
targetMethod = "<init>(org.apache.activemq.artemis.core.server.ActiveMQServer)",
targetLocation = "ENTRY",
Expand Down Expand Up @@ -109,6 +114,90 @@ public void run() {
}
}

/**
* Aim: verify that no exceptions will occur when a session is closed during listConsumers() operation
*
* test delays the listConsumer() BEFORE the Session information associated with the consumer is retrieved.
* During this delay the client session is closed.
*
* @throws Exception
*/

@Test
@BMRules(rules = {@BMRule(name = "Delay listConsumers() by 2 secs ",
targetClass = "org.apache.activemq.artemis.core.management.impl.view.ConsumerView",
targetMethod = "toJson(org.apache.activemq.artemis.core.server.ServerConsumer)",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.ActiveMQServerControlMultiThreadTest.delay(2)")})

public void listConsumersDuringSessionClose() throws Exception {

ExecutorService executorService = Executors.newFixedThreadPool(1);
SimpleString addressName1 = new SimpleString("MyAddress_one");
SimpleString queueName1 = new SimpleString("my_queue_one");

ActiveMQServerControl serverControl = createManagementControl();

server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, false, false);

// create a consumer
try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession()) {

ClientConsumer consumer1_q1 = session.createConsumer(queueName1);

// add another consumer (on separate session)
ClientSession session_two = csf.createSession();
ClientConsumer consumer2_q1 = session_two.createConsumer(queueName1);

//first(normal) invocation - ensure 2 consumers returned
//used to block thread, until the delay() has been called.
delayCalled = new CountDownLatch(1);

String consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10);

JsonObject consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString);
JsonArray consumersArray = (JsonArray) consumersAsJsonObject.get("data");

Assert.assertEquals("number of consumers returned from query", 2, consumersArray.size());
Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName"));
Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName"));

//second invocation - close session during listConsumers()

//used to block thread, until the delay() has been called.
delayCalled = new CountDownLatch(1);

executorService.submit(new Runnable() {
@Override
public void run() {
try {
//wait until the delay occurs and close the session.
delayCalled.await();
session.close();
} catch (Exception e) {
e.printStackTrace();
}

}
});

consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10);

consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString);
consumersArray = (JsonArray) consumersAsJsonObject.get("data");

// session is closed before Json string is created - should only be one consumer returned
Assert.assertEquals("number of consumers returned from query", 1, consumersArray.size());
Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName"));
Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName"));

} finally {
executorService.shutdown();
}
}

//notify delay has been called and wait for X seconds
public static void delay(int seconds) {
delayCalled.countDown();
Expand Down

0 comments on commit 7d61969

Please sign in to comment.