From 177f7f81befd08c687d31679e88ebc4de8135bfd Mon Sep 17 00:00:00 2001 From: franz1981 Date: Thu, 4 Nov 2021 17:20:01 +0100 Subject: [PATCH] Support IO_URING transport. Fixes #4163 --- .github/workflows/ci.yml | 3 + README.md | 6 + pom.xml | 33 ++++ .../core/http/impl/HttpServerWorker.java | 2 +- .../vertx/core/net/impl/ConnectionBase.java | 6 +- .../io/vertx/core/net/impl/NetClientImpl.java | 2 +- .../io/vertx/core/net/impl/NetServerImpl.java | 4 +- .../net/impl/transport/IOUringTransport.java | 156 ++++++++++++++++++ .../core/net/impl/transport/Transport.java | 14 ++ src/test/java/io/vertx/it/TransportTest.java | 6 + 10 files changed, 225 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/vertx/core/net/impl/transport/IOUringTransport.java diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2fe909de64b..042164d8c4e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,6 +24,9 @@ jobs: - os: ubuntu-latest jdk: 8 profile: '-PtestNativeTransport' + - os: ubuntu-latest + jdk: 8 + profile: '-PtestIoUringTransport' - os: ubuntu-latest jdk: 8 profile: '-PtestDomainSockets' diff --git a/README.md b/README.md index b4e5400d85a..b107a613fb4 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,12 @@ Vert.x supports native transport on BSD and Linux, to run the tests with native > mvn test -PtestNativeTransport ``` +Vert.x experimental supports native [IO_URING](https://github.com/netty/netty-incubator-transport-io_uring/) transport on Linux, to run the tests with IO_URING transport + +``` +> mvn test -PtestIoUringTransport +``` + Vert.x supports domain sockets on Linux exclusively, to run the tests with domain sockets ``` diff --git a/pom.xml b/pom.xml index 281e4efab2d..c4d4393be6c 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ false false ${project.basedir}/src/main/resources/META-INF/MANIFEST.MF + 0.0.14.Final @@ -111,6 +112,13 @@ netty-codec-haproxy true + + io.netty.incubator + netty-incubator-transport-native-io_uring + + ${netty.iouring.version} + true + io.netty netty-transport-native-epoll @@ -478,6 +486,8 @@ io.netty:netty-transport-classes-epoll io.netty:netty-transport-native-kqueue io.netty:netty-transport-classes-kqueue + io.netty.incubator:netty-incubator-transport-native-io_uring + io.netty.incubator:netty-incubator-transport-classes-io_uring @@ -775,6 +785,29 @@ + + + + testIoUringTransport + + true + false + + + + io.netty.incubator + netty-incubator-transport-native-io_uring + ${netty.iouring.version} + linux-x86_64 + + + io.netty + netty-transport-native-epoll + provided + + + + testDomainSockets diff --git a/src/main/java/io/vertx/core/http/impl/HttpServerWorker.java b/src/main/java/io/vertx/core/http/impl/HttpServerWorker.java index 20d27b452a8..7f5f62259ef 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpServerWorker.java +++ b/src/main/java/io/vertx/core/http/impl/HttpServerWorker.java @@ -270,7 +270,7 @@ private void configureHttp1OrH2C(ChannelPipeline pipeline, SslChannelProvider ss if (options.isCompressionSupported()) { pipeline.addLast("deflater", new HttpChunkContentCompressor(compressionOptions)); } - if (options.isSsl() || options.isCompressionSupported()) { + if (options.isSsl() || options.isCompressionSupported() || !vertx.transport().supportFileRegion()) { // only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used. pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support } diff --git a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java index e1e85d2b7dd..0782d60944e 100644 --- a/src/main/java/io/vertx/core/net/impl/ConnectionBase.java +++ b/src/main/java/io/vertx/core/net/impl/ConnectionBase.java @@ -14,7 +14,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedFile; +import io.netty.handler.stream.ChunkedNioFile; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; @@ -409,7 +409,7 @@ protected void handleIdle(IdleStateEvent event) { protected abstract void handleInterestedOpsChanged(); protected boolean supportsFileRegion() { - return !isSsl(); + return vertx.transport().supportFileRegion() && !isSsl(); } protected void reportBytesRead(Object msg) { @@ -505,7 +505,7 @@ public final ChannelFuture sendFile(RandomAccessFile raf, long offset, long leng ChannelPromise writeFuture = chctx.newPromise(); if (!supportsFileRegion()) { // Cannot use zero-copy - writeToChannel(new ChunkedFile(raf, offset, length, 8192), writeFuture); + writeToChannel(new ChunkedNioFile(raf.getChannel(), offset, length, 8192), writeFuture); } else { // No encryption - use zero-copy. sendFileRegion(raf, offset, length, writeFuture); diff --git a/src/main/java/io/vertx/core/net/impl/NetClientImpl.java b/src/main/java/io/vertx/core/net/impl/NetClientImpl.java index a256a27855e..fae0fd900b9 100644 --- a/src/main/java/io/vertx/core/net/impl/NetClientImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetClientImpl.java @@ -95,7 +95,7 @@ protected void initChannel(ChannelPipeline pipeline) { if (logEnabled) { pipeline.addLast("logging", new LoggingHandler(options.getActivityLogDataFormat())); } - if (options.isSsl()) { + if (options.isSsl() || !vertx.transport().supportFileRegion()) { // only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used. pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support } diff --git a/src/main/java/io/vertx/core/net/impl/NetServerImpl.java b/src/main/java/io/vertx/core/net/impl/NetServerImpl.java index 46b05922394..d17287583f9 100644 --- a/src/main/java/io/vertx/core/net/impl/NetServerImpl.java +++ b/src/main/java/io/vertx/core/net/impl/NetServerImpl.java @@ -255,8 +255,8 @@ protected void initChannel(ChannelPipeline pipeline, boolean ssl) { if (options.getLogActivity()) { pipeline.addLast("logging", new LoggingHandler(options.getActivityLogDataFormat())); } - if (ssl) { - // only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used. + if (ssl || !vertx.transport().supportFileRegion()) { + // only add ChunkedWriteHandler when SSL is enabled or FileRegion isn't supported pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support } int idleTimeout = options.getIdleTimeout(); diff --git a/src/main/java/io/vertx/core/net/impl/transport/IOUringTransport.java b/src/main/java/io/vertx/core/net/impl/transport/IOUringTransport.java new file mode 100644 index 00000000000..cce51eb72b0 --- /dev/null +++ b/src/main/java/io/vertx/core/net/impl/transport/IOUringTransport.java @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.net.impl.transport; + +import java.net.SocketAddress; +import java.util.concurrent.ThreadFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.incubator.channel.uring.IOUring; +import io.netty.incubator.channel.uring.IOUringChannelOption; +import io.netty.incubator.channel.uring.IOUringDatagramChannel; +import io.netty.incubator.channel.uring.IOUringEventLoopGroup; +import io.netty.incubator.channel.uring.IOUringServerSocketChannel; +import io.netty.incubator.channel.uring.IOUringSocketChannel; +import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.net.ClientOptionsBase; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.impl.SocketAddressImpl; + +public class IOUringTransport extends Transport { + + private static volatile int pendingFastOpenRequestsThreshold = 256; + + /** + * Return the number of of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN. + *

