Skip to content

Commit

Permalink
ARTEMIS-4639 NPEs when TopologyMember's primary is null
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Feb 8, 2024
1 parent cdd341a commit 941f344
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ public OpenWireProtocolManager setOpenwireUseDuplicateDetectionOnFailover(boolea

@Override
public void nodeUP(TopologyMember member, boolean last) {
if (member.getPrimary() == null) {
if (logger.isTraceEnabled()) {
logger.trace("{} ignoring nodeUP call due to null primary; topologyMember={}, last={}", this, member, last);
}
return;
}

if (topologyMap.put(member.getNodeId(), member) == null) {
updateClientClusterInfo();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.junit.Test;
import org.mockito.Mockito;

public class OpenWireProtocolManagerTest {

@Test
public void testNullPrimaryOnNodeUp() throws Exception {

ArtemisExecutor executor = ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())));

ClusterManager clusterManager = Mockito.mock(ClusterManager.class);
ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
StorageManager storageManager = new NullStorageManager();
Mockito.when(server.getStorageManager()).thenReturn(storageManager);
Mockito.when(server.newOperationContext()).thenReturn(storageManager.newContext(executor));
Mockito.when(server.getClusterManager()).thenReturn(clusterManager);
Mockito.when(clusterManager.getDefaultConnection(Mockito.any())).thenReturn(null);
SecurityStore securityStore = Mockito.mock(SecurityStore.class);
Mockito.when(server.getSecurityStore()).thenReturn(securityStore);
Mockito.when(securityStore.authenticate(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(null);
ServerSession serverSession = Mockito.mock(ServerSession.class);
Mockito.when(serverSession.getName()).thenReturn("session");
Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any(), Mockito.anyBoolean());

OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server,null, null);
openWireProtocolManager.setSecurityDomain("securityDomain");
openWireProtocolManager.setSupportAdvisory(false);
Connection connection = Mockito.mock(Connection.class);
Mockito.doReturn(new ChannelBufferWrapper(Unpooled.buffer(1024))).when(connection).createTransportBuffer(Mockito.anyInt());
OpenWireConnection openWireConnection = new OpenWireConnection(connection, server, openWireProtocolManager, openWireProtocolManager.wireFormat(), executor);
ConnectionInfo connectionInfo = new ConnectionInfo(new ConnectionId("1:1"));
connectionInfo.setClientId(RandomUtil.randomString());
openWireProtocolManager.addConnection(openWireConnection, connectionInfo);

TopologyMember topologyMember = new TopologyMemberImpl(RandomUtil.randomString(), null, null, null, null);
openWireProtocolManager.nodeUP(topologyMember, false);
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -761,16 +761,12 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last) {
return;
}

// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(topologyMember.getPrimary().newTransportConfig(TRANSPORT_CONFIG_NAME))) {
return;
}

// FIXME required to prevent cluster connections w/o discovery group
// and empty static connectors to create bridges... ulgy!
if (serverLocator == null) {
return;
}

/*we don't create bridges to backups*/
if (topologyMember.getPrimary() == null) {
if (logger.isTraceEnabled()) {
Expand All @@ -779,6 +775,11 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last) {
return;
}

// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(topologyMember.getPrimary().newTransportConfig(TRANSPORT_CONFIG_NAME))) {
return;
}

synchronized (recordsGuard) {
try {
MessageFlowRecord record = records.get(nodeID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@

package org.apache.activemq.artemis.core.server.cluster.impl;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.tests.util.ServerTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -33,9 +41,9 @@

public class ClusterConnectionImplMockTest extends ServerTestBase {

/**
* Verification for the fix https://issues.apache.org/jira/browse/ARTEMIS-1946
*/
/**
* Verification for the fix https://issues.apache.org/jira/browse/ARTEMIS-1946
*/
@Test
public void testRemvalOfLocalParameters() throws Exception {
TransportConfiguration tc = new TransportConfiguration();
Expand Down Expand Up @@ -85,6 +93,22 @@ public void testRemvalOfLocalParameters() throws Exception {

}

@Test
public void testNullPrimaryOnNodeUp() throws Exception {
TransportConfiguration tc = new TransportConfiguration();
tc.setFactoryClassName("mock");
tc.getParams().put(TransportConstants.LOCAL_ADDRESS_PROP_NAME, "localAddress");
tc.getParams().put(TransportConstants.LOCAL_PORT_PROP_NAME, "localPort");

ArtemisExecutor executor = ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())));

ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L, 0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0, new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null, null, true, 0, 0);

TopologyMember topologyMember = new TopologyMemberImpl(RandomUtil.randomString(), null, null, null, null);
cci.nodeUP(topologyMember, false);
executor.shutdownNow();
}

static final class MockServer extends ActiveMQServerImpl {

@Override
Expand Down Expand Up @@ -117,4 +141,61 @@ public void run() {
};
}
}

protected final class FakeNodeManager extends NodeManager {

public FakeNodeManager(String nodeID) {
super(false);
this.setNodeID(nodeID);
}

@Override
public void awaitPrimaryNode() {
}

@Override
public void awaitActiveStatus() {
}

@Override
public void startBackup() {
}

@Override
public ActivateCallback startPrimaryNode() {
return new CleaningActivateCallback() {
};
}

@Override
public void pausePrimaryServer() {
}

@Override
public void crashPrimaryServer() {
}

@Override
public void releaseBackup() {
}

@Override
public SimpleString readNodeId() {
return null;
}

@Override
public boolean isAwaitingFailback() {
return false;
}

@Override
public boolean isBackupActive() {
return false;
}

@Override
public void interrupt() {
}
}
}

0 comments on commit 941f344

Please sign in to comment.