Skip to content

Commit

Permalink
ARTEMIS-1930 require STOMP durable sub name to unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Jun 19, 2018
1 parent 77eb0b6 commit 8812f9b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
Expand Down Expand Up @@ -255,14 +256,12 @@ public StompPostReceiptFunction addSubscription(long consumerID,
SimpleString address = SimpleString.toSimpleString(destination);
SimpleString queueName = SimpleString.toSimpleString(destination);
SimpleString selectorSimple = SimpleString.toSimpleString(selector);
boolean pubSub = false;
final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;

Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (topic) {
boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (multicast) {
// subscribes to a topic
pubSub = true;
if (durableSubscriptionName != null) {
if (clientID == null) {
throw BUNDLE.missingClientID();
Expand All @@ -276,8 +275,8 @@ public StompPostReceiptFunction addSubscription(long consumerID,
session.createQueue(address, queueName, selectorSimple, true, false);
}
}
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, topic ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast);
subscriptions.put(consumerID, subscription);
session.start();
return () -> consumer.receiveCredits(receiveCredits);
Expand All @@ -295,14 +294,15 @@ public boolean unsubscribe(String id, String durableSubscriptionName, String cli
iterator.remove();
SimpleString queueName = sub.getQueueName();
session.closeConsumer(consumerID);
if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) {
Queue queue = manager.getServer().locateQueue(queueName);
if (sub.isMulticast() && queue != null && (durableSubscriptionName == null && !queue.isDurable())) {
session.deleteQueue(queueName);
}
result = true;
}
}

if (!result && durableSubscriptionName != null && clientID != null) {
if (durableSubscriptionName != null && clientID != null) {
SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) != null) {
session.deleteQueue(queueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ public class StompSubscription {

private final SimpleString queueName;

// whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic)
private final boolean pubSub;
// whether or not this subscription follows multicast semantics (e.g. for a JMS topic)
private final boolean multicast;

// Static --------------------------------------------------------

// Constructors --------------------------------------------------

public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) {
public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast) {
this.subID = subID;
this.ack = ack;
this.queueName = queueName;
this.pubSub = pubSub;
this.multicast = multicast;
}

// Public --------------------------------------------------------
Expand All @@ -57,13 +57,13 @@ public SimpleString getQueueName() {
return queueName;
}

public boolean isPubSub() {
return pubSub;
public boolean isMulticast() {
return multicast;
}

@Override
public String toString() {
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]";
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,40 @@ public void testDurableUnSubscribe() throws Exception {
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
}

@Test
public void testDurableUnSubscribeWithoutDurableSubName() throws Exception {
server.getActiveMQServer().getConfiguration().getWildcardConfiguration().setDelimiter('/');
server.getActiveMQServer().getAddressSettingsRepository().addMatch("/topic/#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.MULTICAST).setDefaultQueueRoutingType(RoutingType.MULTICAST));
conn.connect(defUser, defPass, "myclientid");
String subId = UUID.randomUUID().toString();
String durableSubName = UUID.randomUUID().toString();
String receipt = UUID.randomUUID().toString();
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/test.foo")
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableSubName)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);

frame = conn.sendFrame(frame);
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));

assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));

receipt = UUID.randomUUID().toString();
frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE)
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);

frame = conn.sendFrame(frame);
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));

conn.disconnect();

// make sure the durable subscription queue is still there
assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
}

@Test
public void testDurableUnSubscribeLegacySubscriptionHeader() throws Exception {
conn.connect(defUser, defPass, "myclientid");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.net.URI;
import java.util.UUID;

import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
Expand Down

0 comments on commit 8812f9b

Please sign in to comment.