Skip to content

Commit

Permalink
ARTEMIS-2093 NPE thrown by NettyConnector::createConnection
Browse files Browse the repository at this point in the history
Given that NettyConnector::createConnection isn't happening on the
channel's event loop, it could race with a channel close event, that
would clean the whole channel pipeline, leading to a NPE while
trying to use a configured channel handler of the pipeline.

(cherry picked from commit 3112b4f)
  • Loading branch information
franz1981 committed Sep 21, 2018
1 parent 9a94923 commit f90afad
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 3 deletions.
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;

import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -597,6 +598,7 @@ public void initChannel(Channel channel) throws Exception {
protocolManager.addChannelHandlers(pipeline);

pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor));
logger.debugf("Added ActiveMQClientChannelHandler to Channel with id = %s ", channel.id());
}
});

Expand Down Expand Up @@ -712,6 +714,20 @@ public boolean isStarted() {

@Override
public Connection createConnection() {
return createConnection(null);
}

/**
* Create and return a connection from this connector.
* <p>
* This method must NOT throw an exception if it fails to create the connection
* (e.g. network is not available), in this case it MUST return null.<br>
* This version can be used for testing purposes.
*
* @param onConnect a callback that would be called right after {@link Bootstrap#connect()}
* @return The connection, or {@code null} if unable to create a connection (e.g. network is unavailable)
*/
public final Connection createConnection(Consumer<ChannelFuture> onConnect) {
if (channelClazz == null) {
return null;
}
Expand All @@ -733,7 +749,9 @@ public Connection createConnection() {
} else {
future = bootstrap.connect(remoteDestination);
}

if (onConnect != null) {
onConnect.accept(future);
}
future.awaitUninterruptibly();

if (future.isSuccess()) {
Expand All @@ -745,7 +763,15 @@ public Connection createConnection() {
if (handshakeFuture.isSuccess()) {
ChannelPipeline channelPipeline = ch.pipeline();
ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true;
if (channelHandler != null) {
channelHandler.active = true;
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
remoteDestination + " from Channel with id = " + ch.id()));
return null;
}
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
Expand Down Expand Up @@ -805,7 +831,15 @@ public Connection createConnection() {
} else {
ChannelPipeline channelPipeline = ch.pipeline();
ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true;
if (channelHandler != null) {
channelHandler.active = true;
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
remoteDestination + " from Channel with id = " + ch.id()));
return null;
}
}

// No acceptor on a client connection
Expand Down
Expand Up @@ -18,24 +18,61 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NettyConnectorTest extends ActiveMQTestBase {

private ActiveMQServer server;
private ExecutorService executorService;

@Override
@Before
public void setUp() throws Exception {
super.setUp();
executorService = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());

Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
params.put(TransportConstants.SSL_PROVIDER, TransportConstants.OPENSSL_PROVIDER);
params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "openssl-server-side-keystore.jks");
params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "openssl-server-side-truststore.jks");
params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
ConfigurationImpl config = createBasicConfig().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "nettySSL"));
server = createServer(false, config);
server.start();
waitForServerToStart(server);
}

@Override
public void tearDown() throws Exception {
executorService.shutdown();
super.tearDown();
}

private ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
@Override
public void connectionException(final Object connectionID, final ActiveMQException me) {
Expand Down Expand Up @@ -197,4 +234,34 @@ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffe
connector.close();
Assert.assertFalse(connector.isStarted());
}

@Test
public void testChannelHandlerRemovedWhileCreatingConnection() throws Exception {
BufferHandler handler = (connectionID, buffer) -> {
};
Map<String, Object> params = new HashMap<>();
final ExecutorService closeExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
try {
NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
connector.start();
final Connection connection = connector.createConnection(future -> {
future.awaitUninterruptibly();
Assert.assertTrue(future.isSuccess());
final ChannelPipeline pipeline = future.channel().pipeline();
final ActiveMQChannelHandler activeMQChannelHandler = pipeline.get(ActiveMQChannelHandler.class);
Assert.assertNotNull(activeMQChannelHandler);
pipeline.remove(activeMQChannelHandler);
Assert.assertNull(pipeline.get(ActiveMQChannelHandler.class));
});
Assert.assertNull(connection);
connector.close();
} finally {
closeExecutor.shutdownNow();
threadPool.shutdownNow();
scheduledThreadPool.shutdownNow();
}
}

}

0 comments on commit f90afad

Please sign in to comment.