+ * {@see #setPendingFastOpenRequestsThreshold} + */ + public static int getPendingFastOpenRequestsThreshold() { + return pendingFastOpenRequestsThreshold; + } + + /** + * Set the number of of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN + *

+ * If this value goes over a certain limit the server disables all TFO connections. + */ + public static void setPendingFastOpenRequestsThreshold(int value) { + if (value < 0) { + throw new IllegalArgumentException("Invalid " + value); + } + pendingFastOpenRequestsThreshold = value; + } + + @Override + public boolean supportFileRegion() { + return false; + } + + @Override + public SocketAddress convert(io.vertx.core.net.SocketAddress address) { + if (address.isDomainSocket()) { + throw new IllegalArgumentException("Domain socket not supported by IOUring transport"); + } + return super.convert(address); + } + + @Override + public io.vertx.core.net.SocketAddress convert(SocketAddress address) { + if (address instanceof DomainSocketAddress) { + return new SocketAddressImpl(((DomainSocketAddress) address).path()); + } + return super.convert(address); + } + + @Override + public boolean isAvailable() { + return IOUring.isAvailable(); + } + + @Override + public Throwable unavailabilityCause() { + return IOUring.unavailabilityCause(); + } + + @Override + public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ignoredIoRatio) { + IOUringEventLoopGroup eventLoopGroup = new IOUringEventLoopGroup(nThreads, threadFactory); + return eventLoopGroup; + } + + @Override + public DatagramChannel datagramChannel() { + return new IOUringDatagramChannel(); + } + + @Override + public DatagramChannel datagramChannel(InternetProtocolFamily family) { + return new IOUringDatagramChannel(); + } + + @Override + public ChannelFactory channelFactory(boolean domainSocket) { + if (domainSocket) { + throw new IllegalArgumentException(); + } + return IOUringSocketChannel::new; + } + + @Override + public ChannelFactory serverChannelFactory(boolean domainSocket) { + if (domainSocket) { + throw new IllegalArgumentException(); + } + return IOUringServerSocketChannel::new; + } + + @Override + public void configure(DatagramChannel channel, DatagramSocketOptions options) { + channel.config().setOption(IOUringChannelOption.SO_REUSEPORT, options.isReusePort()); + super.configure(channel, options); + } + + @Override + public void configure(NetServerOptions options, boolean domainSocket, ServerBootstrap bootstrap) { + if (domainSocket) { + throw new IllegalArgumentException(); + } + bootstrap.option(IOUringChannelOption.SO_REUSEPORT, options.isReusePort()); + if (options.isTcpFastOpen()) { + bootstrap.option(IOUringChannelOption.TCP_FASTOPEN, options.isTcpFastOpen() ? pendingFastOpenRequestsThreshold : 0); + } + bootstrap.childOption(IOUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); + bootstrap.childOption(IOUringChannelOption.TCP_CORK, options.isTcpCork()); + super.configure(options, false, bootstrap); + } + + @Override + public void configure(ClientOptionsBase options, boolean domainSocket, Bootstrap bootstrap) { + if (domainSocket) { + throw new IllegalArgumentException(); + } + if (options.isTcpFastOpen()) { + bootstrap.option(IOUringChannelOption.TCP_FASTOPEN_CONNECT, options.isTcpFastOpen()); + } + bootstrap.option(IOUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); + bootstrap.option(IOUringChannelOption.TCP_CORK, options.isTcpCork()); + super.configure(options, false, bootstrap); + } +} diff --git a/src/main/java/io/vertx/core/net/impl/transport/Transport.java b/src/main/java/io/vertx/core/net/impl/transport/Transport.java index 985b7642542..5feec222c52 100644 --- a/src/main/java/io/vertx/core/net/impl/transport/Transport.java +++ b/src/main/java/io/vertx/core/net/impl/transport/Transport.java @@ -49,6 +49,10 @@ public class Transport { */ public static Transport JDK = new Transport(); + public boolean supportFileRegion() { + return true; + } + /** * The native transport, it may be {@code null} or failed. */ @@ -64,6 +68,16 @@ public static Transport nativeTransport() { } catch (Throwable ignore) { // Jar not here } + try { + Transport ioUring = new IOUringTransport(); + if (ioUring.isAvailable()) { + return ioUring; + } else { + transport = ioUring; + } + } catch (Throwable ignore) { + // Jar not here + } try { Transport kqueue = new KQueueTransport(); if (kqueue.isAvailable()) { diff --git a/src/test/java/io/vertx/it/TransportTest.java b/src/test/java/io/vertx/it/TransportTest.java index 5754b270e50..87f9887d4f0 100644 --- a/src/test/java/io/vertx/it/TransportTest.java +++ b/src/test/java/io/vertx/it/TransportTest.java @@ -50,6 +50,12 @@ public void testNoNative() { } catch (ClassNotFoundException ignore) { // Expected } + try { + classLoader.loadClass("io.netty.incubator.channel.uring.IOUring"); + fail("Was not expected to load IOUring class"); + } catch (ClassNotFoundException ignore) { + // Expected + } testNetServer(new VertxOptions()); assertFalse(vertx.isNativeTransportEnabled()); }