Skip to content

Commit

Permalink
[#1651] improvement(netty): Set Netty as the default server type
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyma committed Apr 17, 2024
1 parent 4d04254 commit b50c76c
Show file tree
Hide file tree
Showing 13 changed files with 35 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
Expand Down Expand Up @@ -124,6 +125,7 @@ private RssShuffleDataIterator getDataIterator(
boolean compress) {
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
readBufferSize = Integer.MAX_VALUE;
}
boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);

builder.indexReadLimit(indexReadLimit);
builder.storageType(storageType);
builder.readBufferSize(readBufferSize);
builder.offHeapEnable(offHeapEnabled);
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
if (builder.getClientType() == null) {
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
}
} else {
// most for test
RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() : builder.getRssConf();
Expand All @@ -131,7 +132,9 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
builder.rssConf(rssConf);
builder.offHeapEnable(false);
builder.expectedTaskIdsBitmapFilterEnable(false);
builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
if (builder.getClientType() == null) {
builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
}
}
if (builder.getIdHelper() == null) {
builder.idHelper(new DefaultIdHelper(BlockIdLayout.from(builder.getRssConf())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ private void diskErrorTest(boolean isNettyMode) throws Exception {
isNettyMode ? nettyShuffleServerInfoList : grpcShuffleServerInfoList;
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(appId)
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ protected void validateResult(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE_HDFS.name())
.appId(appId)
.shuffleId(shuffleId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class QuorumTest extends ShuffleReadWriteBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.MEMORY_LOCALFILE.name())
.shuffleId(0)
.partitionId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class RpcClientRetryTest extends ShuffleReadWriteBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(StorageType storageType) {
return ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(storageType.name())
.shuffleId(0)
.partitionId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
Expand Down Expand Up @@ -177,6 +178,7 @@ public void testConcurrentWrite2Hadoop(
});
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId(appId)
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ public void closeClient() {
nettyShuffleServerClient.close();
}

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) {
return ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.shuffleId(0)
.partitionId(0)
Expand Down Expand Up @@ -172,7 +173,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));

ShuffleReadClientImpl readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
Expand Down Expand Up @@ -208,7 +209,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
shuffleServerClient.finishShuffle(rfsr);

readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
Expand All @@ -218,7 +219,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
validateResult(readClient, expectedData, bitmaps[0]);

readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(1)
.basePath(dataBasePath)
Expand All @@ -229,7 +230,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
validateResult(readClient, expectedData, bitmaps[1]);

readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(2)
.basePath(dataBasePath)
Expand All @@ -240,7 +241,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
validateResult(readClient, expectedData, bitmaps[2]);

readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(3)
.basePath(dataBasePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -95,10 +94,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
private static ShuffleServerConf grpcShuffleServerConfig;
private static ShuffleServerConf nettyShuffleServerConfig;

private static AtomicInteger serverRpcPortCounter = new AtomicInteger();
private static AtomicInteger nettyPortCounter = new AtomicInteger();
private static AtomicInteger jettyPortCounter = new AtomicInteger();

static @TempDir File tempDir;

private static ShuffleServerConf getShuffleServerConf(ServerType serverType) throws Exception {
Expand Down Expand Up @@ -225,8 +220,9 @@ private Map<Integer, List<ShuffleBlockInfo>> createTestData(
return partitionToBlocks;
}

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) {
return ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.shuffleId(0)
.partitionId(0)
Expand Down Expand Up @@ -305,7 +301,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
: new ShuffleServerInfo(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
ShuffleReadClientImpl readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
Expand Down Expand Up @@ -341,7 +337,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
shuffleServerClient.finishShuffle(rfsr);

readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.basePath(dataBasePath)
.blockIdBitmap(bitmaps[0])
Expand All @@ -351,7 +347,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
validateResult(readClient, expectedData, bitmaps[0]);

readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(1)
.basePath(dataBasePath)
Expand All @@ -362,7 +358,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
validateResult(readClient, expectedData, bitmaps[1]);

readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(2)
.basePath(dataBasePath)
Expand All @@ -373,7 +369,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
validateResult(readClient, expectedData, bitmaps[2]);

readClient =
baseReadBuilder()
baseReadBuilder(isNettyMode)
.appId(appId)
.partitionId(3)
.basePath(dataBasePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ public void writeReadTest() throws Exception {

ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(testAppId)
.shuffleId(0)
Expand All @@ -380,6 +381,7 @@ public void writeReadTest() throws Exception {
assertTrue(commitResult);
readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(testAppId)
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMo
LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)));
return ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.shuffleId(0)
.partitionId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ShuffleServerConfTest {
public void defaultConfTest() {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
assertFalse(shuffleServerConf.loadConfFromFile(null));
assertEquals("GRPC", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals("GRPC_NETTY", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals(256, shuffleServerConf.getInteger(ShuffleServerConf.JETTY_CORE_POOL_SIZE));
assertEquals(0, shuffleServerConf.getLong(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD));
}
Expand Down Expand Up @@ -68,7 +68,7 @@ public void confTest() {
assertEquals(2, shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1", shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b", ""));
assertEquals("GRPC", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals("GRPC_NETTY", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null));
}

Expand Down

0 comments on commit b50c76c

Please sign in to comment.