From 09efc8b698bd7b273563a750e15fb32979802c63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Wed, 9 Aug 2017 08:51:33 +0100 Subject: [PATCH 1/2] ARTEMIS-1335 Update Netty to 4.1.14 Update Netty to 4.1.14 --- artemis-features/src/main/resources/features.xml | 1 + .../netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java | 7 ++++++- pom.xml | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml index fc9bb78cd31..cf5a2a9a240 100644 --- a/artemis-features/src/main/resources/features.xml +++ b/artemis-features/src/main/resources/features.xml @@ -36,6 +36,7 @@ mvn:io.netty/netty-codec/${netty.version} mvn:io.netty/netty-handler/${netty.version} mvn:io.netty/netty-transport-native-epoll/${netty.version} + mvn:io.netty/netty-transport-native-unix-common/${netty.version} diff --git a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java index a4346dc0c82..3ec816530ab 100644 --- a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java +++ b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java @@ -36,6 +36,7 @@ */ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf { + private static final byte ZERO = 0; private ByteBuffer buffer; private int arrayOffset; private byte[] array; @@ -563,7 +564,11 @@ protected SwappedByteBuf newSwappedByteBuf() { @Override public ByteBuf setZero(int index, int length) { if (hasMemoryAddress()) { - UnsafeByteBufUtil.setZero(this, addr(index), index, length); + if (length == 0) { + return this; + } + this.checkIndex(index, length); + PlatformDependent.setMemory(addr(index), length, ZERO); } else { //prefer Arrays::fill here? UnsafeByteBufUtil.setZero(array, idx(index), length); diff --git a/pom.xml b/pom.xml index 4374225fa01..15d701ee8ae 100644 --- a/pom.xml +++ b/pom.xml @@ -86,7 +86,7 @@ 3.6.13.Final 2.4 2.8.47 - 4.1.9.Final + 4.1.14.Final 0.20.0 3.0.19.Final 1.7.21 From ae0a7cd67be48cabc813a5df8bcc95e2cbaa15e5 Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Wed, 9 Aug 2017 17:43:40 +0100 Subject: [PATCH 2/2] ARTEMIS-1342: Support Netty Native KQueue on macOS Add support for KQueue for when server or client runs on macOS. This is inline with the epoll support for linux. --- .../apache/activemq/artemis/utils/Env.java | 5 ++ .../core/client/ActiveMQClientLogger.java | 5 ++ .../core/remoting/impl/netty/Epoll.java | 6 +-- .../core/remoting/impl/netty/KQueue.java | 51 +++++++++++++++++++ .../remoting/impl/netty/NettyConnector.java | 14 +++++ .../impl/netty/TransportConstants.java | 6 +++ .../src/main/resources/features.xml | 1 + .../remoting/impl/netty/NettyAcceptor.java | 17 +++++++ .../main/resources/servers/expire/broker.xml | 11 ++-- 9 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java index 94f69d310e3..cd41bef6471 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java @@ -61,6 +61,7 @@ public final class Env { private static final String OS = System.getProperty("os.name").toLowerCase(); private static final boolean IS_LINUX = OS.startsWith("linux"); + private static final boolean IS_MAC = OS.startsWith("mac"); private static final boolean IS_64BIT = checkIs64bit(); private Env() { @@ -87,6 +88,10 @@ public static boolean isLinuxOs() { return IS_LINUX == true; } + public static boolean isMacOs() { + return IS_MAC == true; + } + public static boolean is64BitJvm() { return IS_64BIT; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 6fbb911fa1d..9814d88bcd1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -536,4 +536,9 @@ public interface ActiveMQClientLogger extends BasicLogger { @Message(id = 214033, value = "Cannot resolve host ", format = Message.Format.MESSAGE_FORMAT) void unableToResolveHost(@Cause UnknownHostException e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 214034, value = "Unable to check KQueue availability ", + format = Message.Format.MESSAGE_FORMAT) + void unableToCheckKQueueAvailability(@Cause Throwable e); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java index 8553d7f8b60..8779a5d7f78 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/Epoll.java @@ -25,9 +25,9 @@ */ public final class Epoll { - private static final boolean IS_AVAILABLE_EPOLL = isIsAvailableEpoll(); + private static final boolean IS_EPOLL_AVAILABLE = isEpollAvailable(); - private static boolean isIsAvailableEpoll() { + private static boolean isEpollAvailable() { try { if (Env.is64BitJvm() && Env.isLinuxOs()) { return io.netty.channel.epoll.Epoll.isAvailable(); @@ -46,6 +46,6 @@ private Epoll() { } public static boolean isAvailable() { - return IS_AVAILABLE_EPOLL; + return IS_EPOLL_AVAILABLE; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java new file mode 100644 index 00000000000..d2adae3dfbf --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/KQueue.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.artemis.utils.Env; + +/** + * Tells if {@code netty-transport-native-kqueue} is supported. + */ +public final class KQueue { + + private static final boolean IS_KQUEUE_AVAILABLE = isKQueueAvailable(); + + private static boolean isKQueueAvailable() { + try { + if (Env.is64BitJvm() && Env.isMacOs()) { + return io.netty.channel.kqueue.KQueue.isAvailable(); + } else { + return false; + } + } catch (Throwable e) { + ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailability(e); + return false; + } + + } + + private KQueue() { + + } + + public static boolean isAvailable() { + return IS_KQUEUE_AVAILABLE; + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 8e48cf97051..aaf0b0806c3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -65,6 +65,8 @@ import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.base64.Base64; @@ -232,6 +234,8 @@ public class NettyConnector extends AbstractConnector { private boolean useEpoll; + private boolean useKQueue; + private int remotingThreads; private boolean useGlobalWorkerPool; @@ -309,6 +313,7 @@ public NettyConnector(final Map configuration, useGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME, useGlobalWorkerPool, configuration); useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); + useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); @@ -415,6 +420,15 @@ public synchronized void start() { channelClazz = EpollSocketChannel.class; logger.debug("Connector " + this + " using native epoll"); + } else if (useKQueue && KQueue.isAvailable()) { + if (useGlobalWorkerPool) { + group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory))); + } else { + group = new KQueueEventLoopGroup(remotingThreads); + } + + channelClazz = KQueueSocketChannel.class; + logger.debug("Connector " + this + " using native kqueue"); } else { if (useGlobalWorkerPool) { channelClazz = NioSocketChannel.class; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 5288f387009..646de800051 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -53,6 +53,8 @@ public class TransportConstants { public static final String USE_EPOLL_PROP_NAME = "useEpoll"; + public static final String USE_KQUEUE_PROP_NAME = "useKQueue"; + @Deprecated /** * @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME @@ -157,6 +159,8 @@ public class TransportConstants { public static final boolean DEFAULT_USE_EPOLL = true; + public static final boolean DEFAULT_USE_KQUEUE = true; + public static final boolean DEFAULT_USE_INVM = false; public static final boolean DEFAULT_USE_SERVLET = false; @@ -255,6 +259,7 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); //noinspection deprecation allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); @@ -309,6 +314,7 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml index cf5a2a9a240..bae4a4c8b31 100644 --- a/artemis-features/src/main/resources/features.xml +++ b/artemis-features/src/main/resources/features.xml @@ -36,6 +36,7 @@ mvn:io.netty/netty-codec/${netty.version} mvn:io.netty/netty-handler/${netty.version} mvn:io.netty/netty-transport-native-epoll/${netty.version} + mvn:io.netty/netty-transport-native-kqueue/${netty.version} mvn:io.netty/netty-transport-native-unix-common/${netty.version} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index b41fc70ea03..2477bfca636 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -55,6 +55,8 @@ import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueServerSocketChannel; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalServerChannel; import io.netty.channel.nio.NioEventLoopGroup; @@ -96,6 +98,7 @@ public class NettyAcceptor extends AbstractAcceptor { public static String INVM_ACCEPTOR_TYPE = "IN-VM"; public static String NIO_ACCEPTOR_TYPE = "NIO"; public static String EPOLL_ACCEPTOR_TYPE = "EPOLL"; + public static String KQUEUE_ACCEPTOR_TYPE = "KQUEUE"; static { // Disable default Netty leak detection if the Netty leak detection level system properties are not in use @@ -130,6 +133,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final boolean useEpoll; + private final boolean useKQueue; + private final ProtocolHandler protocolHandler; private final String host; @@ -228,6 +233,7 @@ public NettyAcceptor(final String name, remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.REMOTING_THREADS_PROPNAME, remotingThreads, configuration); useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); + useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); @@ -318,6 +324,17 @@ public ActiveMQThreadFactory run() { acceptorType = EPOLL_ACCEPTOR_TYPE; logger.debug("Acceptor using native epoll"); + } else if (useKQueue && KQueue.isAvailable()) { + channelClazz = KQueueServerSocketChannel.class; + eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ActiveMQThreadFactory run() { + return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); + } + })); + acceptorType = KQUEUE_ACCEPTOR_TYPE; + + logger.debug("Acceptor using native kqueue"); } else { channelClazz = NioServerSocketChannel.class; eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction() { diff --git a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml index a4176f89a31..0930296e9b6 100644 --- a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml @@ -98,23 +98,24 @@ under the License. + - tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300 - tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpMinCredits=300 - tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true - tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true;useKQueue=true - tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true;useKQueue=true