Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-1930 + some debug logging #2148

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.jboss.logging.Logger;

import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;

public final class StompConnection implements RemotingConnection {

private static final Logger logger = Logger.getLogger(StompConnection.class);

protected static final String CONNECTION_ID_PROP = "__AMQ_CID";
private static final String SERVER_NAME = "ActiveMQ-Artemis/" + VersionLoader.getVersion().getFullVersion() +
" ActiveMQ Artemis Messaging Engine";
Expand Down Expand Up @@ -582,6 +585,27 @@ public void handleFrame(StompFrame request) {
}
}

public void logFrame(StompFrame request, boolean in) {
if (logger.isDebugEnabled()) {
StringBuilder message = new StringBuilder()
.append("STOMP(")
.append(getRemoteAddress())
.append(", ")
.append(this.getID())
.append("):");

if (in) {
message.append(" IN << ");
} else {
message.append("OUT >> ");
}

message.append(request);

logger.debug(message.toString());
}
}

public void sendFrame(StompFrame frame, StompPostReceiptFunction function) {
manager.sendReply(this, frame, function);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ public int getEncodedSize() throws Exception {

@Override
public String toString() {
return "StompFrame[command=" + command + ", headers=" + headers + ", content= " + this.body + " bytes " +
Arrays.toString(bytesBody);
return new StringBuilder()
.append("StompFrame[command=").append(command)
.append(", headers=").append(headers)
.append(", content= ").append(this.body)
.append(", bytes= ").append(Arrays.toString(bytesBody))
.toString();
}

public boolean isPing() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public void handleBuffer(final RemotingConnection connection, final ActiveMQBuff

try {
invokeInterceptors(this.incomingInterceptors, request, conn);
conn.logFrame(request, true);
conn.handleFrame(request);
} finally {
server.getStorageManager().clearContext();
Expand Down Expand Up @@ -186,11 +187,8 @@ public List<String> websocketSubprotocolIdentifiers() {
// Public --------------------------------------------------------

public boolean send(final StompConnection connection, final StompFrame frame) {
if (ActiveMQStompProtocolLogger.LOGGER.isTraceEnabled()) {
ActiveMQStompProtocolLogger.LOGGER.trace("sent " + frame);
}

invokeInterceptors(this.outgoingInterceptors, frame, connection);
connection.logFrame(frame, false);

synchronized (connection) {
if (connection.isDestroyed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
Expand Down Expand Up @@ -255,14 +256,12 @@ public StompPostReceiptFunction addSubscription(long consumerID,
SimpleString address = SimpleString.toSimpleString(destination);
SimpleString queueName = SimpleString.toSimpleString(destination);
SimpleString selectorSimple = SimpleString.toSimpleString(selector);
boolean pubSub = false;
final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;

Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (topic) {
boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (multicast) {
// subscribes to a topic
pubSub = true;
if (durableSubscriptionName != null) {
if (clientID == null) {
throw BUNDLE.missingClientID();
Expand All @@ -276,8 +275,8 @@ public StompPostReceiptFunction addSubscription(long consumerID,
session.createQueue(address, queueName, selectorSimple, true, false);
}
}
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, topic ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast);
subscriptions.put(consumerID, subscription);
session.start();
return () -> consumer.receiveCredits(receiveCredits);
Expand All @@ -295,14 +294,15 @@ public boolean unsubscribe(String id, String durableSubscriptionName, String cli
iterator.remove();
SimpleString queueName = sub.getQueueName();
session.closeConsumer(consumerID);
if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) {
Queue queue = manager.getServer().locateQueue(queueName);
if (sub.isMulticast() && queue != null && (durableSubscriptionName == null && !queue.isDurable())) {
session.deleteQueue(queueName);
}
result = true;
}
}

if (!result && durableSubscriptionName != null && clientID != null) {
if (durableSubscriptionName != null && clientID != null) {
SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) != null) {
session.deleteQueue(queueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ public class StompSubscription {

private final SimpleString queueName;

// whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic)
private final boolean pubSub;
// whether or not this subscription follows multicast semantics (e.g. for a JMS topic)
private final boolean multicast;

// Static --------------------------------------------------------

// Constructors --------------------------------------------------

public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) {
public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast) {
this.subID = subID;
this.ack = ack;
this.queueName = queueName;
this.pubSub = pubSub;
this.multicast = multicast;
}

// Public --------------------------------------------------------
Expand All @@ -57,13 +57,13 @@ public SimpleString getQueueName() {
return queueName;
}

public boolean isPubSub() {
return pubSub;
public boolean isMulticast() {
return multicast;
}

@Override
public String toString() {
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]";
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + "]";
}

}
8 changes: 8 additions & 0 deletions docs/user-manual/en/stomp.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ In Apache ActiveMQ Artemis, these destinations are mapped to *addresses* and
*queues* depending on the operation being done and the desired semantics (e.g.
anycast or multicast).

## Logging

Incoming and outgoing STOMP frames can be logged by enabling `DEBUG` for
`org.apache.activemq.artemis.core.protocol.stomp.StompConnection`. This can be
extremely useful for debugging or simply monitoring client activity. Along with
the STOMP frame itself the remote IP address of the client is logged as well as
the internal connection ID so that frames from the same client can be correlated.

## Sending

When a STOMP client sends a message (using a `SEND` frame), the protocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,40 @@ public void testDurableUnSubscribe() throws Exception {
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
}

@Test
public void testDurableUnSubscribeWithoutDurableSubName() throws Exception {
server.getActiveMQServer().getConfiguration().getWildcardConfiguration().setDelimiter('/');
server.getActiveMQServer().getAddressSettingsRepository().addMatch("/topic/#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.MULTICAST).setDefaultQueueRoutingType(RoutingType.MULTICAST));
conn.connect(defUser, defPass, "myclientid");
String subId = UUID.randomUUID().toString();
String durableSubName = UUID.randomUUID().toString();
String receipt = UUID.randomUUID().toString();
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/test.foo")
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableSubName)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);

frame = conn.sendFrame(frame);
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));

assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));

receipt = UUID.randomUUID().toString();
frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE)
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);

frame = conn.sendFrame(frame);
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));

conn.disconnect();

// make sure the durable subscription queue is still there
assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
}

@Test
public void testDurableUnSubscribeLegacySubscriptionHeader() throws Exception {
conn.connect(defUser, defPass, "myclientid");
Expand Down