{@link #IO_URING} is opt-in only and is not selected by AUTO; set the IO mode to + * {@code IO_URING} explicitly to use it. */ AUTO } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index c113b72f557cf..82c229ca61117 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -17,7 +17,12 @@ package org.apache.spark.network.util; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; @@ -32,15 +37,24 @@ import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.uring.IoUring; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringServerSocketChannel; +import io.netty.channel.uring.IoUringSocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.PlatformDependent; +import org.apache.spark.internal.SparkLogger; +import org.apache.spark.internal.SparkLoggerFactory; + /** * Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL, - * , KQUEUE, or AUTO. + * KQUEUE, IO_URING, or AUTO. */ public class NettyUtils { + private static final SparkLogger logger = SparkLoggerFactory.getLogger(NettyUtils.class); + /** * Specifies an upper bound on the number of Netty threads that Spark requires by default. 
* In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core @@ -56,10 +70,104 @@ public class NettyUtils { private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator = new PooledByteBufAllocator[2]; + /** + * Cached result of probing whether io_uring can actually allocate the rings Spark needs on + * this JVM. {@code null} means not yet probed; non-null is the probed value. + * + *
{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls + * work; it does not detect environments where the kernel allows io_uring but + * {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in + * containers, GitHub Actions runners, and other restricted environments). The probe creates + * a {@link MultiThreadIoEventLoopGroup} sized to {@link #MAX_DEFAULT_NETTY_THREADS} -- which + * is the worst case Spark allocates by default for a single event loop group -- and shuts it + * down to verify ring allocation actually succeeds. Probing with one thread is insufficient + * because some restricted environments allow a single ring to fit in memlock but not the eight + * rings Spark needs in practice. + */ + private static volatile Boolean ioUringUsable = null; + + /** + * Guards a one-shot info log line describing the io_uring features the running kernel exposes. + * Emitted the first time {@link #createEventLoop} is asked for {@link IOMode#IO_URING}, so it + * appears once per process when -- and only when -- io_uring is actually selected. Useful for + * triaging shuffle/RPC performance reports: it answers questions like "is {@code SPLICE} + * supported on this kernel?" and "what is the kernel's max pipe buffer size?" without needing + * to attach a debugger. + */ + private static final AtomicBoolean ioUringCapabilitiesLogged = new AtomicBoolean(false); + public static long freeDirectMemory() { return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory(); } + /** + * Returns true if io_uring can actually be used on the running JVM. Probes once (with the + * result cached) by attempting a real ring allocation, which catches environments where + * {@link IoUring#isAvailable()} returns true but {@code RLIMIT_MEMLOCK} is too low to allocate + * the submission/completion queues (common in containers, GitHub Actions runners, and other + * restricted environments). + * + *
Used by tests that gate execution on io_uring being usable. AUTO mode does not consult + * this; io_uring is opt-in only via {@link IOMode#IO_URING}, which surfaces the underlying + * error when ring allocation fails. + */ + public static boolean isIoUringUsable() { + Boolean cached = ioUringUsable; + if (cached != null) { + return cached; + } + synchronized (NettyUtils.class) { + if (ioUringUsable != null) { + return ioUringUsable; + } + if (!JavaUtils.isLinux || !IoUring.isAvailable()) { + ioUringUsable = false; + return false; + } + MultiThreadIoEventLoopGroup probe = null; + try { + probe = new MultiThreadIoEventLoopGroup( + MAX_DEFAULT_NETTY_THREADS, IoUringIoHandler.newFactory()); + ioUringUsable = true; + } catch (Throwable t) { + logger.warn("io_uring is reported as available but allocation of " + + MAX_DEFAULT_NETTY_THREADS + " rings failed. " + + "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments).", t); + ioUringUsable = false; + } finally { + if (probe != null) { + probe.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS); + } + } + return ioUringUsable; + } + } + + /** + * Emits a single info log line summarizing the io_uring features the running kernel exposes, + * the kernel version, and the system-wide max pipe buffer size. Called from the IO_URING branch + * of {@link #createEventLoop} so it logs at most once per process and only when io_uring is + * actually used. + * + *
The pipe-max-size matters because Netty's {@code IoUringFileRegion} routes FileRegion
+ * sends through a {@code splice(2)} pipe ({@literal file -> pipe -> socket}), bounded by the
+ * pipe buffer; a small pipe forces more SQE/CQE round-trips per shuffle chunk.
+ */
+  private static void logIoUringCapabilitiesOnce() {
+    // The CAS succeeds for exactly one caller, so the body below executes at most once.
+    if (ioUringCapabilitiesLogged.compareAndSet(false, true)) {
+      String pipeLimit;
+      try {
+        pipeLimit = Files.readString(Path.of("/proc/sys/fs/pipe-max-size")).trim();
+      } catch (IOException | RuntimeException ignored) {
+        pipeLimit = "unknown";  // best effort: non-Linux host or /proc not mounted
+      }
+      logger.info("Netty io_uring transport selected: features=[" + IoUring.featureString() +
+        "], kernel=" + System.getProperty("os.version") +
+        ", pipe-max-size=" + pipeLimit + " bytes");
+    }
+  }
+
/** Creates a new ThreadFactory which prefixes each thread with the given name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
return new DefaultThreadFactory(threadPoolPrefix, true);
@@ -73,6 +181,10 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String
case NIO -> NioIoHandler.newFactory();
case EPOLL -> EpollIoHandler.newFactory();
case KQUEUE -> KQueueIoHandler.newFactory();
+ case IO_URING -> {
+ logIoUringCapabilitiesOnce();
+ yield IoUringIoHandler.newFactory();
+ }
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollIoHandler.newFactory();
@@ -92,6 +204,7 @@ public static Class extends Channel> getClientChannelClass(IOMode mode) {
case NIO -> NioSocketChannel.class;
case EPOLL -> EpollSocketChannel.class;
case KQUEUE -> KQueueSocketChannel.class;
+ case IO_URING -> IoUringSocketChannel.class;
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollSocketChannel.class;
@@ -110,6 +223,7 @@ public static Class extends ServerChannel> getServerChannelClass(IOMode mode)
case NIO -> NioServerSocketChannel.class;
case EPOLL -> EpollServerSocketChannel.class;
case KQUEUE -> KQueueServerSocketChannel.class;
+ case IO_URING -> IoUringServerSocketChannel.class;
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollServerSocketChannel.class;
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index cf00d806ba835..606b49758b750 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -184,6 +184,12 @@
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_kqueue_aarch_64.jnilib" />