From 960e29afadcd3b5848a95406ad14d48099fb6c3c Mon Sep 17 00:00:00 2001 From: jbertram Date: Mon, 25 Jan 2016 16:36:02 -0600 Subject: [PATCH] ARTEMIS-358 topic mistakenly removed with sub The problem here is that the management notification listener was mistakenly removing the topic itself instead of just the non-durable subscription. In general I can't see why StompProtocolManager even needs to keep track of the destinations when the broker already does that. As far as I can tell it is redundant and it's clearly error-prone. Therefore I'm removing the destination tracking from StompProtocolManager altogether. --- .../protocol/stomp/StompProtocolManager.java | 60 +------------------ 1 file changed, 2 insertions(+), 58 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 951bb898fc1..0cad2597304 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.Executor; import io.netty.channel.ChannelPipeline; @@ -31,20 +30,13 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.api.core.management.CoreNotificationType; -import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.core.server.management.ManagementService; -import org.apache.activemq.artemis.core.server.management.Notification; -import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -53,8 +45,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; -import org.apache.activemq.artemis.utils.TypedProperties; import org.apache.activemq.artemis.utils.UUIDGenerator; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -62,7 +52,7 @@ /** * StompProtocolManager */ -class StompProtocolManager implements ProtocolManager, NotificationListener { +class StompProtocolManager implements ProtocolManager { // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- @@ -78,8 +68,6 @@ class StompProtocolManager implements ProtocolManager, No // key => connection ID, value => Stomp session private final Map sessions = new HashMap<>(); - private final Set destinations = new ConcurrentHashSet<>(); - private final List incomingInterceptors; private final List outgoingInterceptors; @@ -94,12 +82,6 @@ public StompProtocolManager(final StompProtocolManagerFactory factory, this.factory = factory; this.server = server; this.executor = server.getExecutorFactory().getExecutor(); - ManagementService service = server.getManagementService(); - if (service != null) { - //allow management message to pass - destinations.add(service.getManagementAddress().toString()); - service.addNotificationListener(this); - } this.incomingInterceptors = incomingInterceptors; this.outgoingInterceptors = outgoingInterceptors; } @@ -422,45 +404,7 @@ public void beginTransaction(StompConnection connection, String txID) throws Exc } public boolean destinationExists(String destination) { - return destinations.contains(destination); - } - - @Override - public void onNotification(Notification notification) { - if (!(notification.getType() instanceof CoreNotificationType)) - return; - - CoreNotificationType type = (CoreNotificationType) notification.getType(); - - TypedProperties props = notification.getProperties(); - - switch (type) { - case BINDING_ADDED: { - if (!props.containsProperty(ManagementHelper.HDR_BINDING_TYPE)) { - throw ActiveMQMessageBundle.BUNDLE.bindingTypeNotSpecified(); - } - - Integer bindingType = props.getIntProperty(ManagementHelper.HDR_BINDING_TYPE); - - if (bindingType == BindingType.DIVERT_INDEX) { - return; - } - - SimpleString address = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS); - - destinations.add(address.toString()); - - break; - } - case BINDING_REMOVED: { - SimpleString address = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS); - destinations.remove(address.toString()); - break; - } - default: - //ignore all others - break; - } + return server.getPostOffice().getAddresses().contains(SimpleString.toSimpleString(destination)); } public ActiveMQServer getServer() {