diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java index 6b208d95bbfbc..6ab401b9a0d5a 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java @@ -19,9 +19,18 @@ /** * Selector for which form of low-level IO we should use. - * NIO is always available, while EPOLL is only available on Linux. - * AUTO is used to select EPOLL if it's available, or NIO otherwise. */ public enum IOMode { - NIO, EPOLL + /** + * Java NIO (Selector), cross-platform portable + */ + NIO, + /** + * Native EPOLL via JNI, Linux only + */ + EPOLL, + /** + * Native KQUEUE via JNI, MacOS/BSD only + */ + KQUEUE } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 2dd1c8f2e4a7d..da4b3109bbe1e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -26,6 +26,9 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueServerSocketChannel; +import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -68,6 +71,7 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String return switch (mode) { case NIO -> new NioEventLoopGroup(numThreads, threadFactory); case EPOLL -> new EpollEventLoopGroup(numThreads, threadFactory); + case KQUEUE -> new KQueueEventLoopGroup(numThreads, threadFactory); }; } @@ -76,6 +80,7 @@ public static Class getClientChannelClass(IOMode mode) { return switch (mode) { case NIO -> NioSocketChannel.class; case EPOLL -> EpollSocketChannel.class; + case KQUEUE -> KQueueSocketChannel.class; }; } @@ -84,6 +89,7 @@ public static Class getServerChannelClass(IOMode mode) return switch (mode) { case NIO -> NioServerSocketChannel.class; case EPOLL -> EpollServerSocketChannel.class; + case KQUEUE -> KQueueServerSocketChannel.class; }; } diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala index 378a361845139..18a8453d60be2 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala @@ -17,14 +17,42 @@ package org.apache.spark -import org.scalatest.BeforeAndAfterAll +import org.scalactic.source.Position +import org.scalatest.{BeforeAndAfterAll, Tag} -class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll { +import org.apache.spark.network.util.IOMode +import org.apache.spark.util.Utils + +abstract class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll { // This test suite should run all tests in ShuffleSuite with Netty shuffle mode. + def ioMode: IOMode = IOMode.NIO + def shouldRunTests: Boolean = true override def beforeAll(): Unit = { super.beforeAll() conf.set("spark.shuffle.blockTransferService", "netty") + conf.set("spark.shuffle.io.mode", ioMode.toString) + } + + override protected def test(testName: String, testTags: Tag*)(testBody: => Any)( + implicit pos: Position): Unit = { + if (!shouldRunTests) { + ignore(s"$testName [disabled on ${Utils.osName} with $ioMode]")(testBody) + } else { + super.test(testName, testTags: _*) {testBody} + } } } + +class ShuffleNettyNioSuite extends ShuffleNettySuite + +class ShuffleNettyEpollSuite extends ShuffleNettySuite { + override def shouldRunTests: Boolean = Utils.isLinux + override def ioMode: IOMode = IOMode.EPOLL +} + +class ShuffleNettyKQueueSuite extends ShuffleNettySuite { + override def shouldRunTests: Boolean = Utils.isMac + override def ioMode: IOMode = IOMode.KQUEUE +}