[#1651] improvement(netty): Set Netty as the default server type
rickyma committed Apr 17, 2024
1 parent 4d04254 commit 84dda59
Showing 15 changed files with 43 additions and 35 deletions.
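The practical effect of the change: when RssBaseConf.RPC_SERVER_TYPE is left unset it now resolves to GRPC_NETTY instead of GRPC, so deployments that want to stay on the pure gRPC transport have to pin the option explicitly. A minimal sketch of that override, using only identifiers that appear in the diff below (the wrapper class is illustrative, not part of this commit):

import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.rpc.ServerType;

public class ServerTypeDefaultSketch {
  public static void main(String[] args) {
    RssBaseConf conf = new RssBaseConf();
    // With this commit the unset default resolves to GRPC_NETTY
    // (asserted in the ShuffleServerConfTest changes at the bottom of the diff).
    System.out.println(conf.get(RssBaseConf.RPC_SERVER_TYPE));

    // Opt back into a gRPC-only server by setting the type explicitly.
    conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC);
    System.out.println(conf.get(RssBaseConf.RPC_SERVER_TYPE)); // GRPC
  }
}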
@@ -43,7 +43,7 @@ public GrpcServer getServer() {

public GrpcServer getServer(ShuffleManagerGrpcService service) {
ServerType type = conf.get(RssBaseConf.RPC_SERVER_TYPE);
- if (type == ServerType.GRPC) {
+ if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) {
if (service == null) {
service = new ShuffleManagerGrpcService(shuffleManager);
}
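Before this commit only ServerType.GRPC took this branch; GRPC_NETTY fell through and, per the old ShuffleManagerServerFactoryTest further down, ended in an UnsupportedOperationException. A hedged usage sketch of the new behaviour, built only from calls shown in that test (the method wrapper is illustrative):

// Illustrative fragment; assumes the same imports as ShuffleManagerServerFactoryTest below.
void grpcNettyNowGetsAShuffleManagerServer() {
  RssBaseConf conf = new RssBaseConf();
  conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY);
  ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf);
  // Previously this threw UnsupportedOperationException for GRPC_NETTY;
  // now it returns the same GrpcServer that the GRPC type gets.
  factory.getServer();
}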
Expand Up @@ -44,6 +44,7 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
+ import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
@@ -124,6 +125,7 @@ private RssShuffleDataIterator getDataIterator(
boolean compress) {
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
@@ -22,21 +22,16 @@
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.rpc.ServerType;

- import static org.junit.jupiter.api.Assertions.assertThrows;

public class ShuffleManagerServerFactoryTest {
@Test
public void testShuffleManagerServerType() {
- // add code to generate tests that check the server type
- RssBaseConf conf = new RssBaseConf();
- conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC);
- ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf);
- // this should execute normally;
- factory.getServer();
-
- // other types should raise an exception
- conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY);
- factory = new ShuffleManagerServerFactory(null, conf);
- assertThrows(UnsupportedOperationException.class, factory::getServer);
+ for (ServerType serverType : ServerType.values()) {
+ RssBaseConf conf = new RssBaseConf();
+ conf.set(RssBaseConf.RPC_SERVER_TYPE, serverType);
+ ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf);
+ // this should execute normally;
+ factory.getServer();
+ }
}
}
@@ -102,12 +102,13 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
readBufferSize = Integer.MAX_VALUE;
}
boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);

builder.indexReadLimit(indexReadLimit);
builder.storageType(storageType);
builder.readBufferSize(readBufferSize);
builder.offHeapEnable(offHeapEnabled);
- builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
+ if (builder.getClientType() == null) {
+ builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
+ }
} else {
// most for test
RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() : builder.getRssConf();
@@ -131,7 +132,9 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
builder.rssConf(rssConf);
builder.offHeapEnable(false);
builder.expectedTaskIdsBitmapFilterEnable(false);
- builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
+ if (builder.getClientType() == null) {
+ builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
+ }
}
if (builder.getIdHelper() == null) {
builder.idHelper(new DefaultIdHelper(BlockIdLayout.from(builder.getRssConf())));
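The guard added in both branches changes the builder's precedence: a client type supplied explicitly via clientType(...) is no longer overwritten by the RSS_CLIENT_TYPE value from the configuration, which is what lets the tests below pin GRPC or GRPC_NETTY per shuffle server. A minimal sketch of that precedence rule in isolation (resolveClientType is an illustrative helper, not Uniffle API):

// Illustrative helper only; assumes the ClientType, RssConf and RssClientConf imports
// already present in ShuffleReadClientImpl.
static ClientType resolveClientType(ClientType explicitType, RssConf rssConf) {
  if (explicitType != null) {
    // The caller already chose a transport via builder.clientType(...); keep it.
    return explicitType;
  }
  // Otherwise fall back to the configured value, as before this change.
  return rssConf.get(RssClientConf.RSS_CLIENT_TYPE);
}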
@@ -36,6 +36,7 @@
import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+ import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
@@ -64,6 +65,7 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
@@ -193,6 +193,7 @@ private void diskErrorTest(boolean isNettyMode) throws Exception {
isNettyMode ? nettyShuffleServerInfoList : grpcShuffleServerInfoList;
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(appId)
.shuffleId(0)
@@ -171,6 +171,7 @@ protected void validateResult(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE_HDFS.name())
.appId(appId)
.shuffleId(shuffleId)
@@ -75,6 +75,7 @@ public class QuorumTest extends ShuffleReadWriteBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.MEMORY_LOCALFILE.name())
.shuffleId(0)
.partitionId(0)
@@ -63,6 +63,7 @@ public class RpcClientRetryTest extends ShuffleReadWriteBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(StorageType storageType) {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(storageType.name())
.shuffleId(0)
.partitionId(0)
@@ -46,6 +46,7 @@
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+ import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -177,6 +178,7 @@ public void testConcurrentWrite2Hadoop(
});
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId(appId)
.shuffleId(0)
@@ -109,8 +109,9 @@ public void closeClient() {
nettyShuffleServerClient.close();
}

- private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.shuffleId(0)
.partitionId(0)
@@ -172,7 +173,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));

ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
@@ -208,7 +209,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
shuffleServerClient.finishShuffle(rfsr);

readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
@@ -218,7 +219,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
validateResult(readClient, expectedData, bitmaps[0]);

readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(1)
.basePath(dataBasePath)
@@ -229,7 +230,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
validateResult(readClient, expectedData, bitmaps[1]);

readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(2)
.basePath(dataBasePath)
@@ -240,7 +241,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
validateResult(readClient, expectedData, bitmaps[2]);

readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(3)
.basePath(dataBasePath)
@@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
- import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import com.google.common.collect.Lists;
@@ -95,10 +94,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
private static ShuffleServerConf grpcShuffleServerConfig;
private static ShuffleServerConf nettyShuffleServerConfig;

- private static AtomicInteger serverRpcPortCounter = new AtomicInteger();
- private static AtomicInteger nettyPortCounter = new AtomicInteger();
- private static AtomicInteger jettyPortCounter = new AtomicInteger();

static @TempDir File tempDir;

private static ShuffleServerConf getShuffleServerConf(ServerType serverType) throws Exception {
@@ -225,8 +220,9 @@ private Map<Integer, List<ShuffleBlockInfo>> createTestData(
return partitionToBlocks;
}

- private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) {
return ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.shuffleId(0)
.partitionId(0)
@@ -305,7 +301,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
: new ShuffleServerInfo(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
@@ -341,7 +337,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
shuffleServerClient.finishShuffle(rfsr);

readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
@@ -351,7 +347,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
validateResult(readClient, expectedData, bitmaps[0]);

readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(1)
.basePath(dataBasePath)
@@ -362,7 +358,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
validateResult(readClient, expectedData, bitmaps[1]);

readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(2)
.basePath(dataBasePath)
@@ -373,7 +369,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
validateResult(readClient, expectedData, bitmaps[2]);

readClient =
- baseReadBuilder()
+ baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(3)
.basePath(dataBasePath)
@@ -357,6 +357,7 @@ public void writeReadTest() throws Exception {

ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(testAppId)
.shuffleId(0)
@@ -380,6 +381,7 @@ public void writeReadTest() throws Exception {
assertTrue(commitResult);
readClient =
ShuffleClientFactory.newReadBuilder()
+ .clientType(ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(testAppId)
.shuffleId(0)
@@ -140,6 +140,7 @@ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMo
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)));
return ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.shuffleId(0)
.partitionId(0)
@@ -35,7 +35,7 @@ public class ShuffleServerConfTest {
public void defaultConfTest() {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
assertFalse(shuffleServerConf.loadConfFromFile(null));
assertEquals("GRPC", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals("GRPC_NETTY", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals(256, shuffleServerConf.getInteger(ShuffleServerConf.JETTY_CORE_POOL_SIZE));
assertEquals(0, shuffleServerConf.getLong(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD));
}
@@ -68,7 +68,7 @@ public void confTest() {
assertEquals(2, shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1", shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b", ""));
assertEquals("GRPC", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals("GRPC_NETTY", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null));
}
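Because these assertions now expect GRPC_NETTY, a server configuration file that omits the RPC server type switches transports on upgrade. A hedged sketch of pinning the old transport through a conf file; the key spelling rss.rpc.server.type and the file path are assumptions, not taken from this diff:

// Illustrative fragment; key name and path are assumed, not shown in this diff.
ShuffleServerConf conf = new ShuffleServerConf();
conf.loadConfFromFile("/path/to/server.conf"); // e.g. containing: rss.rpc.server.type GRPC
ServerType type = conf.get(ShuffleServerConf.RPC_SERVER_TYPE); // GRPC if pinned, GRPC_NETTY otherwise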

