Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ jobs:
key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
- name: Run benchmarks
run: |
# Raise RLIMIT_MEMLOCK so explicit IO_URING benchmark scenarios and
# tests can allocate the Netty io_uring transport's submission/completion
# queue rings on Linux 5.10+. The default 64 KiB is not enough for the
# eight rings Spark allocates by default, causing NettyUtils.isIoUringUsable
# to return false and io_uring scenarios to skip.
sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true
./build/sbt -Pscala-${{ inputs.scala }} -Pyarn -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pspark-ganglia-lgpl Test/package
# Make less noisy
cp conf/log4j2.properties.template conf/log4j2.properties
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,12 @@ jobs:
run: |
# Fix for TTY related issues when launching the Ammonite REPL in tests.
export TERM=vt100
# Raise RLIMIT_MEMLOCK so the explicit IO_URING test suites
# (e.g. ShuffleNettyIoUringSuite) can allocate the Netty io_uring transport's
# submission/completion queue rings. The default 64 KiB is not enough for
# the eight rings Spark allocates by default, causing
# NettyUtils.isIoUringUsable to return false and io_uring suites to skip.
sudo prlimit --pid $$ --memlock=unlimited:unlimited 2>/dev/null || true
# Hive "other tests" test needs larger metaspace size based on experiment.
if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi
# SPARK-46283: should delete the following env replacement after SPARK 3.x EOL
Expand Down
10 changes: 10 additions & 0 deletions common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ public TransportContext(
this.closeIdleConnections = closeIdleConnections;
this.sslFactory = createSslFactory();

logger.info("TransportContext initialized: module=" +
(conf.getModuleName() == null ? "<none>" : conf.getModuleName()) +
", ioMode=" + conf.ioMode() +
", role=" + (isClientOnly ? "client-only" : "server+client"));

if (conf.getModuleName() != null &&
conf.getModuleName().equalsIgnoreCase("shuffle") &&
!isClientOnly && conf.separateChunkFetchRequest()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ public enum IOMode {
*/
KQUEUE,
/**
* Prefer to use native EPOLL on Linux (or KQUEUE on MacOS) if available. Then, fallback to NIO.
* Native io_uring via JNI, Linux only. Requires kernel 5.10+.
*/
IO_URING,
/**
* Prefer to use a native transport when available. On Linux, EPOLL is used; on MacOS/BSD,
* KQUEUE is used. Falls back to NIO when no native transport is available.
*
* <p>{@link #IO_URING} is opt-in only and is not selected by AUTO; set the IO mode to
* {@code IO_URING} explicitly to use it.
*/
AUTO
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

package org.apache.spark.network.util;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
Expand All @@ -32,15 +37,24 @@
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.uring.IoUring;
import io.netty.channel.uring.IoUringIoHandler;
import io.netty.channel.uring.IoUringServerSocketChannel;
import io.netty.channel.uring.IoUringSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.PlatformDependent;

import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;

/**
* Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL,
* , KQUEUE, or AUTO.
* KQUEUE, IO_URING, or AUTO.
*/
public class NettyUtils {

private static final SparkLogger logger = SparkLoggerFactory.getLogger(NettyUtils.class);

/**
* Specifies an upper bound on the number of Netty threads that Spark requires by default.
* In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
Expand All @@ -56,10 +70,104 @@ public class NettyUtils {
private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
new PooledByteBufAllocator[2];

/**
* Cached result of probing whether io_uring can actually allocate the rings Spark needs on
* this JVM. {@code null} means not yet probed; non-null is the probed value.
*
* <p>{@link IoUring#isAvailable()} only checks that the JNI library loaded and basic syscalls
* work; it does not detect environments where the kernel allows io_uring but
* {@code RLIMIT_MEMLOCK} is too low for the submission/completion queue rings (common in
* containers, GitHub Actions runners, and other restricted environments). The probe creates
* a {@link MultiThreadIoEventLoopGroup} sized to {@link #MAX_DEFAULT_NETTY_THREADS} -- which
* is the worst case Spark allocates by default for a single event loop group -- and shuts it
* down to verify ring allocation actually succeeds. Probing with one thread is insufficient
* because some restricted environments allow a single ring to fit in memlock but not the eight
* rings Spark needs in practice.
*/
private static volatile Boolean ioUringUsable = null;

/**
* Guards a one-shot info log line describing the io_uring features the running kernel exposes.
* Emitted the first time {@link #createEventLoop} is asked for {@link IOMode#IO_URING}, so it
* appears once per process when -- and only when -- io_uring is actually selected. Useful for
* triaging shuffle/RPC performance reports: it answers questions like "is {@code SPLICE}
* supported on this kernel?" and "what is the kernel's max pipe buffer size?" without needing
* to attach a debugger.
*/
private static final AtomicBoolean ioUringCapabilitiesLogged = new AtomicBoolean(false);

public static long freeDirectMemory() {
return PlatformDependent.maxDirectMemory() - PlatformDependent.usedDirectMemory();
}

/**
* Returns true if io_uring can actually be used on the running JVM. Probes once (with the
* result cached) by attempting a real ring allocation, which catches environments where
* {@link IoUring#isAvailable()} returns true but {@code RLIMIT_MEMLOCK} is too low to allocate
* the submission/completion queues (common in containers, GitHub Actions runners, and other
* restricted environments).
*
* <p>Used by tests that gate execution on io_uring being usable. AUTO mode does not consult
* this; io_uring is opt-in only via {@link IOMode#IO_URING}, which surfaces the underlying
* error when ring allocation fails.
*/
public static boolean isIoUringUsable() {
Boolean cached = ioUringUsable;
if (cached != null) {
return cached;
}
synchronized (NettyUtils.class) {
if (ioUringUsable != null) {
return ioUringUsable;
}
if (!JavaUtils.isLinux || !IoUring.isAvailable()) {
ioUringUsable = false;
return false;
}
MultiThreadIoEventLoopGroup probe = null;
try {
probe = new MultiThreadIoEventLoopGroup(
MAX_DEFAULT_NETTY_THREADS, IoUringIoHandler.newFactory());
ioUringUsable = true;
} catch (Throwable t) {
logger.warn("io_uring is reported as available but allocation of " +
MAX_DEFAULT_NETTY_THREADS + " rings failed. " +
"Common cause: RLIMIT_MEMLOCK too low (containers, restricted environments).", t);
ioUringUsable = false;
} finally {
if (probe != null) {
probe.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
}
}
return ioUringUsable;
}
}

/**
* Emits a single info log line summarizing the io_uring features the running kernel exposes,
* the kernel version, and the system-wide max pipe buffer size. Called from the IO_URING branch
* of {@link #createEventLoop} so it logs at most once per process and only when io_uring is
* actually used.
*
* <p>The pipe-max-size matters because Netty's {@code IoUringFileRegion} routes FileRegion
* sends through a {@code splice(2)} pipe (file -&gt; pipe -&gt; socket), bounded by the pipe
* buffer; a small pipe forces more SQE/CQE round-trips per shuffle chunk.
*/
private static void logIoUringCapabilitiesOnce() {
if (!ioUringCapabilitiesLogged.compareAndSet(false, true)) {
return;
}
String pipeMaxSize = "unknown";
try {
pipeMaxSize = Files.readString(Path.of("/proc/sys/fs/pipe-max-size")).trim();
} catch (IOException | RuntimeException ignored) {
// Not Linux, or /proc not mounted; leave as "unknown".
}
logger.info("Netty io_uring transport selected: features=[" + IoUring.featureString() +
"], kernel=" + System.getProperty("os.version") +
", pipe-max-size=" + pipeMaxSize + " bytes");
}

/** Creates a new ThreadFactory which prefixes each thread with the given name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
return new DefaultThreadFactory(threadPoolPrefix, true);
Expand All @@ -73,6 +181,10 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String
case NIO -> NioIoHandler.newFactory();
case EPOLL -> EpollIoHandler.newFactory();
case KQUEUE -> KQueueIoHandler.newFactory();
case IO_URING -> {
logIoUringCapabilitiesOnce();
yield IoUringIoHandler.newFactory();
}
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollIoHandler.newFactory();
Expand All @@ -92,6 +204,7 @@ public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
case NIO -> NioSocketChannel.class;
case EPOLL -> EpollSocketChannel.class;
case KQUEUE -> KQueueSocketChannel.class;
case IO_URING -> IoUringSocketChannel.class;
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollSocketChannel.class;
Expand All @@ -110,6 +223,7 @@ public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode)
case NIO -> NioServerSocketChannel.class;
case EPOLL -> EpollServerSocketChannel.class;
case KQUEUE -> KQueueServerSocketChannel.class;
case IO_URING -> IoUringServerSocketChannel.class;
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollServerSocketChannel.class;
Expand Down
6 changes: 6 additions & 0 deletions common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_kqueue_aarch_64.jnilib" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_epoll_riscv64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_epoll_riscv64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_io_uring42_x86_64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_io_uring42_x86_64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_io_uring42_aarch_64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_io_uring42_aarch_64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_io_uring42_riscv64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_io_uring42_riscv64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_tcnative_linux_x86_64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_tcnative_linux_x86_64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_tcnative_linux_aarch_64.so"
Expand Down
Loading