Skip to content

Commit

Permalink
Support IO_URING transport. Fixes eclipse-vertx#4163
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Feb 21, 2023
1 parent 6f0eb26 commit 177f7f8
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Expand Up @@ -24,6 +24,9 @@ jobs:
- os: ubuntu-latest
jdk: 8
profile: '-PtestNativeTransport'
- os: ubuntu-latest
jdk: 8
profile: '-PtestIoUringTransport'
- os: ubuntu-latest
jdk: 8
profile: '-PtestDomainSockets'
Expand Down
6 changes: 6 additions & 0 deletions README.md
Expand Up @@ -28,6 +28,12 @@ Vert.x supports native transport on BSD and Linux, to run the tests with native
> mvn test -PtestNativeTransport
```

Vert.x experimental supports native [IO_URING](https://github.com/netty/netty-incubator-transport-io_uring/) transport on Linux, to run the tests with IO_URING transport

```
> mvn test -PtestIoUringTransport
```

Vert.x supports domain sockets on Linux exclusively, to run the tests with domain sockets

```
Expand Down
33 changes: 33 additions & 0 deletions pom.xml
Expand Up @@ -53,6 +53,7 @@
<vertx.testNativeTransport>false</vertx.testNativeTransport>
<vertx.testDomainSockets>false</vertx.testDomainSockets>
<jar.manifest>${project.basedir}/src/main/resources/META-INF/MANIFEST.MF</jar.manifest>
<netty.iouring.version>0.0.14.Final</netty.iouring.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -111,6 +112,13 @@
<artifactId>netty-codec-haproxy</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<!-- TODO this should be handled on https://github.com/vert-x3/vertx-dependencies -->
<version>${netty.iouring.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
Expand Down Expand Up @@ -478,6 +486,8 @@
<classpathDependencyExclude>io.netty:netty-transport-classes-epoll</classpathDependencyExclude>
<classpathDependencyExclude>io.netty:netty-transport-native-kqueue</classpathDependencyExclude>
<classpathDependencyExclude>io.netty:netty-transport-classes-kqueue</classpathDependencyExclude>
<classpathDependencyExclude>io.netty.incubator:netty-incubator-transport-native-io_uring</classpathDependencyExclude>
<classpathDependencyExclude>io.netty.incubator:netty-incubator-transport-classes-io_uring</classpathDependencyExclude>
</classpathDependencyExcludes>
</configuration>
</execution>
Expand Down Expand Up @@ -775,6 +785,29 @@
</dependencies>
</profile>

<!-- Run tests with IO_URING transport -->

<profile>
<id>testIoUringTransport</id>
<properties>
<vertx.testNativeTransport>true</vertx.testNativeTransport>
<vertx.testDomainSockets>false</vertx.testDomainSockets>
</properties>
<dependencies>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty.iouring.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>

<!-- Run tests with native transport and domain sockets -->
<profile>
<id>testDomainSockets</id>
Expand Down
Expand Up @@ -270,7 +270,7 @@ private void configureHttp1OrH2C(ChannelPipeline pipeline, SslChannelProvider ss
if (options.isCompressionSupported()) {
pipeline.addLast("deflater", new HttpChunkContentCompressor(compressionOptions));
}
if (options.isSsl() || options.isCompressionSupported()) {
if (options.isSsl() || options.isCompressionSupported() || !vertx.transport().supportFileRegion()) {
// only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used.
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/net/impl/ConnectionBase.java
Expand Up @@ -14,7 +14,7 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedNioFile;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -409,7 +409,7 @@ protected void handleIdle(IdleStateEvent event) {
protected abstract void handleInterestedOpsChanged();

protected boolean supportsFileRegion() {
return !isSsl();
return vertx.transport().supportFileRegion() && !isSsl();
}

protected void reportBytesRead(Object msg) {
Expand Down Expand Up @@ -505,7 +505,7 @@ public final ChannelFuture sendFile(RandomAccessFile raf, long offset, long leng
ChannelPromise writeFuture = chctx.newPromise();
if (!supportsFileRegion()) {
// Cannot use zero-copy
writeToChannel(new ChunkedFile(raf, offset, length, 8192), writeFuture);
writeToChannel(new ChunkedNioFile(raf.getChannel(), offset, length, 8192), writeFuture);
} else {
// No encryption - use zero-copy.
sendFileRegion(raf, offset, length, writeFuture);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -95,7 +95,7 @@ protected void initChannel(ChannelPipeline pipeline) {
if (logEnabled) {
pipeline.addLast("logging", new LoggingHandler(options.getActivityLogDataFormat()));
}
if (options.isSsl()) {
if (options.isSsl() || !vertx.transport().supportFileRegion()) {
// only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used.
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/net/impl/NetServerImpl.java
Expand Up @@ -255,8 +255,8 @@ protected void initChannel(ChannelPipeline pipeline, boolean ssl) {
if (options.getLogActivity()) {
pipeline.addLast("logging", new LoggingHandler(options.getActivityLogDataFormat()));
}
if (ssl) {
// only add ChunkedWriteHandler when SSL is enabled otherwise it is not needed as FileRegion is used.
if (ssl || !vertx.transport().supportFileRegion()) {
// only add ChunkedWriteHandler when SSL is enabled or FileRegion isn't supported
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); // For large file / sendfile support
}
int idleTimeout = options.getIdleTimeout();
Expand Down
156 changes: 156 additions & 0 deletions src/main/java/io/vertx/core/net/impl/transport/IOUringTransport.java
@@ -0,0 +1,156 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.net.impl.transport;

import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringChannelOption;
import io.netty.incubator.channel.uring.IOUringDatagramChannel;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.net.ClientOptionsBase;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.SocketAddressImpl;

public class IOUringTransport extends Transport {

private static volatile int pendingFastOpenRequestsThreshold = 256;

/**
* Return the number of of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN.
* <p>
* {@see #setPendingFastOpenRequestsThreshold}
*/
public static int getPendingFastOpenRequestsThreshold() {
return pendingFastOpenRequestsThreshold;
}

/**
* Set the number of of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN
* <p/>
* If this value goes over a certain limit the server disables all TFO connections.
*/
public static void setPendingFastOpenRequestsThreshold(int value) {
if (value < 0) {
throw new IllegalArgumentException("Invalid " + value);
}
pendingFastOpenRequestsThreshold = value;
}

@Override
public boolean supportFileRegion() {
return false;
}

@Override
public SocketAddress convert(io.vertx.core.net.SocketAddress address) {
if (address.isDomainSocket()) {
throw new IllegalArgumentException("Domain socket not supported by IOUring transport");
}
return super.convert(address);
}

@Override
public io.vertx.core.net.SocketAddress convert(SocketAddress address) {
if (address instanceof DomainSocketAddress) {
return new SocketAddressImpl(((DomainSocketAddress) address).path());
}
return super.convert(address);
}

@Override
public boolean isAvailable() {
return IOUring.isAvailable();
}

@Override
public Throwable unavailabilityCause() {
return IOUring.unavailabilityCause();
}

@Override
public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ignoredIoRatio) {
IOUringEventLoopGroup eventLoopGroup = new IOUringEventLoopGroup(nThreads, threadFactory);
return eventLoopGroup;
}

@Override
public DatagramChannel datagramChannel() {
return new IOUringDatagramChannel();
}

@Override
public DatagramChannel datagramChannel(InternetProtocolFamily family) {
return new IOUringDatagramChannel();
}

@Override
public ChannelFactory<? extends Channel> channelFactory(boolean domainSocket) {
if (domainSocket) {
throw new IllegalArgumentException();
}
return IOUringSocketChannel::new;
}

@Override
public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domainSocket) {
if (domainSocket) {
throw new IllegalArgumentException();
}
return IOUringServerSocketChannel::new;
}

@Override
public void configure(DatagramChannel channel, DatagramSocketOptions options) {
channel.config().setOption(IOUringChannelOption.SO_REUSEPORT, options.isReusePort());
super.configure(channel, options);
}

@Override
public void configure(NetServerOptions options, boolean domainSocket, ServerBootstrap bootstrap) {
if (domainSocket) {
throw new IllegalArgumentException();
}
bootstrap.option(IOUringChannelOption.SO_REUSEPORT, options.isReusePort());
if (options.isTcpFastOpen()) {
bootstrap.option(IOUringChannelOption.TCP_FASTOPEN, options.isTcpFastOpen() ? pendingFastOpenRequestsThreshold : 0);
}
bootstrap.childOption(IOUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck());
bootstrap.childOption(IOUringChannelOption.TCP_CORK, options.isTcpCork());
super.configure(options, false, bootstrap);
}

@Override
public void configure(ClientOptionsBase options, boolean domainSocket, Bootstrap bootstrap) {
if (domainSocket) {
throw new IllegalArgumentException();
}
if (options.isTcpFastOpen()) {
bootstrap.option(IOUringChannelOption.TCP_FASTOPEN_CONNECT, options.isTcpFastOpen());
}
bootstrap.option(IOUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck());
bootstrap.option(IOUringChannelOption.TCP_CORK, options.isTcpCork());
super.configure(options, false, bootstrap);
}
}
14 changes: 14 additions & 0 deletions src/main/java/io/vertx/core/net/impl/transport/Transport.java
Expand Up @@ -49,6 +49,10 @@ public class Transport {
*/
public static Transport JDK = new Transport();

public boolean supportFileRegion() {
return true;
}

/**
* The native transport, it may be {@code null} or failed.
*/
Expand All @@ -64,6 +68,16 @@ public static Transport nativeTransport() {
} catch (Throwable ignore) {
// Jar not here
}
try {
Transport ioUring = new IOUringTransport();
if (ioUring.isAvailable()) {
return ioUring;
} else {
transport = ioUring;
}
} catch (Throwable ignore) {
// Jar not here
}
try {
Transport kqueue = new KQueueTransport();
if (kqueue.isAvailable()) {
Expand Down
6 changes: 6 additions & 0 deletions src/test/java/io/vertx/it/TransportTest.java
Expand Up @@ -50,6 +50,12 @@ public void testNoNative() {
} catch (ClassNotFoundException ignore) {
// Expected
}
try {
classLoader.loadClass("io.netty.incubator.channel.uring.IOUring");
fail("Was not expected to load IOUring class");
} catch (ClassNotFoundException ignore) {
// Expected
}
testNetServer(new VertxOptions());
assertFalse(vertx.isNativeTransportEnabled());
}
Expand Down

0 comments on commit 177f7f8

Please sign in to comment.