From 0b214638e3660629c7bc2252f61a0478e0b36f33 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 20:01:25 +0800 Subject: [PATCH 01/11] [SPARK-XXXXX][CORE] Add IO_URING transport mode for Linux 5.10+ ### What changes were proposed in this pull request? Enable Netty's io_uring native transport in Spark by: 1. Removing the `netty-transport-classes-io_uring` and `netty-transport-native-io_uring` exclusions from `netty-all` in the root `pom.xml`, and adding explicit `linux-x86_64` / `linux-aarch_64` classifier dependencies under `dependencyManagement`. 2. Mirroring the same native classifier dependencies in `common/network-common/pom.xml` and `core/pom.xml`. 3. Adding `IO_URING` to `IOMode` and wiring it into `NettyUtils.createEventLoop` / `getClientChannelClass` / `getServerChannelClass`. In `AUTO`, io_uring is preferred on Linux when `IoUring.isAvailable()` reports the running kernel supports it, then EPOLL, then KQUEUE on macOS, then NIO. 4. Adding `ShuffleNettyIoUringSuite` (gated on `Utils.isLinux && IoUring.isAvailable`) so the existing shuffle coverage exercises the new mode where the platform supports it. 5. Refreshing `NettyTransportBenchmark` comments so the AUTO behavior change is visible at the call sites; the existing `NIO vs AUTO` suites automatically exercise io_uring on Linux 5.10+. 6. Regenerating `dev/deps/spark-deps-hadoop-3-hive-2.3` to include the new `netty-transport-classes-io_uring` and `netty-transport-native-io_uring` (linux-x86_64 / linux-aarch_64 / linux-riscv64) entries. ### Why are the changes needed? io_uring graduated from incubator to a first-class transport in Netty 4.2 (`io.netty.channel.uring`). Compared to EPOLL it batches I/O operations through submission/completion queues, reducing per-op syscall overhead on busy executors, and uses `IORING_OP_SPLICE` for `FileRegion` writes -- functionally equivalent to `sendfile()` but fully asynchronous. SPARK-56279 already updated `MessageEncoder` to emit the header `ByteBuf` and the bare `DefaultFileRegion` separately when the body is a `FileSegmentManagedBuffer`, which means the io_uring write path can recognize the `DefaultFileRegion` and apply splice without any additional Spark-side change. ### Does this PR introduce _any_ user-facing change? Yes. On Linux kernels 5.10+, `spark.shuffle.io.mode=AUTO` (the default) now selects io_uring instead of EPOLL when io_uring is available. Operators who want the previous behavior can set `spark.shuffle.io.mode=EPOLL` explicitly. A new explicit `IO_URING` mode is also available. ### How was this patch tested? - Manual SBT compile of `network-common`, `core`, `core/Test`, `network-shuffle`, and (with `-Pyarn`) `network-yarn` on macOS. - `ShuffleNettyIoUringSuite` mirrors `ShuffleNettyEpollSuite` and runs the existing `ShuffleSuite` cases under `IO_URING` on Linux 5.10+ via GitHub Actions. - macOS runs continue to take the KQUEUE path; Linux runs without io_uring kernel support fall back to EPOLL. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor 1.x --- common/network-common/pom.xml | 10 +++++++++ .../org/apache/spark/network/util/IOMode.java | 8 ++++++- .../apache/spark/network/util/NettyUtils.java | 21 +++++++++++++++---- core/pom.xml | 10 +++++++++ .../org/apache/spark/ShuffleNettySuite.scala | 7 +++++++ .../network/NettyTransportBenchmark.scala | 12 ++++++----- dev/deps/spark-deps-hadoop-3-hive-2.3 | 4 ++++ pom.xml | 20 +++++++++++------- 8 files changed, 74 insertions(+), 18 deletions(-) diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index d4cc71998604a..2aa13412bd7dc 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -51,6 +51,16 @@ netty-transport-native-epoll linux-aarch_64 + + io.netty + netty-transport-native-io_uring + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + linux-aarch_64 + io.netty netty-transport-native-kqueue diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java index 8709d30ef1be1..19d223d4a44ab 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java @@ -34,7 +34,13 @@ public enum IOMode { */ KQUEUE, /** - * Prefer to use native EPOLL on Linux (or KQUEUE on MacOS) if available. Then, fallback to NIO. + * Native io_uring via JNI, Linux only. Requires kernel 5.10+. + */ + IO_URING, + /** + * Prefer to use a native transport when available. On Linux, io_uring is preferred over EPOLL + * when the running kernel supports it; otherwise EPOLL is used. On MacOS/BSD, KQUEUE is used. + * Falls back to NIO when no native transport is available. */ AUTO } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index c113b72f557cf..e7a0849cc6b21 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -32,12 +32,16 @@ import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.uring.IoUring; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringServerSocketChannel; +import io.netty.channel.uring.IoUringSocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.PlatformDependent; /** * Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL, - * , KQUEUE, or AUTO. + * KQUEUE, IO_URING, or AUTO. */ public class NettyUtils { @@ -73,8 +77,11 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String case NIO -> NioIoHandler.newFactory(); case EPOLL -> EpollIoHandler.newFactory(); case KQUEUE -> KQueueIoHandler.newFactory(); + case IO_URING -> IoUringIoHandler.newFactory(); case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && IoUring.isAvailable()) { + yield IoUringIoHandler.newFactory(); + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollIoHandler.newFactory(); } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueIoHandler.newFactory(); @@ -92,8 +99,11 @@ public static Class getClientChannelClass(IOMode mode) { case NIO -> NioSocketChannel.class; case EPOLL -> EpollSocketChannel.class; case KQUEUE -> KQueueSocketChannel.class; + case IO_URING -> IoUringSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && IoUring.isAvailable()) { + yield IoUringSocketChannel.class; + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollSocketChannel.class; } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueSocketChannel.class; @@ -110,8 +120,11 @@ public static Class getServerChannelClass(IOMode mode) case NIO -> NioServerSocketChannel.class; case EPOLL -> EpollServerSocketChannel.class; case KQUEUE -> KQueueServerSocketChannel.class; + case IO_URING -> IoUringServerSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && IoUring.isAvailable()) { + yield IoUringServerSocketChannel.class; + } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollServerSocketChannel.class; } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueServerSocketChannel.class; diff --git a/core/pom.xml b/core/pom.xml index 88f6525b8caf3..414486ce7c48d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -323,6 +323,16 @@ netty-transport-native-epoll linux-aarch_64 + + io.netty + netty-transport-native-io_uring + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + linux-aarch_64 + io.netty netty-transport-native-kqueue diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index de47c79360357..a5cd498e83da5 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -20,6 +20,8 @@ package org.apache.spark import org.scalactic.source.Position import org.scalatest.Tag +import _root_.io.netty.channel.uring.IoUring + import org.apache.spark.network.util.IOMode import org.apache.spark.util.Utils @@ -56,6 +58,11 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite { override def ioMode: IOMode = IOMode.KQUEUE } +class ShuffleNettyIoUringSuite extends ShuffleNettySuite { + override def shouldRunTests: Boolean = Utils.isLinux && IoUring.isAvailable + override def ioMode: IOMode = IOMode.IO_URING +} + class ShuffleNettyAutoSuite extends ShuffleNettySuite { override def ioMode: IOMode = IOMode.AUTO } diff --git a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala index 0a3831a675c38..afed4b4fcc6e5 100644 --- a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala @@ -281,9 +281,10 @@ object NettyTransportBenchmark extends BenchmarkBase { /** * Suite 3: IOMode Comparison (NIO vs AUTO). * AUTO selects the best native transport via NettyUtils.createEventLoop - * (EPOLL on Linux, KQUEUE on macOS, NIO fallback), so comparing NIO vs AUTO - * shows the benefit of native transport without needing manual probing. - * Uses concurrent load (8 clients) to amplify transport-level differences. + * (IO_URING on Linux 5.10+, then EPOLL on Linux, KQUEUE on macOS, NIO fallback), + * so comparing NIO vs AUTO shows the benefit of native transport without needing + * manual probing. Uses concurrent load (8 clients) to amplify transport-level + * differences. */ private def ioModeComparisonBenchmark(): Unit = { val payload = new Array[Byte](MEDIUM_PAYLOAD) @@ -562,8 +563,9 @@ object NettyTransportBenchmark extends BenchmarkBase { * and fetches them using client.fetchChunk(). This exercises the DefaultFileRegion * zero-copy sendfile/splice path. * - * Compares NIO vs AUTO to verify that native transports (EPOLL/KQUEUE) use sendfile() - * for file-backed transfers. AUTO should be equal to or faster than NIO. + * Compares NIO vs AUTO to verify that native transports use the kernel zero-copy + * path for file-backed transfers (sendfile() for EPOLL/KQUEUE, splice() via the + * io_uring submission queue for IO_URING). AUTO should be equal to or faster than NIO. */ private def fileBackedShuffleBenchmark(): Unit = { val numFiles = 100 diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 7c182a16d8d78..80d7fe49e4e2c 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -217,10 +217,14 @@ netty-tcnative-boringssl-static/2.0.77.Final/osx-x86_64/netty-tcnative-boringssl netty-tcnative-boringssl-static/2.0.77.Final/windows-x86_64/netty-tcnative-boringssl-static-2.0.77.Final-windows-x86_64.jar netty-tcnative-classes/2.0.77.Final//netty-tcnative-classes-2.0.77.Final.jar netty-transport-classes-epoll/4.2.13.Final//netty-transport-classes-epoll-4.2.13.Final.jar +netty-transport-classes-io_uring/4.2.13.Final//netty-transport-classes-io_uring-4.2.13.Final.jar netty-transport-classes-kqueue/4.2.13.Final//netty-transport-classes-kqueue-4.2.13.Final.jar netty-transport-native-epoll/4.2.13.Final/linux-aarch_64/netty-transport-native-epoll-4.2.13.Final-linux-aarch_64.jar netty-transport-native-epoll/4.2.13.Final/linux-riscv64/netty-transport-native-epoll-4.2.13.Final-linux-riscv64.jar netty-transport-native-epoll/4.2.13.Final/linux-x86_64/netty-transport-native-epoll-4.2.13.Final-linux-x86_64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-aarch_64/netty-transport-native-io_uring-4.2.13.Final-linux-aarch_64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-riscv64/netty-transport-native-io_uring-4.2.13.Final-linux-riscv64.jar +netty-transport-native-io_uring/4.2.13.Final/linux-x86_64/netty-transport-native-io_uring-4.2.13.Final-linux-x86_64.jar netty-transport-native-kqueue/4.2.13.Final/osx-aarch_64/netty-transport-native-kqueue-4.2.13.Final-osx-aarch_64.jar netty-transport-native-kqueue/4.2.13.Final/osx-x86_64/netty-transport-native-kqueue-4.2.13.Final-osx-x86_64.jar netty-transport-native-unix-common/4.2.13.Final//netty-transport-native-unix-common-4.2.13.Final.jar diff --git a/pom.xml b/pom.xml index 1d2b847a2f8f6..534a3fe3fa42a 100644 --- a/pom.xml +++ b/pom.xml @@ -1004,14 +1004,6 @@ io.netty netty-transport-udt - - io.netty - netty-transport-classes-io_uring - - - io.netty - netty-transport-native-io_uring - io.netty netty-handler-ssl-ocsp @@ -1037,6 +1029,18 @@ ${netty.version} linux-aarch_64 + + io.netty + netty-transport-native-io_uring + ${netty.version} + linux-x86_64 + + + io.netty + netty-transport-native-io_uring + ${netty.version} + linux-aarch_64 + io.netty netty-transport-native-kqueue From a03118b890c15886511645a802127abec2e608d8 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 20:12:20 +0800 Subject: [PATCH 02/11] [SPARK-XXXXX][CORE][FOLLOWUP] Hide io_uring availability check behind NettyUtils helper Replace the `_root_.io.netty.channel.uring.IoUring.isAvailable` reference in `ShuffleNettyIoUringSuite` with a small `NettyUtils.isIoUringAvailable()` helper. The `_root_.` prefix was needed because `org.apache.spark.io` shadows `io.netty.*` in this file's package, but it reads as unusual. The helper keeps the Netty-specific class out of the test scope. --- .../main/java/org/apache/spark/network/util/NettyUtils.java | 5 +++++ .../src/test/scala/org/apache/spark/ShuffleNettySuite.scala | 6 ++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index e7a0849cc6b21..361122a1579ab 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -64,6 +64,11 @@ public static long freeDirectMemory() { return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory(); } + /** Returns true if the io_uring native transport is available on the running JVM. */ + public static boolean isIoUringAvailable() { + return IoUring.isAvailable(); + } + /** Creates a new ThreadFactory which prefixes each thread with the given name. */ public static ThreadFactory createThreadFactory(String threadPoolPrefix) { return new DefaultThreadFactory(threadPoolPrefix, true); diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index a5cd498e83da5..ddad931d4150e 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -20,9 +20,7 @@ package org.apache.spark import org.scalactic.source.Position import org.scalatest.Tag -import _root_.io.netty.channel.uring.IoUring - -import org.apache.spark.network.util.IOMode +import org.apache.spark.network.util.{IOMode, NettyUtils} import org.apache.spark.util.Utils abstract class ShuffleNettySuite extends ShuffleSuite { @@ -59,7 +57,7 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite { } class ShuffleNettyIoUringSuite extends ShuffleNettySuite { - override def shouldRunTests: Boolean = Utils.isLinux && IoUring.isAvailable + override def shouldRunTests: Boolean = Utils.isLinux && NettyUtils.isIoUringAvailable override def ioMode: IOMode = IOMode.IO_URING } From 9e65d44928019b767a51700f2120f1c559cc65c6 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 20:16:30 +0800 Subject: [PATCH 03/11] [SPARK-XXXXX][CORE][FOLLOWUP] Shade io_uring native libs in YARN shuffle service Add antrun `move` rules so the YARN external shuffle service relocates the io_uring native libraries alongside the existing epoll/kqueue/tcnative ones. Without this, the shaded `org.sparkproject.io.netty` classes look for `liborg_sparkproject_netty_transport_native_io_uring42_*.so`, but the unshaded files are named `libnetty_transport_native_io_uring42_*.so`, so `IoUring.isAvailable()` returns `false` inside the YARN shuffle service JVM and io_uring is silently unused. Note: Netty 4.2 names the io_uring native lib `io_uring42_` (with the major+minor version suffix to allow multiple Netty versions to coexist), unlike epoll which uses the unsuffixed `epoll_`. Verified with `build/mvn -pl common/network-yarn -am -Pyarn -DskipTests package`: the resulting `spark-*-yarn-shuffle.jar` contains `META-INF/native/liborg_sparkproject_netty_transport_native_io_uring42_{x86_64,aarch_64,riscv64}.so`. --- common/network-yarn/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index cf00d806ba835..606b49758b750 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -184,6 +184,12 @@ tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_kqueue_aarch_64.jnilib" /> + + + Date: Thu, 14 May 2026 23:32:40 +0800 Subject: [PATCH 04/11] [SPARK-XXXXX][CORE][FOLLOWUP] Probe io_uring ring allocation for AUTO fallback `IoUring.isAvailable()` only verifies that the JNI library loaded and the basic syscalls work; it does not detect environments where the kernel supports io_uring but `RLIMIT_MEMLOCK` is too low to actually allocate the submission/completion queue rings. This is common in containers, GitHub Actions runners, and other restricted environments, and surfaces as: java.lang.IllegalStateException: failed to create a child event loop Caused by: java.lang.RuntimeException: failed to allocate memory for io_uring ring; try raising memlock limit (see getrlimit(RLIMIT_MEMLOCK, ...) or ulimit -l): Cannot allocate memory at io.netty.channel.uring.IoUringIoHandler.(...) After SPARK-XXXXX (the parent change) made AUTO prefer io_uring on Linux, this caused unconditional failures in such environments rather than graceful fallback to EPOLL. This change adds a one-time JVM-wide probe in `NettyUtils` that creates a single-thread `MultiThreadIoEventLoopGroup` with the io_uring handler factory and shuts it down. If construction throws, the result is cached as `false` and AUTO falls back to EPOLL. The probe is consulted by AUTO mode and by `ShuffleNettyIoUringSuite.shouldRunTests`. An explicit `IOMode.IO_URING` does not consult the probe and surfaces the underlying error so users see what's wrong. The previous `isIoUringAvailable()` helper (which just delegated to `IoUring.isAvailable()`) is replaced by `isIoUringUsable()`, which returns the probed result. --- .../org/apache/spark/network/util/IOMode.java | 6 +- .../apache/spark/network/util/NettyUtils.java | 68 +++++++++++++++++-- .../org/apache/spark/ShuffleNettySuite.scala | 2 +- 3 files changed, 67 insertions(+), 9 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java index 19d223d4a44ab..437b63af43c1e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java @@ -39,8 +39,10 @@ public enum IOMode { IO_URING, /** * Prefer to use a native transport when available. On Linux, io_uring is preferred over EPOLL - * when the running kernel supports it; otherwise EPOLL is used. On MacOS/BSD, KQUEUE is used. - * Falls back to NIO when no native transport is available. + * when the running kernel supports it AND the JVM can actually allocate an io_uring ring + * (probed once via {@link NettyUtils#isIoUringUsable()}; environments with low + * {@code RLIMIT_MEMLOCK} fall through to EPOLL). On MacOS/BSD, KQUEUE is used. Falls back to + * NIO when no native transport is available. */ AUTO } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 361122a1579ab..32e09d7fae4ad 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -18,6 +18,7 @@ package org.apache.spark.network.util; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; @@ -39,12 +40,17 @@ import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.PlatformDependent; +import org.apache.spark.internal.SparkLogger; +import org.apache.spark.internal.SparkLoggerFactory; + /** * Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL, * KQUEUE, IO_URING, or AUTO. */ public class NettyUtils { + private static final SparkLogger logger = SparkLoggerFactory.getLogger(NettyUtils.class); + /** * Specifies an upper bound on the number of Netty threads that Spark requires by default. * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core @@ -60,13 +66,63 @@ public class NettyUtils { private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator = new PooledByteBufAllocator[2]; + /** + * Cached result of probing whether io_uring can actually allocate a ring on this JVM. + * `null` means not yet probed; non-null is the probed value. + * + *

