diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 56ff48552c6e..4d9a52e434da 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -232,6 +232,13 @@ public OpenWireProtocolManager setOpenwireUseDuplicateDetectionOnFailover(boolea @Override public void nodeUP(TopologyMember member, boolean last) { + if (member.getPrimary() == null) { + if (logger.isTraceEnabled()) { + logger.trace("{} ignoring nodeUP call due to null primary; topologyMember={}, last={}", this, member, last); + } + return; + } + if (topologyMap.put(member.getNodeId(), member) == null) { updateClientClusterInfo(); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireProtocolManagerTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireProtocolManagerTest.java new file mode 100644 index 000000000000..75dcbcb8bc23 --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireProtocolManagerTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.openwire.amq; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; +import org.apache.activemq.artemis.core.security.SecurityStore; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.cluster.ClusterManager; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.junit.Test; +import org.mockito.Mockito; + +public class OpenWireProtocolManagerTest { + + @Test + public void testNullPrimaryOnNodeUp() throws Exception { + + ArtemisExecutor executor = ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()))); + + ClusterManager clusterManager = Mockito.mock(ClusterManager.class); + ActiveMQServer server = Mockito.mock(ActiveMQServer.class); + StorageManager storageManager = new NullStorageManager(); + Mockito.when(server.getStorageManager()).thenReturn(storageManager); + Mockito.when(server.newOperationContext()).thenReturn(storageManager.newContext(executor)); + Mockito.when(server.getClusterManager()).thenReturn(clusterManager); + Mockito.when(clusterManager.getDefaultConnection(Mockito.any())).thenReturn(null); + SecurityStore securityStore = Mockito.mock(SecurityStore.class); + Mockito.when(server.getSecurityStore()).thenReturn(securityStore); + Mockito.when(securityStore.authenticate(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(null); + ServerSession serverSession = Mockito.mock(ServerSession.class); + Mockito.when(serverSession.getName()).thenReturn("session"); + Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(), + Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any(), Mockito.anyBoolean()); + + OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server,null, null); + openWireProtocolManager.setSecurityDomain("securityDomain"); + openWireProtocolManager.setSupportAdvisory(false); + Connection connection = Mockito.mock(Connection.class); + Mockito.doReturn(new ChannelBufferWrapper(Unpooled.buffer(1024))).when(connection).createTransportBuffer(Mockito.anyInt()); + OpenWireConnection openWireConnection = new OpenWireConnection(connection, server, openWireProtocolManager, openWireProtocolManager.wireFormat(), executor); + ConnectionInfo connectionInfo = new ConnectionInfo(new ConnectionId("1:1")); + connectionInfo.setClientId(RandomUtil.randomString()); + openWireProtocolManager.addConnection(openWireConnection, connectionInfo); + + TopologyMember topologyMember = new TopologyMemberImpl(RandomUtil.randomString(), null, null, null, null); + openWireProtocolManager.nodeUP(topologyMember, false); + executor.shutdown(); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 98397440ff95..b377da262e6e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -761,16 +761,12 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last) { return; } - // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection - if (allowDirectConnectionsOnly && !allowableConnections.contains(topologyMember.getPrimary().newTransportConfig(TRANSPORT_CONFIG_NAME))) { - return; - } - // FIXME required to prevent cluster connections w/o discovery group // and empty static connectors to create bridges... ulgy! if (serverLocator == null) { return; } + /*we don't create bridges to backups*/ if (topologyMember.getPrimary() == null) { if (logger.isTraceEnabled()) { @@ -779,6 +775,11 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last) { return; } + // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection + if (allowDirectConnectionsOnly && !allowableConnections.contains(topologyMember.getPrimary().newTransportConfig(TRANSPORT_CONFIG_NAME))) { + return; + } + synchronized (recordsGuard) { try { MessageFlowRecord record = records.get(nodeID); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java index c64c2816dd73..7ce873383662 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java @@ -17,14 +17,22 @@ package org.apache.activemq.artemis.core.server.cluster.impl; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActivateCallback; +import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; import org.apache.activemq.artemis.tests.util.ServerTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.junit.Assert; import org.junit.Test; @@ -33,9 +41,9 @@ public class ClusterConnectionImplMockTest extends ServerTestBase { - /** - * Verification for the fix https://issues.apache.org/jira/browse/ARTEMIS-1946 - */ + /** + * Verification for the fix https://issues.apache.org/jira/browse/ARTEMIS-1946 + */ @Test public void testRemvalOfLocalParameters() throws Exception { TransportConfiguration tc = new TransportConfiguration(); @@ -85,6 +93,22 @@ public void testRemvalOfLocalParameters() throws Exception { } + @Test + public void testNullPrimaryOnNodeUp() throws Exception { + TransportConfiguration tc = new TransportConfiguration(); + tc.setFactoryClassName("mock"); + tc.getParams().put(TransportConstants.LOCAL_ADDRESS_PROP_NAME, "localAddress"); + tc.getParams().put(TransportConstants.LOCAL_PORT_PROP_NAME, "localPort"); + + ArtemisExecutor executor = ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()))); + + ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L, 0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0, new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null, null, true, 0, 0); + + TopologyMember topologyMember = new TopologyMemberImpl(RandomUtil.randomString(), null, null, null, null); + cci.nodeUP(topologyMember, false); + executor.shutdownNow(); + } + static final class MockServer extends ActiveMQServerImpl { @Override @@ -117,4 +141,61 @@ public void run() { }; } } + + protected final class FakeNodeManager extends NodeManager { + + public FakeNodeManager(String nodeID) { + super(false); + this.setNodeID(nodeID); + } + + @Override + public void awaitPrimaryNode() { + } + + @Override + public void awaitActiveStatus() { + } + + @Override + public void startBackup() { + } + + @Override + public ActivateCallback startPrimaryNode() { + return new CleaningActivateCallback() { + }; + } + + @Override + public void pausePrimaryServer() { + } + + @Override + public void crashPrimaryServer() { + } + + @Override + public void releaseBackup() { + } + + @Override + public SimpleString readNodeId() { + return null; + } + + @Override + public boolean isAwaitingFailback() { + return false; + } + + @Override + public boolean isBackupActive() { + return false; + } + + @Override + public void interrupt() { + } + } }