Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1651] improvement(netty): Set Netty as the default server type #1653

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public GrpcServer getServer() {

public GrpcServer getServer(ShuffleManagerGrpcService service) {
ServerType type = conf.get(RssBaseConf.RPC_SERVER_TYPE);
if (type == ServerType.GRPC) {
if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) {
if (service == null) {
service = new ShuffleManagerGrpcService(shuffleManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
Expand Down Expand Up @@ -124,6 +125,7 @@ private RssShuffleDataIterator getDataIterator(
boolean compress) {
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,16 @@
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.rpc.ServerType;

import static org.junit.jupiter.api.Assertions.assertThrows;

public class ShuffleManagerServerFactoryTest {
@Test
public void testShuffleManagerServerType() {
// add code to generate tests that check the server type
RssBaseConf conf = new RssBaseConf();
conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC);
ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf);
// this should execute normally;
factory.getServer();

// other types should raise an exception
conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY);
factory = new ShuffleManagerServerFactory(null, conf);
assertThrows(UnsupportedOperationException.class, factory::getServer);
for (ServerType serverType : ServerType.values()) {
RssBaseConf conf = new RssBaseConf();
conf.set(RssBaseConf.RPC_SERVER_TYPE, serverType);
ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf);
// this should execute normally;
factory.getServer();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
readBufferSize = Integer.MAX_VALUE;
}
boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);

builder.indexReadLimit(indexReadLimit);
builder.storageType(storageType);
builder.readBufferSize(readBufferSize);
builder.offHeapEnable(offHeapEnabled);
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
if (builder.getClientType() == null) {
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
}
} else {
// most for test
RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() : builder.getRssConf();
Expand All @@ -131,7 +132,9 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
builder.rssConf(rssConf);
builder.offHeapEnable(false);
builder.expectedTaskIdsBitmapFilterEnable(false);
builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
if (builder.getClientType() == null) {
builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
}
}
if (builder.getIdHelper() == null) {
builder.idHelper(new DefaultIdHelper(BlockIdLayout.from(builder.getRssConf())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ public class RssBaseConf extends RssConf {
public static final ConfigOption<ServerType> RPC_SERVER_TYPE =
ConfigOptions.key("rss.rpc.server.type")
.enumType(ServerType.class)
.defaultValue(ServerType.GRPC)
.withDescription("Shuffle server type, default is grpc");
.defaultValue(ServerType.GRPC_NETTY)
.withDescription(
"Shuffle server type, supports GRPC_NETTY, GRPC. The default value is GRPC_NETTY. We recommend to use GRPC_NETTY for better stability and performance.");

public static final ConfigOption<Integer> RPC_SERVER_PORT =
ConfigOptions.key("rss.rpc.server.port")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ public class RssClientConf {
public static final ConfigOption<ClientType> RSS_CLIENT_TYPE =
ConfigOptions.key("rss.client.type")
.enumType(ClientType.class)
.defaultValue(ClientType.GRPC)
.withDescription("Supports GRPC, GRPC_NETTY");
.defaultValue(ClientType.GRPC_NETTY)
.withDescription(
"Supports GRPC_NETTY, GRPC. The default value is GRPC_NETTY. We recommend to use GRPC_NETTY for better stability and performance.");

public static final ConfigOption<Boolean> RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED =
ConfigOptions.key("rss.client.remote.storage.useLocalConfAsDefault")
Expand Down
20 changes: 10 additions & 10 deletions docs/client_guide/client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ spark.rss.data.replica.read 2
```

### Netty Setting
| Property Name | Default | Description |
|-----------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| <client_type>.rss.client.type | GRPC | The default is GRPC, we can set it to GRPC_NETTY to enable the Netty on the client |
| <client_type>.rss.client.netty.io.mode | NIO | Netty EventLoopGroup backend, available options: NIO, EPOLL. |
| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 | Connection active timeout. |
| <client_type>.rss.client.netty.client.threads | 0 | Number of threads used in the client thread pool. Default is 0, Netty will use the number of (available logical cores * 2) as the number of threads. |
| <client_type>.rss.client.netty.client.prefer.direct.bufs | true | If true, we will prefer allocating off-heap byte buffers within Netty. |
| <client_type>.rss.client.netty.client.connections.per.peer | 2 | Suppose there are 100 executors, spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer will establish a total of (100 * 2) connections with multiple clients. |
| <client_type>.rss.client.netty.client.receive.buffer | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. |
| <client_type>.rss.client.netty.client.send.buffer | 0 | Send buffer size (SO_SNDBUF). |
| Property Name | Default | Description |
|-------------------------------------------------------------|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| <client_type>.rss.client.type | GRPC_NETTY | Supports GRPC_NETTY, GRPC. The default value is GRPC_NETTY. We recommend to use GRPC_NETTY for better stability and performance. |
| <client_type>.rss.client.netty.io.mode | NIO | Netty EventLoopGroup backend, available options: NIO, EPOLL. |
| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 | Connection active timeout. |
| <client_type>.rss.client.netty.client.threads | 0 | Number of threads used in the client thread pool. Default is 0, Netty will use the number of (available logical cores * 2) as the number of threads. |
| <client_type>.rss.client.netty.client.prefer.direct.bufs | true | If true, we will prefer allocating off-heap byte buffers within Netty. |
| <client_type>.rss.client.netty.client.connections.per.peer | 2 | Suppose there are 100 executors, spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer will establish a total of (100 * 2) connections with multiple clients. |
| <client_type>.rss.client.netty.client.receive.buffer | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. |
| <client_type>.rss.client.netty.client.send.buffer | 0 | Send buffer size (SO_SNDBUF). |
Loading
Loading