{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls + * work; it does not detect environments where the kernel allows io_uring but + * {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in + * containers, GitHub Actions runners, and other restricted environments). The probe creates + * a one-thread {@link MultiThreadIoEventLoopGroup} and shuts it down to verify ring + * allocation actually succeeds. + */ + private static volatile Boolean ioUringUsable = null; + public static long freeDirectMemory() { return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory(); } - /** Returns true if the io_uring native transport is available on the running JVM. */ - public static boolean isIoUringAvailable() { - return IoUring.isAvailable(); + /** + * Returns true if io_uring can actually be used on the running JVM. Probes once (with the + * result cached) by attempting a real ring allocation, which catches environments where + * {@link IoUring#isAvailable()} returns true but {@code RLIMIT_MEMLOCK} is too low to allocate + * the submission/completion queues (common in containers, GitHub Actions runners, and other + * restricted environments). + * + *

Used by AUTO mode and by tests that gate execution on io_uring being usable. An explicit + * {@link IOMode#IO_URING} mode does not consult this and surfaces the underlying error. + */ + public static boolean isIoUringUsable() { + Boolean cached = ioUringUsable; + if (cached != null) { + return cached; + } + synchronized (NettyUtils.class) { + if (ioUringUsable != null) { + return ioUringUsable; + } + if (!JavaUtils.isLinux || !IoUring.isAvailable()) { + ioUringUsable = false; + return false; + } + MultiThreadIoEventLoopGroup probe = null; + try { + probe = new MultiThreadIoEventLoopGroup(1, IoUringIoHandler.newFactory()); + ioUringUsable = true; + } catch (Throwable t) { + logger.warn("io_uring is reported as available but ring allocation failed; " + + "AUTO will fall back to EPOLL on this JVM. " + + "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments). " + + "To force io_uring, set spark.shuffle.io.mode=IO_URING explicitly.", t); + ioUringUsable = false; + } finally { + if (probe != null) { + probe.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS); + } + } + return ioUringUsable; + } } /** Creates a new ThreadFactory which prefixes each thread with the given name. */ @@ -84,7 +140,7 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String case KQUEUE -> KQueueIoHandler.newFactory(); case IO_URING -> IoUringIoHandler.newFactory(); case AUTO -> { - if (JavaUtils.isLinux && IoUring.isAvailable()) { + if (isIoUringUsable()) { yield IoUringIoHandler.newFactory(); } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollIoHandler.newFactory(); @@ -106,7 +162,7 @@ public static Class getClientChannelClass(IOMode mode) { case KQUEUE -> KQueueSocketChannel.class; case IO_URING -> IoUringSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && IoUring.isAvailable()) { + if (isIoUringUsable()) { yield IoUringSocketChannel.class; } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollSocketChannel.class; @@ -127,7 +183,7 @@ public static Class getServerChannelClass(IOMode mode) case KQUEUE -> KQueueServerSocketChannel.class; case IO_URING -> IoUringServerSocketChannel.class; case AUTO -> { - if (JavaUtils.isLinux && IoUring.isAvailable()) { + if (isIoUringUsable()) { yield IoUringServerSocketChannel.class; } else if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollServerSocketChannel.class; diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index ddad931d4150e..b80d3607d41b1 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -57,7 +57,7 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite { } class ShuffleNettyIoUringSuite extends ShuffleNettySuite { - override def shouldRunTests: Boolean = Utils.isLinux && NettyUtils.isIoUringAvailable + override def shouldRunTests: Boolean = Utils.isLinux && NettyUtils.isIoUringUsable override def ioMode: IOMode = IOMode.IO_URING } From aecb9a16436cef910f9e4bc8f5bdcf4762515f59 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 23:50:42 +0800 Subject: [PATCH 05/11] [SPARK-XXXXX][INFRA][FOLLOWUP] Raise RLIMIT_MEMLOCK in CI so io_uring is exercised The probe-based fallback added by the previous follow-up makes Spark gracefully degrade to EPOLL when AUTO cannot allocate an io_uring ring (low `RLIMIT_MEMLOCK`, common in containers and GitHub Actions runners). Without raising the limit in CI, the io_uring code path would be silently skipped on every PR and never exercised before release. Add `sudo prlimit --pid $$ --memlock=unlimited:unlimited` (Linux-only, fail-soft via `2>/dev/null || true` so it's a no-op on macOS/Windows runners) at the top of: - `.github/workflows/build_and_test.yml` "Run tests" step, so module builds (yarn, core, network-shuffle, mllib, etc.) that hit `IOMode.AUTO` actually use io_uring on Linux 5.10+ and `ShuffleNettyIoUringSuite` runs instead of skipping via `NettyUtils.isIoUringUsable`. - `.github/workflows/benchmark.yml` "Run benchmarks" step, so `NettyTransportBenchmark`'s NIO-vs-AUTO comparison and file-backed shuffle suite measure io_uring rather than EPOLL. The fail-soft is important: stock GHA Linux runners support sudo prlimit, but stripped-down environments (e.g., custom containers used by some matrix jobs) might not, and we don't want the CI step to fail just because memlock could not be raised. The probe in `NettyUtils` will then degrade to EPOLL as designed. --- .github/workflows/benchmark.yml | 4 ++++ .github/workflows/build_and_test.yml | 7 ++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 1341f2c3ffad5..56dd236808a2d 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -189,6 +189,10 @@ jobs: key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }} - name: Run benchmarks run: | + # Raise RLIMIT_MEMLOCK so the Netty io_uring transport (used by AUTO + # on Linux 5.10+) can allocate its submission/completion queue rings. + # Without this, NettyTransportBenchmark silently falls back to EPOLL. + sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true ./build/sbt -Pscala-${{ inputs.scala }} -Pyarn -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl Test/package # Make less noisy cp conf/log4j2.properties.template conf/log4j2.properties diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index d722444379acd..519e7d01f72f8 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -406,10 +406,15 @@ jobs: run: | # Fix for TTY related issues when launching the Ammonite REPL in tests. export TERM=vt100 + # Raise RLIMIT_MEMLOCK so the Netty io_uring transport can allocate + # its submission/completion queue rings. Without this, AUTO transport + # selection silently falls back to EPOLL on the default 64KB memlock, + # leaving io_uring code paths untested. + sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true # Hive "other tests" test needs larger metaspace size based on experiment. if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi # SPARK-46283: should delete the following env replacement after SPARK 3.x EOL - if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then + if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then MODULES_TO_TEST=${MODULES_TO_TEST//streaming-kinesis-asl, /} fi export SERIAL_SBT_TESTS=1 From 645500757027fc851399a22049d0619de03fb223 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 14 May 2026 23:56:18 +0800 Subject: [PATCH 06/11] [SPARK-XXXXX][INFRA][FOLLOWUP] Restore trailing whitespace on build_and_test.yml line 417 Inadvertently stripped by the previous CI commit's surrounding edit. Pure whitespace; no behavioral change. --- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 519e7d01f72f8..937dd6d66c54a 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -414,7 +414,7 @@ jobs: # Hive "other tests" test needs larger metaspace size based on experiment. if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi # SPARK-46283: should delete the following env replacement after SPARK 3.x EOL - if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then + if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then MODULES_TO_TEST=${MODULES_TO_TEST//streaming-kinesis-asl, /} fi export SERIAL_SBT_TESTS=1 From aaec23eadbe6a36da58e837182bb76c59a66c6a8 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 15 May 2026 11:09:43 +0800 Subject: [PATCH 07/11] [SPARK-XXXXX][CORE][FOLLOWUP] Probe io_uring with worst-case thread count, not 1 The previous probe created a one-thread MultiThreadIoEventLoopGroup to verify io_uring ring allocation works. This is insufficient in environments (e.g., GHA Docker container jobs for pyspark) where the container's RLIMIT_MEMLOCK is just large enough for one io_uring ring but not the eight rings Spark allocates by default per event loop group. The probe would succeed, AUTO would pick io_uring, then TransportServer.init -> createEventLoop(numThreads=8) would crash with `failed to allocate memory for io_uring ring` and propagate the exception out of SparkContext construction. Probe with MAX_DEFAULT_NETTY_THREADS rings instead. This matches the worst-case allocation size Spark uses by default for a single event loop group, so any environment whose memlock can't support real Spark usage now correctly falls back to EPOLL at probe time. Users who explicitly raise spark.shuffle.io.serverThreads (or the analogous client/chunk-fetch knobs) above MAX_DEFAULT_NETTY_THREADS remain responsible for ensuring their environment can support the larger ring count; otherwise they should set spark.shuffle.io.mode to EPOLL explicitly. Observed in the pyspark CI matrix where runs sit inside a Docker container that does not honor `sudo prlimit --memlock=unlimited` from the workflow shell, leaving the JVM with the container's default memlock. --- .../apache/spark/network/util/NettyUtils.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 32e09d7fae4ad..6542df6bd6027 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -67,15 +67,18 @@ public class NettyUtils { new PooledByteBufAllocator[2]; /** - * Cached result of probing whether io_uring can actually allocate a ring on this JVM. - * `null` means not yet probed; non-null is the probed value. + * Cached result of probing whether io_uring can actually allocate the rings Spark needs on + * this JVM. `null` means not yet probed; non-null is the probed value. * *

{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls * work; it does not detect environments where the kernel allows io_uring but * {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in * containers, GitHub Actions runners, and other restricted environments). The probe creates - * a one-thread {@link MultiThreadIoEventLoopGroup} and shuts it down to verify ring - * allocation actually succeeds. + * a {@link MultiThreadIoEventLoopGroup} sized to {@link #MAX_DEFAULT_NETTY_THREADS} -- which + * is the worst case Spark allocates by default for a single event loop group -- and shuts it + * down to verify ring allocation actually succeeds. Probing with one thread is insufficient + * because some restricted environments allow a single ring to fit in memlock but not the eight + * rings Spark needs in practice. */ private static volatile Boolean ioUringUsable = null; @@ -108,10 +111,12 @@ public static boolean isIoUringUsable() { } MultiThreadIoEventLoopGroup probe = null; try { - probe = new MultiThreadIoEventLoopGroup(1, IoUringIoHandler.newFactory()); + probe = new MultiThreadIoEventLoopGroup( + MAX_DEFAULT_NETTY_THREADS, IoUringIoHandler.newFactory()); ioUringUsable = true; } catch (Throwable t) { - logger.warn("io_uring is reported as available but ring allocation failed; " + + logger.warn("io_uring is reported as available but allocation of " + + MAX_DEFAULT_NETTY_THREADS + " rings failed; " + "AUTO will fall back to EPOLL on this JVM. " + "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments). " + "To force io_uring, set spark.shuffle.io.mode=IO_URING explicitly.", t); From b9fe8d511da0cb59f0e5e93d336c64dc86ee9443 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 20 May 2026 16:17:26 +0800 Subject: [PATCH 08/11] [SPARK-XXXXX][CORE][FOLLOWUP] Do not auto-select io_uring; keep IO_URING opt-in only io_uring has known incompatibilities with some Spark configurations -- most notably `spark.authenticate.enableSaslEncryption=true`, where `SaslEncryption.EncryptedMessage` violates the FileRegion `count()` upper-bound assumption that io_uring's chunked-FileRegion fallback relies on. AUTO-selecting io_uring would silently expose every such workload to corruption. Restore AUTO to its prior behavior (EPOLL on Linux, KQUEUE on macOS, NIO fallback). Users who want io_uring opt in explicitly via `spark.shuffle.io.mode=IO_URING`. `isIoUringUsable()` is retained as a probe helper for tests; AUTO no longer consults it. Also update IOMode.AUTO Javadoc and NettyTransportBenchmark comments to match the new behavior. --- .github/workflows/benchmark.yml | 8 +++--- .github/workflows/build_and_test.yml | 9 ++++--- .../org/apache/spark/network/util/IOMode.java | 10 ++++---- .../apache/spark/network/util/NettyUtils.java | 25 +++++++------------ .../network/NettyTransportBenchmark.scala | 12 ++++----- 5 files changed, 30 insertions(+), 34 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 56dd236808a2d..30681a11ae0f5 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -189,9 +189,11 @@ jobs: key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }} - name: Run benchmarks run: | - # Raise RLIMIT_MEMLOCK so the Netty io_uring transport (used by AUTO - # on Linux 5.10+) can allocate its submission/completion queue rings. - # Without this, NettyTransportBenchmark silently falls back to EPOLL. + # Raise RLIMIT_MEMLOCK so explicit IO_URING benchmark scenarios and + # tests can allocate the Netty io_uring transport's submission/completion + # queue rings on Linux 5.10+. The default 64 KiB is not enough for the + # eight rings Spark allocates by default, causing NettyUtils.isIoUringUsable + # to return false and io_uring scenarios to skip. sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true ./build/sbt -Pscala-${{ inputs.scala }} -Pyarn -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl Test/package # Make less noisy diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 937dd6d66c54a..912a82046bc6f 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -406,10 +406,11 @@ jobs: run: | # Fix for TTY related issues when launching the Ammonite REPL in tests. export TERM=vt100 - # Raise RLIMIT_MEMLOCK so the Netty io_uring transport can allocate - # its submission/completion queue rings. Without this, AUTO transport - # selection silently falls back to EPOLL on the default 64KB memlock, - # leaving io_uring code paths untested. + # Raise RLIMIT_MEMLOCK so the explicit IO_URING test suites + # (e.g. ShuffleNettyIoUringSuite) can allocate the Netty io_uring transport's + # submission/completion queue rings. The default 64 KiB is not enough for + # the eight rings Spark allocates by default, causing + # NettyUtils.isIoUringUsable to return false and io_uring suites to skip. sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true # Hive "other tests" test needs larger metaspace size based on experiment. if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java index 437b63af43c1e..8b25f1b0928d4 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java @@ -38,11 +38,11 @@ public enum IOMode { */ IO_URING, /** - * Prefer to use a native transport when available. On Linux, io_uring is preferred over EPOLL - * when the running kernel supports it AND the JVM can actually allocate an io_uring ring - * (probed once via {@link NettyUtils#isIoUringUsable()}; environments with low - * {@code RLIMIT_MEMLOCK} fall through to EPOLL). On MacOS/BSD, KQUEUE is used. Falls back to - * NIO when no native transport is available. + * Prefer to use a native transport when available. On Linux, EPOLL is used; on MacOS/BSD, + * KQUEUE is used. Falls back to NIO when no native transport is available. + * + *

{@link #IO_URING} is opt-in only and is not selected by AUTO; set the IO mode to + * {@code IO_URING} explicitly to use it. */ AUTO } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 6542df6bd6027..d776bd5cf577d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -68,7 +68,7 @@ public class NettyUtils { /** * Cached result of probing whether io_uring can actually allocate the rings Spark needs on - * this JVM. `null` means not yet probed; non-null is the probed value. + * this JVM. {@code null} means not yet probed; non-null is the probed value. * *

{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls * work; it does not detect environments where the kernel allows io_uring but @@ -93,8 +93,9 @@ public static long freeDirectMemory() { * the submission/completion queues (common in containers, GitHub Actions runners, and other * restricted environments). * - *

Used by AUTO mode and by tests that gate execution on io_uring being usable. An explicit - * {@link IOMode#IO_URING} mode does not consult this and surfaces the underlying error. + *

Used by tests that gate execution on io_uring being usable. AUTO mode does not consult + * this; io_uring is opt-in only via {@link IOMode#IO_URING}, which surfaces the underlying + * error when ring allocation fails. */ public static boolean isIoUringUsable() { Boolean cached = ioUringUsable; @@ -116,10 +117,8 @@ public static boolean isIoUringUsable() { ioUringUsable = true; } catch (Throwable t) { logger.warn("io_uring is reported as available but allocation of " + - MAX_DEFAULT_NETTY_THREADS + " rings failed; " + - "AUTO will fall back to EPOLL on this JVM. " + - "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments). " + - "To force io_uring, set spark.shuffle.io.mode=IO_URING explicitly.", t); + MAX_DEFAULT_NETTY_THREADS + " rings failed. " + + "Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments).", t); ioUringUsable = false; } finally { if (probe != null) { @@ -145,9 +144,7 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String case KQUEUE -> KQueueIoHandler.newFactory(); case IO_URING -> IoUringIoHandler.newFactory(); case AUTO -> { - if (isIoUringUsable()) { - yield IoUringIoHandler.newFactory(); - } else if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollIoHandler.newFactory(); } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueIoHandler.newFactory(); @@ -167,9 +164,7 @@ public static Class getClientChannelClass(IOMode mode) { case KQUEUE -> KQueueSocketChannel.class; case IO_URING -> IoUringSocketChannel.class; case AUTO -> { - if (isIoUringUsable()) { - yield IoUringSocketChannel.class; - } else if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollSocketChannel.class; } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueSocketChannel.class; @@ -188,9 +183,7 @@ public static Class getServerChannelClass(IOMode mode) case KQUEUE -> KQueueServerSocketChannel.class; case IO_URING -> IoUringServerSocketChannel.class; case AUTO -> { - if (isIoUringUsable()) { - yield IoUringServerSocketChannel.class; - } else if (JavaUtils.isLinux && Epoll.isAvailable()) { + if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollServerSocketChannel.class; } else if (JavaUtils.isMac && KQueue.isAvailable()) { yield KQueueServerSocketChannel.class; diff --git a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala index afed4b4fcc6e5..e95472bddc7a6 100644 --- a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala @@ -281,10 +281,10 @@ object NettyTransportBenchmark extends BenchmarkBase { /** * Suite 3: IOMode Comparison (NIO vs AUTO). * AUTO selects the best native transport via NettyUtils.createEventLoop - * (IO_URING on Linux 5.10+, then EPOLL on Linux, KQUEUE on macOS, NIO fallback), - * so comparing NIO vs AUTO shows the benefit of native transport without needing - * manual probing. Uses concurrent load (8 clients) to amplify transport-level - * differences. + * (EPOLL on Linux, KQUEUE on macOS, NIO fallback), so comparing NIO vs AUTO shows + * the benefit of native transport without needing manual probing. Uses concurrent + * load (8 clients) to amplify transport-level differences. IO_URING is opt-in only + * and is not exercised here; see the dedicated IO_URING benchmark if added. */ private def ioModeComparisonBenchmark(): Unit = { val payload = new Array[Byte](MEDIUM_PAYLOAD) @@ -564,8 +564,8 @@ object NettyTransportBenchmark extends BenchmarkBase { * zero-copy sendfile/splice path. * * Compares NIO vs AUTO to verify that native transports use the kernel zero-copy - * path for file-backed transfers (sendfile() for EPOLL/KQUEUE, splice() via the - * io_uring submission queue for IO_URING). AUTO should be equal to or faster than NIO. + * path for file-backed transfers (sendfile() for EPOLL/KQUEUE). AUTO should be equal + * to or faster than NIO. */ private def fileBackedShuffleBenchmark(): Unit = { val numFiles = 100 From bc0be074b117cc8f8a32dcfc0a03a53c4d988c05 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 20 May 2026 16:56:10 +0800 Subject: [PATCH 09/11] [SPARK-XXXXX][CORE][FOLLOWUP] Add IO_URING to NettyTransportBenchmark mode comparisons Extend Suite 3 (IOMode Comparison) and Suite 8 (File-Backed Shuffle Block Fetch) to also run an IO_URING case when NettyUtils.isIoUringUsable reports true. The IO_URING case skips on macOS, on Linux without io_uring support, and on CI runners with low RLIMIT_MEMLOCK; recorded results may therefore omit it, and io_uring numbers should be captured by running locally on Linux 5.10+ with prlimit --memlock=unlimited. --- .../network/NettyTransportBenchmark.scala | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala index e95472bddc7a6..40f0e8cddc618 100644 --- a/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/network/NettyTransportBenchmark.scala @@ -279,12 +279,14 @@ object NettyTransportBenchmark extends BenchmarkBase { } /** - * Suite 3: IOMode Comparison (NIO vs AUTO). + * Suite 3: IOMode Comparison (NIO vs AUTO, plus IO_URING when usable). * AUTO selects the best native transport via NettyUtils.createEventLoop - * (EPOLL on Linux, KQUEUE on macOS, NIO fallback), so comparing NIO vs AUTO shows - * the benefit of native transport without needing manual probing. Uses concurrent - * load (8 clients) to amplify transport-level differences. IO_URING is opt-in only - * and is not exercised here; see the dedicated IO_URING benchmark if added. + * (EPOLL on Linux, KQUEUE on macOS, NIO fallback). IO_URING is opt-in only and is + * appended to the comparison when {@code NettyUtils.isIoUringUsable} reports true + * -- on CI runners with low {@code RLIMIT_MEMLOCK} the IO_URING case will skip, + * so recorded results may omit it. Run locally on Linux 5.10+ with + * {@code prlimit --memlock=unlimited} to capture the io_uring numbers. + * Uses concurrent load (8 clients) to amplify transport-level differences. */ private def ioModeComparisonBenchmark(): Unit = { val payload = new Array[Byte](MEDIUM_PAYLOAD) @@ -298,7 +300,9 @@ object NettyTransportBenchmark extends BenchmarkBase { minNumIters = 3, output = output) - Seq("NIO", "AUTO").foreach { mode => + val modes = Seq("NIO", "AUTO") ++ + (if (NettyUtils.isIoUringUsable) Seq("IO_URING") else Seq.empty) + modes.foreach { mode => benchmark.addTimerCase(s"$mode ($numClients clients)", numIters = 3) { timer => withTransport(mode) { (clientFactory, port) => val messagesPerClient = (totalMessages / numClients).toInt @@ -563,9 +567,11 @@ object NettyTransportBenchmark extends BenchmarkBase { * and fetches them using client.fetchChunk(). This exercises the DefaultFileRegion * zero-copy sendfile/splice path. * - * Compares NIO vs AUTO to verify that native transports use the kernel zero-copy - * path for file-backed transfers (sendfile() for EPOLL/KQUEUE). AUTO should be equal - * to or faster than NIO. + * Compares NIO vs AUTO -- and IO_URING when {@code NettyUtils.isIoUringUsable} + * reports true -- to verify that native transports use the kernel zero-copy path + * for file-backed transfers (sendfile() for EPOLL/KQUEUE, splice() for io_uring on + * supported kernels). AUTO should be equal to or faster than NIO. On CI runners + * with low {@code RLIMIT_MEMLOCK} the IO_URING case will skip. */ private def fileBackedShuffleBenchmark(): Unit = { val numFiles = 100 @@ -593,14 +599,16 @@ object NettyTransportBenchmark extends BenchmarkBase { } try { - runBenchmark(s"File-Backed Shuffle Block Fetch (NIO vs AUTO, ${numFiles}x16MB)") { + runBenchmark(s"File-Backed Shuffle Block Fetch (${numFiles}x16MB)") { val benchmark = new Benchmark( "File-Backed Shuffle Fetch", numFiles.toLong, minNumIters = 3, output = output) - Seq("NIO", "AUTO").foreach { mode => + val modes = Seq("NIO", "AUTO") ++ + (if (NettyUtils.isIoUringUsable) Seq("IO_URING") else Seq.empty) + modes.foreach { mode => benchmark.addTimerCase(s"$mode, sequential fetch", numIters = 3) { timer => val conf = createConf(mode) val streamManager = createFileStreamManager(conf, files) From f0e1f1574a063613a908578a7b8c227c998711c7 Mon Sep 17 00:00:00 2001 From: LuciferYang Date: Wed, 20 May 2026 11:10:57 +0000 Subject: [PATCH 10/11] Benchmark results for org.apache.spark.network.NettyTransportBenchmark (JDK 17, Scala 2.13, split 1 of 1) --- .../NettyTransportBenchmark-results.txt | 103 +++++++++--------- 1 file changed, 53 insertions(+), 50 deletions(-) diff --git a/core/benchmarks/NettyTransportBenchmark-results.txt b/core/benchmarks/NettyTransportBenchmark-results.txt index 2b1d64eb19866..59b15dd8acb51 100644 --- a/core/benchmarks/NettyTransportBenchmark-results.txt +++ b/core/benchmarks/NettyTransportBenchmark-results.txt @@ -2,136 +2,139 @@ RPC Round-Trip Latency - 1 KB payload (IOMode=AUTO) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz RPC Latency (1 KB): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -1 KB payload 444 472 20 0.0 88867.8 1.0X +1 KB payload 308 313 7 0.0 61570.4 1.0X ================================================================================================ RPC Round-Trip Latency - 64 KB payload (IOMode=AUTO) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz RPC Latency (64 KB): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -64 KB payload 715 742 25 0.0 143088.4 1.0X +64 KB payload 427 435 5 0.0 85367.2 1.0X ================================================================================================ RPC Round-Trip Latency - 1 MB payload (IOMode=AUTO) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz RPC Latency (1 MB): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -1 MB payload 875 902 23 0.0 874524.5 1.0X +1 MB payload 474 481 10 0.0 474050.5 1.0X ================================================================================================ RPC Round-Trip Latency - 16 MB payload (IOMode=AUTO) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz RPC Latency (16 MB): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -16 MB payload 1327 1390 61 0.0 13268791.8 1.0X +16 MB payload 781 790 11 0.0 7808193.5 1.0X ================================================================================================ Concurrent RPC Throughput (IOMode=AUTO) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz Concurrent RPC Throughput: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -1 client(s) 1823 1969 127 0.0 91128.9 1.0X -4 client(s) 768 783 20 0.0 38389.3 2.4X -8 client(s) 512 536 23 0.0 25603.3 3.6X -16 client(s) 459 464 5 0.0 22970.0 4.0X +1 client(s) 1203 1230 37 0.0 60140.4 1.0X +4 client(s) 398 406 8 0.1 19877.3 3.0X +8 client(s) 287 293 7 0.1 14354.1 4.2X +16 client(s) 244 257 11 0.1 12214.6 4.9X ================================================================================================ IOMode Comparison (Concurrent Throughput) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz IOMode Comparison: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -NIO (8 clients) 792 840 41 0.0 39614.4 1.0X -AUTO (8 clients) 867 885 19 0.0 43371.9 0.9X +NIO (8 clients) 511 540 28 0.0 25558.0 1.0X +AUTO (8 clients) 490 511 21 0.0 24493.3 1.0X +IO_URING (8 clients) 524 528 6 0.0 26186.0 1.0X ================================================================================================ Server Thread Scaling (IOMode=AUTO, 16 clients) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz Server Thread Scaling: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -2 server threads 496 502 10 0.0 24789.3 1.0X -4 server threads 405 424 32 0.0 20234.7 1.2X -8 server threads 421 450 27 0.0 21025.0 1.2X -16 server threads 443 458 26 0.0 22142.6 1.1X -32 server threads 455 515 52 0.0 22727.6 1.1X +2 server threads 270 278 10 0.1 13476.7 1.0X +4 server threads 254 258 4 0.1 12692.2 1.1X +8 server threads 273 299 22 0.1 13659.0 1.0X +16 server threads 275 283 9 0.1 13763.1 1.0X +32 server threads 274 288 14 0.1 13707.7 1.0X ================================================================================================ Multi-Connection Per Peer (IOMode=AUTO, 1MB payload) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz Multi-Connection Throughput: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -1 conn(s), 4 threads 1702 1871 152 0.0 340467.9 1.0X -2 conn(s), 4 threads 1278 1540 227 0.0 255638.3 1.3X -4 conn(s), 4 threads 1736 1768 36 0.0 347180.2 1.0X +1 conn(s), 4 threads 1617 1707 136 0.0 323385.0 1.0X +2 conn(s), 4 threads 1306 1317 13 0.0 261233.6 1.2X +4 conn(s), 4 threads 1126 1140 12 0.0 225198.0 1.4X ================================================================================================ Async Write Pressure (IOMode=AUTO) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz Async Write Throughput: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -1 KB async burst 72 137 112 0.1 14336.3 1.0X -64 KB async burst 147 161 13 0.0 29489.6 0.5X -1 MB async burst 1412 1479 59 0.0 282361.9 0.1X +1 KB async burst 29 35 11 0.2 5742.0 1.0X +64 KB async burst 137 146 10 0.0 27408.7 0.2X +1 MB async burst 1630 1635 5 0.0 325949.8 0.0X ================================================================================================ Large Block Transfer Throughput (IOMode=AUTO) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz 16 MB Block Transfer: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sequential sends 670 895 358 0.0 6695791.0 1.0X -4-thread parallel sends 436 451 17 0.0 4358412.5 1.5X +Sequential sends 764 817 60 0.0 7644943.9 1.0X +4-thread parallel sends 472 479 6 0.0 4720135.4 1.6X ================================================================================================ -File-Backed Shuffle Block Fetch (NIO vs AUTO, 100x16MB) +File-Backed Shuffle Block Fetch (100x16MB) ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.17.0-1008-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure +Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz File-Backed Shuffle Fetch: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -NIO, sequential fetch 384 386 3 0.0 3844405.1 1.0X -NIO, parallel fetch (4 clients) 209 217 9 0.0 2091358.0 1.8X -AUTO, sequential fetch 362 372 14 0.0 3615964.2 1.1X -AUTO, parallel fetch (4 clients) 202 209 6 0.0 2022433.8 1.9X +NIO, sequential fetch 353 363 9 0.0 3528829.7 1.0X +NIO, parallel fetch (4 clients) 207 241 59 0.0 2074918.0 1.7X +AUTO, sequential fetch 333 361 39 0.0 3330254.1 1.1X +AUTO, parallel fetch (4 clients) 183 192 8 0.0 1826653.6 1.9X +IO_URING, sequential fetch 1382 1419 36 0.0 13817436.0 0.3X +IO_URING, parallel fetch (4 clients) 324 354 27 0.0 3238954.3 1.1X From da25ffed9c0440189c00f684d9031875832b2e01 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 May 2026 11:06:39 +0800 Subject: [PATCH 11/11] [SPARK-XXXXX][CORE][FOLLOWUP] Log io_uring capabilities and per-module IO mode for diagnostics Adds two one-shot info logs to help triage io_uring vs EPOLL performance reports, particularly the shuffle-fetch regression visible in the NettyTransportBenchmark results committed in f0e1f1574a0: * TransportContext ctor logs `(module, ioMode, role)` once per context so it is visible which Spark module ends up on IO_URING vs EPOLL vs NIO. * NettyUtils.createEventLoop logs IoUring.featureString(), the kernel version, and /proc/sys/fs/pipe-max-size the first time IO_URING is selected in a process. The pipe-max-size matters because Netty's IoUringFileRegion routes FileRegion sends through a splice(2) pipe (file -> pipe -> socket); a small pipe forces more SQE/CQE round-trips per shuffle chunk. No behavior change. --- .../spark/network/TransportContext.java | 5 +++ .../apache/spark/network/util/NettyUtils.java | 44 ++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index e8ce6840e3fc3..e13bd2257ad0b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -137,6 +137,11 @@ public TransportContext( this.closeIdleConnections = closeIdleConnections; this.sslFactory = createSslFactory(); + logger.info("TransportContext initialized: module=" + + (conf.getModuleName() == null ? "" : conf.getModuleName()) + + ", ioMode=" + conf.ioMode() + + ", role=" + (isClientOnly ? "client-only" : "server+client")); + if (conf.getModuleName() != null && conf.getModuleName().equalsIgnoreCase("shuffle") && !isClientOnly && conf.separateChunkFetchRequest()) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index d776bd5cf577d..82c229ca61117 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -17,8 +17,12 @@ package org.apache.spark.network.util; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; @@ -82,6 +86,16 @@ public class NettyUtils { */ private static volatile Boolean ioUringUsable = null; + /** + * Guards a one-shot info log line describing the io_uring features the running kernel exposes. + * Emitted the first time {@link #createEventLoop} is asked for {@link IOMode#IO_URING}, so it + * appears once per process when -- and only when -- io_uring is actually selected. Useful for + * triaging shuffle/RPC performance reports: it answers questions like "is {@code SPLICE} + * supported on this kernel?" and "what is the kernel's max pipe buffer size?" without needing + * to attach a debugger. + */ + private static final AtomicBoolean ioUringCapabilitiesLogged = new AtomicBoolean(false); + public static long freeDirectMemory() { return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory(); } @@ -129,6 +143,31 @@ public static boolean isIoUringUsable() { } } + /** + * Emits a single info log line summarizing the io_uring features the running kernel exposes, + * the kernel version, and the system-wide max pipe buffer size. Called from the IO_URING branch + * of {@link #createEventLoop} so it logs at most once per process and only when io_uring is + * actually used. + * + *

The pipe-max-size matters because Netty's {@code IoUringFileRegion} routes FileRegion + * sends through a {@code splice(2)} pipe (file -> pipe -> socket), bounded by the pipe + * buffer; a small pipe forces more SQE/CQE round-trips per shuffle chunk. + */ + private static void logIoUringCapabilitiesOnce() { + if (!ioUringCapabilitiesLogged.compareAndSet(false, true)) { + return; + } + String pipeMaxSize = "unknown"; + try { + pipeMaxSize = Files.readString(Path.of("/proc/sys/fs/pipe-max-size")).trim(); + } catch (IOException | RuntimeException ignored) { + // Not Linux, or /proc not mounted; leave as "unknown". + } + logger.info("Netty io_uring transport selected: features=[" + IoUring.featureString() + + "], kernel=" + System.getProperty("os.version") + + ", pipe-max-size=" + pipeMaxSize + " bytes"); + } + /** Creates a new ThreadFactory which prefixes each thread with the given name. */ public static ThreadFactory createThreadFactory(String threadPoolPrefix) { return new DefaultThreadFactory(threadPoolPrefix, true); @@ -142,7 +181,10 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String case NIO -> NioIoHandler.newFactory(); case EPOLL -> EpollIoHandler.newFactory(); case KQUEUE -> KQueueIoHandler.newFactory(); - case IO_URING -> IoUringIoHandler.newFactory(); + case IO_URING -> { + logIoUringCapabilitiesOnce(); + yield IoUringIoHandler.newFactory(); + } case AUTO -> { if (JavaUtils.isLinux && Epoll.isAvailable()) { yield EpollIoHandler.newFactory();