diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java new file mode 100644 index 00000000000..f682893f335 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; + +public class DelegatingEventLoopGroup implements EventLoopGroup { + + private final EventLoopGroup delegate; + + public DelegatingEventLoopGroup(EventLoopGroup eventLoopGroup) { + this.delegate = eventLoopGroup; + } + + @Override + public EventLoop next() { + return delegate.next(); + } + + @Override + public ChannelFuture register(Channel channel) { + return delegate.register(channel); + } + + @Override + public ChannelFuture register(ChannelPromise channelPromise) { + return delegate.register(channelPromise); + } + + @Override + @Deprecated + public ChannelFuture register(Channel channel, ChannelPromise channelPromise) { + return delegate.register(channel, channelPromise); + } + + @Override + public boolean isShuttingDown() { + return delegate.isShuttingDown(); + } + + @Override + public Future shutdownGracefully() { + return delegate.shutdownGracefully(); + } + + @Override + public Future shutdownGracefully(long l, long l1, TimeUnit timeUnit) { + return delegate.shutdownGracefully(l, l1, timeUnit); + } + + @Override + public Future terminationFuture() { + return delegate.terminationFuture(); + } + + @Override + @Deprecated + public void shutdown() { + delegate.shutdown(); + } + + @Override + @Deprecated + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + @Override + public Future submit(Runnable runnable) { + return delegate.submit(runnable); + } + + @Override + public Future submit(Runnable runnable, T t) { + return delegate.submit(runnable, t); + } + + @Override + public Future submit(Callable callable) { + return delegate.submit(callable); + } + + @Override + public io.netty.util.concurrent.ScheduledFuture schedule(Runnable runnable, long l, TimeUnit timeUnit) { + return delegate.schedule(runnable, l, timeUnit); + } + + @Override + public io.netty.util.concurrent.ScheduledFuture schedule(Callable callable, long l, TimeUnit timeUnit) { + return delegate.schedule(callable, l, timeUnit); + } + + @Override + public io.netty.util.concurrent.ScheduledFuture scheduleAtFixedRate(Runnable runnable, + long l, + long l1, + TimeUnit timeUnit) { + return delegate.scheduleAtFixedRate(runnable, l, l1, timeUnit); + } + + @Override + public io.netty.util.concurrent.ScheduledFuture scheduleWithFixedDelay(Runnable runnable, + long l, + long l1, + TimeUnit timeUnit) { + return delegate.scheduleWithFixedDelay(runnable, l, l1, timeUnit); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, + long timeout, + TimeUnit unit) throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, + long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + } + + @Override + public void forEach(Consumer action) { + delegate.forEach(action); + } + + @Override + public Spliterator spliterator() { + return delegate.spliterator(); + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 33dbf4b8b4a..23554394978 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -215,7 +215,7 @@ public ActiveMQBuffer createTransportBuffer(final int size) { @Override public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) { - return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true); + return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 38fb326f3d1..cc062d30100 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -58,6 +58,9 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -218,6 +221,12 @@ public class NettyConnector extends AbstractConnector { private boolean useNioGlobalWorkerPool; + private boolean useEpoll; + + private int epollRemotingThreads; + + private boolean useEpollGlobalWorkerPool; + private ScheduledExecutorService scheduledThreadPool; private Executor closeExecutor; @@ -288,6 +297,13 @@ public NettyConnector(final Map configuration, useNioGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME, TransportConstants.DEFAULT_USE_NIO_GLOBAL_WORKER_POOL, configuration); + useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); + + epollRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME, -1, configuration); + + useEpollGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL_GLOBAL_WORKER_POOL, configuration); + + useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration); @@ -371,22 +387,46 @@ public synchronized void start() { return; } - int threadsToUse; + // Default to number of cores * 3 + int defaultThreadsToUse = Runtime.getRuntime().availableProcessors() * 3; - if (nioRemotingThreads == -1) { - // Default to number of cores * 3 + if (useEpoll) { + if (Epoll.isAvailable()) { + int epollThreadsToUse; + if (epollRemotingThreads == -1) { + epollThreadsToUse = defaultThreadsToUse; + } else { + epollThreadsToUse = this.epollRemotingThreads; + } + if (useEpollGlobalWorkerPool) { + channelClazz = EpollSocketChannel.class; + group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(epollThreadsToUse, threadFactory))); + } else { + channelClazz = EpollSocketChannel.class; + group = new EpollEventLoopGroup(epollThreadsToUse); + } + logger.info("Connector using native epoll"); - threadsToUse = Runtime.getRuntime().availableProcessors() * 3; - } else { - threadsToUse = this.nioRemotingThreads; + } else { + logger.warn("Connector unable to load native epoll, will continue and load nio"); + } } - if (useNioGlobalWorkerPool) { - channelClazz = NioSocketChannel.class; - group = SharedNioEventLoopGroup.getInstance(threadsToUse); - } else { - channelClazz = NioSocketChannel.class; - group = new NioEventLoopGroup(threadsToUse); + if (channelClazz == null || group == null) { + int nioThreadsToUse; + if (nioRemotingThreads == -1) { + nioThreadsToUse = defaultThreadsToUse; + } else { + nioThreadsToUse = this.nioRemotingThreads; + } + if (useNioGlobalWorkerPool) { + channelClazz = NioSocketChannel.class; + group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(nioThreadsToUse, threadFactory))); + } else { + channelClazz = NioSocketChannel.class; + group = new NioEventLoopGroup(nioThreadsToUse); + } + logger.info("Connector using nio"); } // if we are a servlet wrap the socketChannelFactory @@ -407,7 +447,6 @@ public synchronized void start() { } bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE); final SSLContext context; @@ -1054,7 +1093,7 @@ public Bootstrap getBootStrap() { } public static void clearThreadPools() { - SharedNioEventLoopGroup.forceShutdown(); + SharedEventLoopGroup.forceShutdown(); } private static ClassLoader getThisClassLoader() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java deleted file mode 100644 index 5e67952ec7e..00000000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.remoting.impl.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; - -/** - * A {@link ByteBufAllocator} which is partial pooled. Which means only direct {@link ByteBuf}s are pooled. The rest - * is unpooled. - */ -public class PartialPooledByteBufAllocator implements ByteBufAllocator { - - private static final ByteBufAllocator POOLED = PooledByteBufAllocator.DEFAULT; - private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false); - - public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator(); - - private PartialPooledByteBufAllocator() { - } - - @Override - public ByteBuf buffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf buffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf buffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf ioBuffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf heapBuffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf directBuffer() { - return POOLED.directBuffer(); - } - - @Override - public ByteBuf directBuffer(int initialCapacity) { - return POOLED.directBuffer(initialCapacity); - } - - @Override - public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { - return POOLED.directBuffer(initialCapacity, maxCapacity); - } - - @Override - public CompositeByteBuf compositeBuffer() { - return UNPOOLED.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeBuffer(int maxNumComponents) { - return UNPOOLED.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeHeapBuffer() { - return UNPOOLED.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { - return UNPOOLED.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeDirectBuffer() { - return POOLED.compositeDirectBuffer(); - } - - @Override - public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { - return POOLED.compositeDirectBuffer(); - } - - @Override - public boolean isDirectBufferPooled() { - return true; - } - - @Override - public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { - return UNPOOLED.calculateNewCapacity(minNewCapacity, maxCapacity); - } -} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java similarity index 78% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java index 0750105898a..0af54de1342 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java @@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -32,41 +33,41 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -public class SharedNioEventLoopGroup extends NioEventLoopGroup { +public class SharedEventLoopGroup extends DelegatingEventLoopGroup { - private static SharedNioEventLoopGroup instance; + private static SharedEventLoopGroup instance; private final AtomicReference> shutdown = new AtomicReference<>(); - private final AtomicLong nioChannelFactoryCount = new AtomicLong(); + private final AtomicLong channelFactoryCount = new AtomicLong(); private final Promise terminationPromise = ImmediateEventExecutor.INSTANCE.newPromise(); - private SharedNioEventLoopGroup(int numThreads, ThreadFactory factory) { - super(numThreads, factory); + private SharedEventLoopGroup(EventLoopGroup eventLoopGroup) { + super(eventLoopGroup); } public static synchronized void forceShutdown() { if (instance != null) { instance.shutdown(); - instance.nioChannelFactoryCount.set(0); + instance.channelFactoryCount.set(0); instance = null; } } - public static synchronized SharedNioEventLoopGroup getInstance(int numThreads) { + public static synchronized SharedEventLoopGroup getInstance(Function eventLoopGroupSupplier) { if (instance != null) { ScheduledFuture f = instance.shutdown.getAndSet(null); if (f != null) { f.cancel(false); } } else { - instance = new SharedNioEventLoopGroup(numThreads, AccessController.doPrivileged(new PrivilegedAction() { + instance = new SharedEventLoopGroup(eventLoopGroupSupplier.apply(AccessController.doPrivileged(new PrivilegedAction() { @Override public ThreadFactory run() { return new ActiveMQThreadFactory("ActiveMQ-client-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); } - })); + }))); } - instance.nioChannelFactoryCount.incrementAndGet(); + instance.channelFactoryCount.incrementAndGet(); return instance; } @@ -82,13 +83,13 @@ public Future shutdownGracefully() { @Override public Future shutdownGracefully(final long l, final long l2, final TimeUnit timeUnit) { - if (nioChannelFactoryCount.decrementAndGet() == 0) { + if (channelFactoryCount.decrementAndGet() == 0) { shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable() { @Override public void run() { - synchronized (SharedNioEventLoopGroup.class) { + synchronized (SharedEventLoopGroup.class) { if (shutdown.get() != null) { - Future future = SharedNioEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit); + Future future = SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 14efb79011a..12840c1625b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -51,6 +51,10 @@ public class TransportConstants { public static final String USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME = "useNioGlobalWorkerPool"; + public static final String USE_EPOLL_PROP_NAME = "useEpoll"; + + public static final String USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME = "useEpollGlobalWorkerPool"; + public static final String USE_INVM_PROP_NAME = "useInvm"; public static final String ACTIVEMQ_SERVER_NAME = "activemqServerName"; @@ -113,6 +117,8 @@ public class TransportConstants { public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads"; + public static final String EPOLL_REMOTING_THREADS_PROPNAME = "epollRemotingThreads"; + public static final String BATCH_DELAY = "batchDelay"; public static final String DIRECT_DELIVER = "directDeliver"; @@ -127,6 +133,10 @@ public class TransportConstants { public static final boolean DEFAULT_USE_NIO_GLOBAL_WORKER_POOL = true; + public static final boolean DEFAULT_USE_EPOLL_GLOBAL_WORKER_POOL = true; + + public static final boolean DEFAULT_USE_EPOLL = true; + public static final boolean DEFAULT_USE_INVM = false; public static final boolean DEFAULT_USE_SERVLET = false; @@ -218,6 +228,7 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.PROTOCOLS_PROP_NAME); @@ -237,6 +248,7 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME); allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY); allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER); allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION); @@ -267,6 +279,8 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.SERVLET_PATH); allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); allowableConnectorKeys.add(TransportConstants.LOCAL_ADDRESS_PROP_NAME); @@ -284,6 +298,7 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); + allowableConnectorKeys.add(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME); allowableConnectorKeys.add(TransportConstants.BATCH_DELAY); allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 340861be55e..ca78f29155d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -185,10 +184,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t protocolManagerToUse.handshake(connection, new ChannelBufferWrapper(in)); pipeline.remove(this); - // https://issues.apache.org/jira/browse/ARTEMIS-392 - // Application servers or other components may upgrade a regular socket to Netty - // We need to be able to work normally as with anything else on Artemis - ctx.channel().config().setAllocator(PartialPooledByteBufAllocator.INSTANCE); ctx.flush(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index aaae10880a8..50faa46ade1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -44,6 +44,9 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; @@ -117,6 +120,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final boolean useInvm; + private final boolean useEpoll; + private final ProtocolHandler protocolHandler; private final String host; @@ -154,6 +159,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final int nioRemotingThreads; + private final int epollRemotingThreads; + private final ConcurrentMap connections = new ConcurrentHashMap<>(); private final Map configuration; @@ -202,6 +209,11 @@ public NettyAcceptor(final String name, sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration); nioRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration); + + epollRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME, -1, configuration); + useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); + + backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); @@ -270,22 +282,48 @@ public synchronized void start() throws Exception { channelClazz = LocalServerChannel.class; eventLoopGroup = new LocalEventLoopGroup(); } else { - int threadsToUse; - - if (nioRemotingThreads == -1) { - // Default to number of cores * 3 + // Default to number of cores * 3 + int defaultThreadsToUse = Runtime.getRuntime().availableProcessors() * 3; + + if (useEpoll) { + if (Epoll.isAvailable()) { + int epollThreadsToUse; + if (epollRemotingThreads == -1) { + epollThreadsToUse = defaultThreadsToUse; + } else { + epollThreadsToUse = this.epollRemotingThreads; + } - threadsToUse = Runtime.getRuntime().availableProcessors() * 3; - } else { - threadsToUse = this.nioRemotingThreads; + channelClazz = EpollServerSocketChannel.class; + eventLoopGroup = new EpollEventLoopGroup(epollThreadsToUse, AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ActiveMQThreadFactory run() { + return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + } + })); + logger.info("Acceptor using native epoll"); + } else { + logger.warn("Acceptor unable to load native epoll, will continue and load nio"); + } } - channelClazz = NioServerSocketChannel.class; - eventLoopGroup = new NioEventLoopGroup(threadsToUse, AccessController.doPrivileged(new PrivilegedAction() { - @Override - public ActiveMQThreadFactory run() { - return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + + if (channelClazz == null || eventLoopGroup == null) { + int nioThreadsToUse; + if (nioRemotingThreads == -1) { + nioThreadsToUse = defaultThreadsToUse; + } else { + nioThreadsToUse = nioRemotingThreads; } - })); + + channelClazz = NioServerSocketChannel.class; + eventLoopGroup = new NioEventLoopGroup(nioThreadsToUse, AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ActiveMQThreadFactory run() { + return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + } + })); + logger.info("Acceptor using nio"); + } } bootstrap = new ServerBootstrap(); @@ -319,7 +357,6 @@ public void initChannel(Channel channel) throws Exception { bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); - bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-accepted-channels", GlobalEventExecutor.INSTANCE); serverChannelGroup = new DefaultChannelGroup("activemq-acceptor-channels", GlobalEventExecutor.INSTANCE); diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml index fb1c78e1bb6..22536bb4459 100644 --- a/tests/extra-tests/pom.xml +++ b/tests/extra-tests/pom.xml @@ -25,7 +25,7 @@ org.apache.activemq.tests artemis-tests-pom - 2.0.0-SNAPSHOT + 2.1.0-SNAPSHOT extra-tests diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java index 0afd30cb269..0f08ecd8a88 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java @@ -26,7 +26,6 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; @@ -52,7 +51,6 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; -import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -216,7 +214,6 @@ private void startWebServer(int port) throws Exception { } else { context = null; } - b.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception {