Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@

/**
* Selector for which form of low-level IO we should use.
* NIO is always available, while EPOLL is only available on Linux.
* AUTO is used to select EPOLL if it's available, or NIO otherwise.
*/
public enum IOMode {
NIO, EPOLL
/**
* Java NIO (Selector), cross-platform portable
*/
NIO,
/**
* Native EPOLL via JNI, Linux only
*/
EPOLL,
/**
* Native KQUEUE via JNI, MacOS/BSD only
*/
KQUEUE
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand Down Expand Up @@ -68,6 +71,7 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String
return switch (mode) {
case NIO -> new NioEventLoopGroup(numThreads, threadFactory);
case EPOLL -> new EpollEventLoopGroup(numThreads, threadFactory);
case KQUEUE -> new KQueueEventLoopGroup(numThreads, threadFactory);
Copy link
Contributor

@LuciferYang LuciferYang Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a deprecated API usage.
Of course, we can fix it uniformly in a separate pr

};
}

Expand All @@ -76,6 +80,7 @@ public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
return switch (mode) {
case NIO -> NioSocketChannel.class;
case EPOLL -> EpollSocketChannel.class;
case KQUEUE -> KQueueSocketChannel.class;
};
}

Expand All @@ -84,6 +89,7 @@ public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode)
return switch (mode) {
case NIO -> NioServerSocketChannel.class;
case EPOLL -> EpollServerSocketChannel.class;
case KQUEUE -> KQueueServerSocketChannel.class;
};
}

Expand Down
32 changes: 30 additions & 2 deletions core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,42 @@

package org.apache.spark

import org.scalatest.BeforeAndAfterAll
import org.scalactic.source.Position
import org.scalatest.{BeforeAndAfterAll, Tag}

class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
import org.apache.spark.network.util.IOMode
import org.apache.spark.util.Utils

abstract class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {

// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.

def ioMode: IOMode = IOMode.NIO
def shouldRunTests: Boolean = true
override def beforeAll(): Unit = {
super.beforeAll()
conf.set("spark.shuffle.blockTransferService", "netty")
conf.set("spark.shuffle.io.mode", ioMode.toString)
}

override protected def test(testName: String, testTags: Tag*)(testBody: => Any)(
implicit pos: Position): Unit = {
if (!shouldRunTests) {
ignore(s"$testName [disabled on ${Utils.osName} with $ioMode]")(testBody)
} else {
super.test(testName, testTags: _*) {testBody}
}
}
}

class ShuffleNettyNioSuite extends ShuffleNettySuite

class ShuffleNettyEpollSuite extends ShuffleNettySuite {
override def shouldRunTests: Boolean = Utils.isLinux
override def ioMode: IOMode = IOMode.EPOLL
}

class ShuffleNettyKQueueSuite extends ShuffleNettySuite {
override def shouldRunTests: Boolean = Utils.isMac
override def ioMode: IOMode = IOMode.KQUEUE
}