From e657d00110a0d72a033ab3dbce53bb6e54432f59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Wed, 15 Mar 2017 06:06:45 +0000 Subject: [PATCH] ARTEMIS-994 Support Netty Native Epoll on Linux The following changes are made to support Epoll. Refactored SharedNioEventLoopGroup into renamed SharedEventLoopGroup to be generic (as so we can re-use for both Nio and Epoll) Add support and toggles for Epoll in NettyAcceptor and NettyConnector (with fall back to NIO if cannot load Epoll) Removal from code of PartialPooledByteBufAllocator, caused bad address when doing native, and no longer needed - see jira discussion New Connector Properties: useEpoll - toggles to use epoll or not, default true (but we failback to nio gracefully) epollRemotingThreads = same behaviour as nioRemotingThreads but for Epoll. useEpollGlobalWorkerPool = same behaviour as useNioGlobalWorkerPool but for Epoll. New Acceptor Properties: useEpoll - toggles to use epoll or not, default true (but we failback to nio gracefully) useEpollGlobalWorkerPool = same behaviour as useNioGlobalWorkerPool but for Epoll. --- .../impl/netty/DelegatingEventLoopGroup.java | 198 ++++++++++++++++++ .../remoting/impl/netty/NettyConnection.java | 2 +- .../remoting/impl/netty/NettyConnector.java | 67 ++++-- .../netty/PartialPooledByteBufAllocator.java | 138 ------------ ...opGroup.java => SharedEventLoopGroup.java} | 29 +-- .../impl/netty/TransportConstants.java | 15 ++ .../core/protocol/ProtocolHandler.java | 5 - .../remoting/impl/netty/NettyAcceptor.java | 65 ++++-- tests/extra-tests/pom.xml | 2 +- .../NettyConnectorWithHTTPUpgradeTest.java | 3 - 10 files changed, 334 insertions(+), 190 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java delete mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java rename artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/{SharedNioEventLoopGroup.java => SharedEventLoopGroup.java} (78%) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java new file mode 100644 index 00000000000..f682893f335 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; + +public class DelegatingEventLoopGroup implements EventLoopGroup { + + private final EventLoopGroup delegate; + + public DelegatingEventLoopGroup(EventLoopGroup eventLoopGroup) { + this.delegate = eventLoopGroup; + } + + @Override + public EventLoop next() { + return delegate.next(); + } + + @Override + public ChannelFuture register(Channel channel) { + return delegate.register(channel); + } + + @Override + public ChannelFuture register(ChannelPromise channelPromise) { + return delegate.register(channelPromise); + } + + @Override + @Deprecated + public ChannelFuture register(Channel channel, ChannelPromise channelPromise) { + return delegate.register(channel, channelPromise); + } + + @Override + public boolean isShuttingDown() { + return delegate.isShuttingDown(); + } + + @Override + public Future shutdownGracefully() { + return delegate.shutdownGracefully(); + } + + @Override + public Future shutdownGracefully(long l, long l1, TimeUnit timeUnit) { + return delegate.shutdownGracefully(l, l1, timeUnit); + } + + @Override + public Future terminationFuture() { + return delegate.terminationFuture(); + } + + @Override + @Deprecated + public void shutdown() { + delegate.shutdown(); + } + + @Override + @Deprecated + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + @Override + public Future submit(Runnable runnable) { + return delegate.submit(runnable); + } + + @Override + public Future submit(Runnable runnable, T t) { + return delegate.submit(runnable, t); + } + + @Override + public Future submit(Callable callable) { + return delegate.submit(callable); + } + + @Override + public io.netty.util.concurrent.ScheduledFuture schedule(Runnable runnable, long l, TimeUnit timeUnit) { + return delegate.schedule(runnable, l, timeUnit); + } + + @Override + public io.netty.util.concurrent.ScheduledFuture schedule(Callable callable, long l, TimeUnit timeUnit) { + return delegate.schedule(callable, l, timeUnit); + } + + @Override + public io.netty.util.concurrent.ScheduledFuture scheduleAtFixedRate(Runnable runnable, + long l, + long l1, + TimeUnit timeUnit) { + return delegate.scheduleAtFixedRate(runnable, l, l1, timeUnit); + } + + @Override + public io.netty.util.concurrent.ScheduledFuture scheduleWithFixedDelay(Runnable runnable, + long l, + long l1, + TimeUnit timeUnit) { + return delegate.scheduleWithFixedDelay(runnable, l, l1, timeUnit); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, + long timeout, + TimeUnit unit) throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, + long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + } + + @Override + public void forEach(Consumer action) { + delegate.forEach(action); + } + + @Override + public Spliterator spliterator() { + return delegate.spliterator(); + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 33dbf4b8b4a..23554394978 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -215,7 +215,7 @@ public ActiveMQBuffer createTransportBuffer(final int size) { @Override public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) { - return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true); + return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 38fb326f3d1..cc062d30100 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -58,6 +58,9 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -218,6 +221,12 @@ public class NettyConnector extends AbstractConnector { private boolean useNioGlobalWorkerPool; + private boolean useEpoll; + + private int epollRemotingThreads; + + private boolean useEpollGlobalWorkerPool; + private ScheduledExecutorService scheduledThreadPool; private Executor closeExecutor; @@ -288,6 +297,13 @@ public NettyConnector(final Map configuration, useNioGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME, TransportConstants.DEFAULT_USE_NIO_GLOBAL_WORKER_POOL, configuration); + useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); + + epollRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME, -1, configuration); + + useEpollGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL_GLOBAL_WORKER_POOL, configuration); + + useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration); @@ -371,22 +387,46 @@ public synchronized void start() { return; } - int threadsToUse; + // Default to number of cores * 3 + int defaultThreadsToUse = Runtime.getRuntime().availableProcessors() * 3; - if (nioRemotingThreads == -1) { - // Default to number of cores * 3 + if (useEpoll) { + if (Epoll.isAvailable()) { + int epollThreadsToUse; + if (epollRemotingThreads == -1) { + epollThreadsToUse = defaultThreadsToUse; + } else { + epollThreadsToUse = this.epollRemotingThreads; + } + if (useEpollGlobalWorkerPool) { + channelClazz = EpollSocketChannel.class; + group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(epollThreadsToUse, threadFactory))); + } else { + channelClazz = EpollSocketChannel.class; + group = new EpollEventLoopGroup(epollThreadsToUse); + } + logger.info("Connector using native epoll"); - threadsToUse = Runtime.getRuntime().availableProcessors() * 3; - } else { - threadsToUse = this.nioRemotingThreads; + } else { + logger.warn("Connector unable to load native epoll, will continue and load nio"); + } } - if (useNioGlobalWorkerPool) { - channelClazz = NioSocketChannel.class; - group = SharedNioEventLoopGroup.getInstance(threadsToUse); - } else { - channelClazz = NioSocketChannel.class; - group = new NioEventLoopGroup(threadsToUse); + if (channelClazz == null || group == null) { + int nioThreadsToUse; + if (nioRemotingThreads == -1) { + nioThreadsToUse = defaultThreadsToUse; + } else { + nioThreadsToUse = this.nioRemotingThreads; + } + if (useNioGlobalWorkerPool) { + channelClazz = NioSocketChannel.class; + group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(nioThreadsToUse, threadFactory))); + } else { + channelClazz = NioSocketChannel.class; + group = new NioEventLoopGroup(nioThreadsToUse); + } + logger.info("Connector using nio"); } // if we are a servlet wrap the socketChannelFactory @@ -407,7 +447,6 @@ public synchronized void start() { } bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE); final SSLContext context; @@ -1054,7 +1093,7 @@ public Bootstrap getBootStrap() { } public static void clearThreadPools() { - SharedNioEventLoopGroup.forceShutdown(); + SharedEventLoopGroup.forceShutdown(); } private static ClassLoader getThisClassLoader() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java deleted file mode 100644 index 5e67952ec7e..00000000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.remoting.impl.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; - -/** - * A {@link ByteBufAllocator} which is partial pooled. Which means only direct {@link ByteBuf}s are pooled. The rest - * is unpooled. - */ -public class PartialPooledByteBufAllocator implements ByteBufAllocator { - - private static final ByteBufAllocator POOLED = PooledByteBufAllocator.DEFAULT; - private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false); - - public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator(); - - private PartialPooledByteBufAllocator() { - } - - @Override - public ByteBuf buffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf buffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf buffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf ioBuffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf heapBuffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf directBuffer() { - return POOLED.directBuffer(); - } - - @Override - public ByteBuf directBuffer(int initialCapacity) { - return POOLED.directBuffer(initialCapacity); - } - - @Override - public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { - return POOLED.directBuffer(initialCapacity, maxCapacity); - } - - @Override - public CompositeByteBuf compositeBuffer() { - return UNPOOLED.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeBuffer(int maxNumComponents) { - return UNPOOLED.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeHeapBuffer() { - return UNPOOLED.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { - return UNPOOLED.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeDirectBuffer() { - return POOLED.compositeDirectBuffer(); - } - - @Override - public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { - return POOLED.compositeDirectBuffer(); - } - - @Override - public boolean isDirectBufferPooled() { - return true; - } - - @Override - public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { - return UNPOOLED.calculateNewCapacity(minNewCapacity, maxCapacity); - } -} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java similarity index 78% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java index 0750105898a..0af54de1342 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java @@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -32,41 +33,41 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -public class SharedNioEventLoopGroup extends NioEventLoopGroup { +public class SharedEventLoopGroup extends DelegatingEventLoopGroup { - private static SharedNioEventLoopGroup instance; + private static SharedEventLoopGroup instance; private final AtomicReference> shutdown = new AtomicReference<>(); - private final AtomicLong nioChannelFactoryCount = new AtomicLong(); + private final AtomicLong channelFactoryCount = new AtomicLong(); private final Promise terminationPromise = ImmediateEventExecutor.INSTANCE.newPromise(); - private SharedNioEventLoopGroup(int numThreads, ThreadFactory factory) { - super(numThreads, factory); + private SharedEventLoopGroup(EventLoopGroup eventLoopGroup) { + super(eventLoopGroup); } public static synchronized void forceShutdown() { if (instance != null) { instance.shutdown(); - instance.nioChannelFactoryCount.set(0); + instance.channelFactoryCount.set(0); instance = null; } } - public static synchronized SharedNioEventLoopGroup getInstance(int numThreads) { + public static synchronized SharedEventLoopGroup getInstance(Function eventLoopGroupSupplier) { if (instance != null) { ScheduledFuture f = instance.shutdown.getAndSet(null); if (f != null) { f.cancel(false); } } else { - instance = new SharedNioEventLoopGroup(numThreads, AccessController.doPrivileged(new PrivilegedAction() { + instance = new SharedEventLoopGroup(eventLoopGroupSupplier.apply(AccessController.doPrivileged(new PrivilegedAction() { @Override public ThreadFactory run() { return new ActiveMQThreadFactory("ActiveMQ-client-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); } - })); + }))); } - instance.nioChannelFactoryCount.incrementAndGet(); + instance.channelFactoryCount.incrementAndGet(); return instance; } @@ -82,13 +83,13 @@ public Future shutdownGracefully() { @Override public Future shutdownGracefully(final long l, final long l2, final TimeUnit timeUnit) { - if (nioChannelFactoryCount.decrementAndGet() == 0) { + if (channelFactoryCount.decrementAndGet() == 0) { shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable() { @Override public void run() { - synchronized (SharedNioEventLoopGroup.class) { + synchronized (SharedEventLoopGroup.class) { if (shutdown.get() != null) { - Future future = SharedNioEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit); + Future future = SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 14efb79011a..12840c1625b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -51,6 +51,10 @@ public class TransportConstants { public static final String USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME = "useNioGlobalWorkerPool"; + public static final String USE_EPOLL_PROP_NAME = "useEpoll"; + + public static final String USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME = "useEpollGlobalWorkerPool"; + public static final String USE_INVM_PROP_NAME = "useInvm"; public static final String ACTIVEMQ_SERVER_NAME = "activemqServerName"; @@ -113,6 +117,8 @@ public class TransportConstants { public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads"; + public static final String EPOLL_REMOTING_THREADS_PROPNAME = "epollRemotingThreads"; + public static final String BATCH_DELAY = "batchDelay"; public static final String DIRECT_DELIVER = "directDeliver"; @@ -127,6 +133,10 @@ public class TransportConstants { public static final boolean DEFAULT_USE_NIO_GLOBAL_WORKER_POOL = true; + public static final boolean DEFAULT_USE_EPOLL_GLOBAL_WORKER_POOL = true; + + public static final boolean DEFAULT_USE_EPOLL = true; + public static final boolean DEFAULT_USE_INVM = false; public static final boolean DEFAULT_USE_SERVLET = false; @@ -218,6 +228,7 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.PROTOCOLS_PROP_NAME); @@ -237,6 +248,7 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME); allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY); allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER); allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION); @@ -267,6 +279,8 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.SERVLET_PATH); allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); allowableConnectorKeys.add(TransportConstants.LOCAL_ADDRESS_PROP_NAME); @@ -284,6 +298,7 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); + allowableConnectorKeys.add(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME); allowableConnectorKeys.add(TransportConstants.BATCH_DELAY); allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 340861be55e..ca78f29155d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -185,10 +184,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t protocolManagerToUse.handshake(connection, new ChannelBufferWrapper(in)); pipeline.remove(this); - // https://issues.apache.org/jira/browse/ARTEMIS-392 - // Application servers or other components may upgrade a regular socket to Netty - // We need to be able to work normally as with anything else on Artemis - ctx.channel().config().setAllocator(PartialPooledByteBufAllocator.INSTANCE); ctx.flush(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index aaae10880a8..50faa46ade1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -44,6 +44,9 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; @@ -117,6 +120,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final boolean useInvm; + private final boolean useEpoll; + private final ProtocolHandler protocolHandler; private final String host; @@ -154,6 +159,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final int nioRemotingThreads; + private final int epollRemotingThreads; + private final ConcurrentMap connections = new ConcurrentHashMap<>(); private final Map configuration; @@ -202,6 +209,11 @@ public NettyAcceptor(final String name, sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration); nioRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration); + + epollRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME, -1, configuration); + useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); + + backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); @@ -270,22 +282,48 @@ public synchronized void start() throws Exception { channelClazz = LocalServerChannel.class; eventLoopGroup = new LocalEventLoopGroup(); } else { - int threadsToUse; - - if (nioRemotingThreads == -1) { - // Default to number of cores * 3 + // Default to number of cores * 3 + int defaultThreadsToUse = Runtime.getRuntime().availableProcessors() * 3; + + if (useEpoll) { + if (Epoll.isAvailable()) { + int epollThreadsToUse; + if (epollRemotingThreads == -1) { + epollThreadsToUse = defaultThreadsToUse; + } else { + epollThreadsToUse = this.epollRemotingThreads; + } - threadsToUse = Runtime.getRuntime().availableProcessors() * 3; - } else { - threadsToUse = this.nioRemotingThreads; + channelClazz = EpollServerSocketChannel.class; + eventLoopGroup = new EpollEventLoopGroup(epollThreadsToUse, AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ActiveMQThreadFactory run() { + return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + } + })); + logger.info("Acceptor using native epoll"); + } else { + logger.warn("Acceptor unable to load native epoll, will continue and load nio"); + } } - channelClazz = NioServerSocketChannel.class; - eventLoopGroup = new NioEventLoopGroup(threadsToUse, AccessController.doPrivileged(new PrivilegedAction() { - @Override - public ActiveMQThreadFactory run() { - return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + + if (channelClazz == null || eventLoopGroup == null) { + int nioThreadsToUse; + if (nioRemotingThreads == -1) { + nioThreadsToUse = defaultThreadsToUse; + } else { + nioThreadsToUse = nioRemotingThreads; } - })); + + channelClazz = NioServerSocketChannel.class; + eventLoopGroup = new NioEventLoopGroup(nioThreadsToUse, AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ActiveMQThreadFactory run() { + return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + } + })); + logger.info("Acceptor using nio"); + } } bootstrap = new ServerBootstrap(); @@ -319,7 +357,6 @@ public void initChannel(Channel channel) throws Exception { bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); - bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-accepted-channels", GlobalEventExecutor.INSTANCE); serverChannelGroup = new DefaultChannelGroup("activemq-acceptor-channels", GlobalEventExecutor.INSTANCE); diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml index fb1c78e1bb6..22536bb4459 100644 --- a/tests/extra-tests/pom.xml +++ b/tests/extra-tests/pom.xml @@ -25,7 +25,7 @@ org.apache.activemq.tests artemis-tests-pom - 2.0.0-SNAPSHOT + 2.1.0-SNAPSHOT extra-tests diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java index 0afd30cb269..0f08ecd8a88 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java @@ -26,7 +26,6 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; @@ -52,7 +51,6 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; -import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -216,7 +214,6 @@ private void startWebServer(int port) throws Exception { } else { context = null; } - b.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception {