Skip to content

Commit

Permalink
ARTEMIS-2093 NPE thrown by NettyConnector::createConnection
Browse files Browse the repository at this point in the history
Given that NettyConnector::createConnection isn't happening on the
channel's event loop, it could race with a channel close event, that
would clean the whole channel pipeline, leading to a NPE while
trying to use a configured channel handler of the pipeline.
  • Loading branch information
franz1981 committed Sep 21, 2018
1 parent bd0f114 commit 3112b4f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;

import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -604,6 +605,7 @@ public void initChannel(Channel channel) throws Exception {
protocolManager.addChannelHandlers(pipeline);

pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor));
logger.debugf("Added ActiveMQClientChannelHandler to Channel with id = %s ", channel.id());
}
});

Expand Down Expand Up @@ -737,6 +739,20 @@ public boolean isStarted() {

@Override
public Connection createConnection() {
return createConnection(null);
}

/**
* Create and return a connection from this connector.
* <p>
* This method must NOT throw an exception if it fails to create the connection
* (e.g. network is not available), in this case it MUST return null.<br>
* This version can be used for testing purposes.
*
* @param onConnect a callback that would be called right after {@link Bootstrap#connect()}
* @return The connection, or {@code null} if unable to create a connection (e.g. network is unavailable)
*/
public final Connection createConnection(Consumer<ChannelFuture> onConnect) {
if (channelClazz == null) {
return null;
}
Expand All @@ -758,7 +774,9 @@ public Connection createConnection() {
} else {
future = bootstrap.connect(remoteDestination);
}

if (onConnect != null) {
onConnect.accept(future);
}
future.awaitUninterruptibly();

if (future.isSuccess()) {
Expand All @@ -770,7 +788,15 @@ public Connection createConnection() {
if (handshakeFuture.isSuccess()) {
ChannelPipeline channelPipeline = ch.pipeline();
ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true;
if (channelHandler != null) {
channelHandler.active = true;
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
remoteDestination + " from Channel with id = " + ch.id()));
return null;
}
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
Expand Down Expand Up @@ -830,7 +856,15 @@ public Connection createConnection() {
} else {
ChannelPipeline channelPipeline = ch.pipeline();
ActiveMQChannelHandler channelHandler = channelPipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true;
if (channelHandler != null) {
channelHandler.active = true;
} else {
ch.close().awaitUninterruptibly();
ActiveMQClientLogger.LOGGER.errorCreatingNettyConnection(
new IllegalStateException("No ActiveMQChannelHandler has been found while connecting to " +
remoteDestination + " from Channel with id = " + ch.id()));
return null;
}
}

// No acceptor on a client connection
Expand Down
Expand Up @@ -20,11 +20,14 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
Expand Down Expand Up @@ -361,4 +364,34 @@ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffe
connector.close();
Assert.assertFalse(connector.isStarted());
}

@Test
public void testChannelHandlerRemovedWhileCreatingConnection() throws Exception {
BufferHandler handler = (connectionID, buffer) -> {
};
Map<String, Object> params = new HashMap<>();
final ExecutorService closeExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
try {
NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
connector.start();
final Connection connection = connector.createConnection(future -> {
future.awaitUninterruptibly();
Assert.assertTrue(future.isSuccess());
final ChannelPipeline pipeline = future.channel().pipeline();
final ActiveMQChannelHandler activeMQChannelHandler = pipeline.get(ActiveMQChannelHandler.class);
Assert.assertNotNull(activeMQChannelHandler);
pipeline.remove(activeMQChannelHandler);
Assert.assertNull(pipeline.get(ActiveMQChannelHandler.class));
});
Assert.assertNull(connection);
connector.close();
} finally {
closeExecutor.shutdownNow();
threadPool.shutdownNow();
scheduledThreadPool.shutdownNow();
}
}

}

0 comments on commit 3112b4f

Please sign in to comment.