From d7270d4790fa8b6a88bfdb7cced6ce0950b49b44 Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Tue, 18 Jul 2017 12:26:29 +0100 Subject: [PATCH 1/2] QPID Java 7 --- apache-qpid-jms/pom.xml | 2 +- pom.xml | 4 ++-- .../qpid/jms/provider/amqp/AmqpProvider.java | 6 +++--- .../amqp/builders/AmqpResourceBuilder.java | 2 +- .../jms/provider/failover/FailoverProvider.java | 4 ++-- .../provider/failover/FailoverUriPoolTest.java | 2 +- .../transports/netty/NettyWsTransportTest.java | 15 ++++++++++----- 7 files changed, 20 insertions(+), 15 deletions(-) diff --git a/apache-qpid-jms/pom.xml b/apache-qpid-jms/pom.xml index d52ec73ab..5f30a03cc 100644 --- a/apache-qpid-jms/pom.xml +++ b/apache-qpid-jms/pom.xml @@ -83,7 +83,7 @@ src/main/assembly/src.xml - gnu + warn diff --git a/pom.xml b/pom.xml index 7cc1c0436..c9e4a1a70 100644 --- a/pom.xml +++ b/pom.xml @@ -39,8 +39,8 @@ - 1.8 - 1.8 + 1.7 + 1.7 0.19.0 diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index c40a2c29f..b464641c4 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -627,7 +627,7 @@ public void run() { } @Override - public void commit(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException { + public void commit(final JmsTransactionInfo transactionInfo, final JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException { checkClosed(); serializer.execute(new Runnable() { @@ -646,7 +646,7 @@ public void run() { } @Override - public void rollback(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException { + public void rollback(final JmsTransactionInfo transactionInfo, final JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException { checkClosed(); serializer.execute(new Runnable() { @@ -813,7 +813,7 @@ public void run() { } } - public void scheduleExecuteAndPump(Runnable task) { + public void scheduleExecuteAndPump(final Runnable task) { serializer.execute(new Runnable() { @Override public void run() { diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java index b244cfe95..b3b39acc3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java @@ -80,7 +80,7 @@ public void buildResource(final AsyncResult request) { // Create the resource object now resource = createResource(parent, resourceInfo, endpoint); - AmqpProvider provider = parent.getProvider(); + final AmqpProvider provider = parent.getProvider(); if (getRequestTimeout() > JmsConnectionInfo.INFINITE) { diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index b82c6f632..03e9f9ac8 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -400,7 +400,7 @@ public String toString() { } @Override - public void commit(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionInfo, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + public void commit(final JmsTransactionInfo transactionInfo, final JmsTransactionInfo nextTransactionInfo, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { checkClosed(); final FailoverRequest pending = new FailoverRequest(request, requestTimeout) { @Override @@ -430,7 +430,7 @@ protected Exception createOfflineFailureException(IOException error) { } @Override - public void rollback(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionInfo, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + public void rollback(final JmsTransactionInfo transactionInfo, final JmsTransactionInfo nextTransactionInfo, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { checkClosed(); final FailoverRequest pending = new FailoverRequest(request, requestTimeout) { @Override diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java index 0ff0942aa..d1c7d4704 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java @@ -312,7 +312,7 @@ public void testAddAllHandlesNulls() throws URISyntaxException { public void testAddAllHandlesEmpty() throws URISyntaxException { FailoverUriPool pool = new FailoverUriPool(uris, null); pool.setRandomize(false); - pool.addAll(Collections.emptyList()); + pool.addAll(Collections.emptyList()); assertEquals(uris.size(), pool.size()); } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java index f7b6b6952..841f4b132 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java @@ -142,7 +142,7 @@ public void testConnectionsSendReceiveLargeDataWhenFrameSizeAllowsIt() throws Ex List transports = new ArrayList(); - Transport transport = createTransport(serverLocation, testListener, createClientOptions()); + final Transport transport = createTransport(serverLocation, testListener, createClientOptions()); try { // The transport should allow for the size of data we sent. transport.setMaxFrameSize(FRAME_SIZE); @@ -194,7 +194,7 @@ public void testConnectionReceivesFragmentedData() throws Exception { NettyTransportListener wsListener = new NettyTransportListener(true); - Transport transport = createTransport(serverLocation, wsListener, createClientOptions); + final Transport transport = createTransport(serverLocation, wsListener, createClientOptions); try { transport.setMaxFrameSize(FRAME_SIZE); transport.connect(null); @@ -253,7 +253,7 @@ public void testConnectionsSendReceiveLargeDataFailsDueToMaxFrameSize() throws E List transports = new ArrayList(); - Transport transport = createTransport(serverLocation, testListener, createClientOptions()); + final Transport transport = createTransport(serverLocation, testListener, createClientOptions()); try { // Transport can't receive anything bigger so it should fail the connection // when data arrives that is larger than this value. @@ -265,7 +265,12 @@ public void testConnectionsSendReceiveLargeDataFailsDueToMaxFrameSize() throws E fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); } - assertTrue("Transport should have lost connection", Wait.waitFor(() -> !transport.isConnected())); + assertTrue("Transport should have lost connection", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return !transport.isConnected(); + } + })); } assertFalse(exceptions.isEmpty()); @@ -275,7 +280,7 @@ public void testConnectionsSendReceiveLargeDataFailsDueToMaxFrameSize() throws E public void testTransportDetectsConnectionDropWhenServerEnforcesMaxFrameSize() throws Exception { final int FRAME_SIZE = 1024; - ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE); + final ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE); for (int i = 0; i < FRAME_SIZE; ++i) { sendBuffer.writeByte('A'); } From 9529ba7d1d4768a7953afa47c4838d425133aea5 Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Thu, 24 Aug 2017 07:57:58 +0100 Subject: [PATCH 2/2] QPIDJMS-315: Add support for Netty KQueue transport Add KQueue support Also add safer epoll and kqueue env check to protect from lib loading failure (as found in apache activemq artemis) --- pom.xml | 7 +++ qpid-jms-client/pom.xml | 5 ++ .../qpid/jms/transports/TransportOptions.java | 19 ++++++ .../qpid/jms/transports/netty/Epoll.java | 54 ++++++++++++++++ .../qpid/jms/transports/netty/KQueue.java | 54 ++++++++++++++++ .../transports/netty/NettyTcpTransport.java | 13 +++- .../java/org/apache/qpid/jms/util/Env.java | 61 +++++++++++++++++++ 7 files changed, 210 insertions(+), 3 deletions(-) create mode 100644 qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/Epoll.java create mode 100644 qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/KQueue.java create mode 100644 qpid-jms-client/src/main/java/org/apache/qpid/jms/util/Env.java diff --git a/pom.xml b/pom.xml index b254df2dc..68582a8a4 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ -Xmx2g -enableassertions ${jacoco-config} linux-x86_64 + osx-x86_64 @@ -146,6 +147,12 @@ ${netty-version} ${netty-transport-native-epoll-classifier} + + io.netty + netty-transport-native-kqueue + ${netty-version} + ${netty-transport-native-kqueue-classifier} + io.netty netty-codec-http diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml index f7fd3e62f..5b578c410 100644 --- a/qpid-jms-client/pom.xml +++ b/qpid-jms-client/pom.xml @@ -65,6 +65,11 @@ netty-transport-native-epoll ${netty-transport-native-epoll-classifier} + + io.netty + netty-transport-native-kqueue + ${netty-transport-native-kqueue-classifier} + io.netty netty-codec-http diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java index deecd6dbc..425361acd 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java @@ -31,6 +31,7 @@ public class TransportOptions implements Cloneable { public static final int DEFAULT_CONNECT_TIMEOUT = 60000; public static final int DEFAULT_TCP_PORT = 5672; public static final boolean DEFAULT_USE_EPOLL = true; + public static final boolean DEFAULT_USE_KQUEUE = true; public static final boolean DEFAULT_TRACE_BYTES = false; private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; @@ -43,6 +44,7 @@ public class TransportOptions implements Cloneable { private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; private int defaultTcpPort = DEFAULT_TCP_PORT; private boolean useEpoll = DEFAULT_USE_EPOLL; + private boolean useKQueue = DEFAULT_USE_KQUEUE; private boolean traceBytes = DEFAULT_TRACE_BYTES; /** @@ -183,6 +185,23 @@ public void setUseEpoll(boolean useEpoll) { this.useEpoll = useEpoll; } + /** + * @return the true if use of of the netty kqueue transport is used. + */ + public boolean isUseKQueue() { + return useKQueue; + } + + /** + * Determines if the netty kqueue transport can be used if available on this platform. + * + * @param useKQueue + * should use of available kqueue transport be used. + */ + public void setUseKQueue(boolean useKQueue) { + this.useKQueue = useKQueue; + } + /** * @return true if the transport should enable byte tracing */ diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/Epoll.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/Epoll.java new file mode 100644 index 000000000..5337b9fbf --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/Epoll.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.jms.transports.netty; + +import org.apache.qpid.jms.util.Env; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tells if {@code netty-transport-native-epoll} is supported. + */ +public final class Epoll { + + private static final Logger LOG = LoggerFactory.getLogger(Epoll.class); + + private static final boolean IS_EPOLL_AVAILABLE = isEpollAvailable(); + + private static boolean isEpollAvailable() { + try { + if (Env.is64BitJvm() && Env.isLinuxOs()) { + return io.netty.channel.epoll.Epoll.isAvailable(); + } else { + return false; + } + } catch (Throwable e) { + LOG.warn("Unable to check Epoll availability", e); + return false; + } + + } + + private Epoll() { + + } + + public static boolean isAvailable() { + return IS_EPOLL_AVAILABLE; + } +} diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/KQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/KQueue.java new file mode 100644 index 000000000..f4cc42736 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/KQueue.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.jms.transports.netty; + +import org.apache.qpid.jms.util.Env; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tells if {@code netty-transport-native-kqueue} is supported. + */ +public final class KQueue { + + private static final Logger LOG = LoggerFactory.getLogger(KQueue.class); + + private static final boolean IS_KQUEUE_AVAILABLE = isKQueueAvailable(); + + private static boolean isKQueueAvailable() { + try { + if (Env.is64BitJvm() && Env.isMacOs()) { + return io.netty.channel.kqueue.KQueue.isAvailable(); + } else { + return false; + } + } catch (Throwable e) { + LOG.warn("Unable to check KQueue availability", e); + return false; + } + + } + + private KQueue() { + + } + + public static boolean isAvailable() { + return IS_KQUEUE_AVAILABLE; + } +} diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java index da77be8e3..b83a2f229 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java @@ -47,9 +47,10 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LoggingHandler; @@ -139,9 +140,13 @@ public void connect(SSLContext sslContextOverride) throws IOException { sslHandler = null; } + boolean useKQueue = getTransportOptions().isUseKQueue() && KQueue.isAvailable(); boolean useEpoll = getTransportOptions().isUseEpoll() && Epoll.isAvailable(); - if (useEpoll) { + if (useKQueue) { + LOG.trace("Netty Transport using KQueue mode"); + group = new KQueueEventLoopGroup(1); + } else if (useEpoll) { LOG.trace("Netty Transport using Epoll mode"); group = new EpollEventLoopGroup(1); } else { @@ -151,7 +156,9 @@ public void connect(SSLContext sslContextOverride) throws IOException { bootstrap = new Bootstrap(); bootstrap.group(group); - if (useEpoll) { + if (useKQueue) { + bootstrap.channel(KQueueSocketChannel.class); + } else if (useEpoll) { bootstrap.channel(EpollSocketChannel.class); } else { bootstrap.channel(NioSocketChannel.class); diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/Env.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/Env.java new file mode 100644 index 000000000..482360d77 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/Env.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.jms.util; + +/** + * Utility that detects various properties specific to the current runtime + * environment, such as JVM bitness and OS type. + */ +public final class Env { + + private static final String OS = System.getProperty("os.name").toLowerCase(); + private static final boolean IS_LINUX = OS.startsWith("linux"); + private static final boolean IS_MAC = OS.startsWith("mac"); + private static final boolean IS_64BIT = checkIs64bit(); + + private Env() { + + } + + public static boolean isLinuxOs() { + return IS_LINUX == true; + } + + public static boolean isMacOs() { + return IS_MAC == true; + } + + public static boolean is64BitJvm() { + return IS_64BIT; + } + + private static boolean checkIs64bit() { + //check the more used JVMs + String systemProp; + systemProp = System.getProperty("com.ibm.vm.bitmode"); + if (systemProp != null) { + return "64".equals(systemProp); + } + systemProp = System.getProperty("sun.arch.data.model"); + if (systemProp != null) { + return "64".equals(systemProp); + } + systemProp = System.getProperty("java.vm.version"); + return systemProp != null && systemProp.contains("_64"); + } +}