Skip to content

Commit

Permalink
[#414] feat(client): support specifying per-partition's max concurren…
Browse files Browse the repository at this point in the history
…cy to write in client side (#815)

### What changes were proposed in this pull request?

1. Support specifying the per-partition max write concurrency on the client side

### Why are the changes needed?

PR #396 introduced concurrent HDFS writing for a single partition, 
but the concurrency is determined solely by the server-side configuration. To increase flexibility, 
this PR supports specifying the per-partition max write concurrency on the client side.

### Does this PR introduce _any_ user-facing change?

Yes. The client conf `<client_type>.rss.client.max.concurrency.of.per-partition.write` and the server conf `rss.server.max.concurrency.limit.of.per-partition.write` are introduced.

### How was this patch tested?
1. UTs
  • Loading branch information
zuston authored Apr 27, 2023
1 parent 24a5195 commit 8edefdf
Show file tree
Hide file tree
Showing 27 changed files with 292 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;

public class RssMRAppMaster extends MRAppMaster {

private final String rssNmHost;
Expand Down Expand Up @@ -228,7 +230,8 @@ public Thread newThread(Runnable r) {
0,
entry.getValue(),
remoteStorage,
ShuffleDataDistributionType.NORMAL
ShuffleDataDistributionType.NORMAL,
RssMRConfig.toRssConf(conf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)
));
LOG.info("Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms");
return shuffleAssignments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ public void registerShuffle(
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType distributionType) {
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ public void registerShuffle(
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo storageType,
ShuffleDataDistributionType distributionType) {
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;

import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;

public class RssShuffleManager extends RssShuffleManagerBase {

Expand All @@ -100,6 +101,7 @@ public class RssShuffleManager extends RssShuffleManagerBase {
private final String user;
private final String uuid;
private DataPusher dataPusher;
private final int maxConcurrencyPerPartitionToWrite;

private final Map<Integer, Integer> shuffleIdToPartitionNum = Maps.newConcurrentMap();
private final Map<Integer, Integer> shuffleIdToNumMapTasks = Maps.newConcurrentMap();
Expand All @@ -119,6 +121,8 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
this.dataReplicaRead = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_READ);
this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
this.dataReplicaSkipEnabled = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED);
this.maxConcurrencyPerPartitionToWrite =
RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
LOG.info("Check quorum config ["
+ dataReplica + ":" + dataReplicaWrite + ":" + dataReplicaRead + ":" + dataReplicaSkipEnabled + "]");
RssUtils.checkQuorumSetting(dataReplica, dataReplicaWrite, dataReplicaRead);
Expand Down Expand Up @@ -318,7 +322,8 @@ protected void registerShuffleServers(
shuffleId,
entry.getValue(),
remoteStorage,
ShuffleDataDistributionType.NORMAL
ShuffleDataDistributionType.NORMAL,
maxConcurrencyPerPartitionToWrite
);
});
LOG.info("Finish register shuffleId[" + shuffleId + "] with " + (System.currentTimeMillis() - start) + " ms");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;

import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;

public class RssShuffleManager extends RssShuffleManagerBase {

Expand All @@ -105,6 +106,7 @@ public class RssShuffleManager extends RssShuffleManagerBase {
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
private final ShuffleDataDistributionType dataDistributionType;
private final int maxConcurrencyPerPartitionToWrite;
private String user;
private String uuid;
private Set<String> failedTaskIds = Sets.newConcurrentHashSet();
Expand Down Expand Up @@ -141,6 +143,8 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
this.dataDistributionType = getDataDistributionType(sparkConf);
this.maxConcurrencyPerPartitionToWrite =
RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
long retryIntervalMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
int heartBeatThreadNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
Expand Down Expand Up @@ -236,6 +240,8 @@ protected static ShuffleDataDistributionType getDataDistributionType(SparkConf s
this.sparkConf = conf;
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
this.dataDistributionType = RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
this.maxConcurrencyPerPartitionToWrite =
RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
this.heartbeatTimeout = sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2);
this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
Expand Down Expand Up @@ -699,7 +705,8 @@ protected void registerShuffleServers(
shuffleId,
entry.getValue(),
remoteStorage,
dataDistributionType
dataDistributionType,
maxConcurrencyPerPartitionToWrite
);
});
LOG.info("Finish register shuffleId[" + shuffleId + "] with " + (System.currentTimeMillis() - start) + " ms");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ void registerShuffle(
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType);
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite);

boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ public void registerShuffle(
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType) {
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite) {
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
Expand All @@ -417,7 +418,15 @@ public void registerShuffle(
}

RssRegisterShuffleRequest request =
new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage, user, dataDistributionType);
new RssRegisterShuffleRequest(
appId,
shuffleId,
partitionRanges,
remoteStorage,
user,
dataDistributionType,
maxConcurrencyPerPartitionToWrite
);
RssRegisterShuffleResponse response = getShuffleServerClient(shuffleServerInfo).registerShuffle(request);

String msg = "Error happened when registerShuffle with appId[" + appId + "], shuffleId[" + shuffleId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public class RssClientConf {
.withDescription("The type of partition shuffle data distribution, including normal and local_order. "
+ "The default value is normal. This config is only valid in Spark3.x");

/**
 * Client-side setting for the maximum write concurrency of a single partition, i.e. the maximum
 * number of files used for one partition. Integer option with a default of {@code 0}.
 *
 * <p>Clients read this value and pass it along when registering a shuffle. Per the client guide,
 * the remote shuffle server only respects it when it is greater than 0.
 */
public static final ConfigOption<Integer> MAX_CONCURRENCY_PER_PARTITION_TO_WRITE = ConfigOptions
.key("rss.client.max.concurrency.of.per-partition.write")
.intType()
.defaultValue(0)
.withDescription("The max concurrency for single partition to write, the value is the max file number "
+ "for one partition, remote shuffle server should respect this.");

public static final ConfigOption<Integer> NETTY_IO_CONNECT_TIMEOUT_MS = ConfigOptions
.key("rss.client.netty.io.connect.timeout.ms")
.intType()
Expand Down
1 change: 1 addition & 0 deletions docs/client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ These configurations are shared by all types of clients.
|<client_type>.rss.estimate.task.concurrency.dynamic.factor|1.0|Between 0 and 1, used to estimate task concurrency, when the client is spark, it represents how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated, when the client is mr, it represents how likely the resources of map and reduce are satisfied. Effective when <client_type>.rss.estimate.server.assignment.enabled=true or Coordinator's rss.coordinator.select.partition.strategy is CONTINUOUS.|
|<client_type>.rss.estimate.server.assignment.enabled|false|Support mr and spark, whether to enable estimation of the number of ShuffleServers that need to be allocated based on the number of concurrent tasks.|
|<client_type>.rss.estimate.task.concurrency.per.server|80|It takes effect when rss.estimate.server.assignment.enabled=true, how many tasks are concurrently assigned to a ShuffleServer.|
|<client_type>.rss.client.max.concurrency.of.per-partition.write|-|The maximum number of files that can be written concurrently for a single partition. This value is only respected by the remote shuffle server if it is greater than 0.|
Notice:

1. `<client_type>` should be `spark` or `mapreduce`
Expand Down
7 changes: 4 additions & 3 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ This document will introduce how to deploy Uniffle shuffle servers.
| rss.server.disk.capacity.ratio | 0.9 | When `rss.server.disk.capacity` is negative, disk whole space * ratio is used |
| rss.server.multistorage.fallback.strategy.class | - | The fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support `org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy` and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` will be used. |
| rss.server.leak.shuffledata.check.interval | 3600000 | The interval of leak shuffle data check (ms) |
| rss.server.max.concurrency.of.single.partition.writer | 1 | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. |
| rss.server.max.concurrency.of.per-partition.write | 1 | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. |
| rss.server.max.concurrency.limit.of.per-partition.write | - | The upper limit for the per-partition max write concurrency specified by the client. This limit is not enabled by default. |
| rss.metrics.reporter.class | - | The class of metrics reporter. |
|rss.server.multistorage.manager.selector.class | org.apache.uniffle.server.storage.multi.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. |

Expand Down Expand Up @@ -129,7 +130,7 @@ If you don't use HDFS, the huge partition may be flushed to local disk, which is

For HDFS, the conf value of `rss.server.single.buffer.flush.threshold` should be greater than the value of `rss.server.flush.cold.storage.threshold.size`, which will flush data directly to HDFS.

Finally, to improve the speed of writing to HDFS for a single partition, the value of `rss.server.max.concurrency.of.single.partition.writer` and `rss.server.flush.threadPool.size` could be increased to 10 or 20.
Finally, to improve the speed of writing to HDFS for a single partition, the value of `rss.server.max.concurrency.of.per-partition.write` and `rss.server.flush.threadPool.size` could be increased to 10 or 20.

#### Example of server conf
```
Expand All @@ -153,7 +154,7 @@ rss.server.app.expired.withoutHeartbeat 120000
rss.server.flush.threadPool.size 20
rss.server.flush.cold.storage.threshold.size 128m
rss.server.single.buffer.flush.threshold 129m
rss.server.max.concurrency.of.single.partition.writer 20
rss.server.max.concurrency.of.per-partition.write 20
rss.server.huge-partition.size.threshold 20g
rss.server.huge-partition.memory.limit.ratio 0.2
```
7 changes: 6 additions & 1 deletion docs/uniffle-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ license: |

# Upgrading from Coordinator 0.6 to 0.7

+ Since we have reconstructed the class file under coordinator, for the `rss.coordinator.access.checkers` parameter, the original value `org.apache.unifle.coordinator.AccessClusterLoadChecker` has been replaced with `org.apache.unifle.coordinator.access.checker.AccessClusterLoadChecker`, `org.apache.unifle.coordinator.AccessCandidatesChecker` has been replaced with `org.apache.unifle.coordinator.access.checker.AccessCandidatesChecker`, In addition, `org.apache.unifle.coordinator.access.checker.AccessQuotaChecker` has been added as the default checker.
+ Since we have reconstructed the class file under coordinator, for the `rss.coordinator.access.checkers` parameter, the original value `org.apache.uniffle.coordinator.AccessClusterLoadChecker` has been replaced with `org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker`, and `org.apache.uniffle.coordinator.AccessCandidatesChecker` has been replaced with `org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker`. In addition, `org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker` has been added as the default checker.


# Upgrading from Server 0.7 to 0.8 (unreleased)

+ The configuration key `rss.server.max.concurrency.of.single.partition.writer` has been renamed to `rss.server.max.concurrency.of.per-partition.write`. The old key will continue to be supported until version 0.9, at which point it will be deprecated.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ private void registerShuffleServer(String testAppId,
0,
Lists.newArrayList(new PartitionRange(0, 0)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL
ShuffleDataDistributionType.NORMAL,
1
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
Expand All @@ -41,7 +46,9 @@
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
Expand All @@ -66,15 +73,26 @@ public static void setupServers() throws Exception {
startServers();
}

@Test
public void testConcurrentWrite2Hdfs() throws Exception {
String appId = "testConcurrentWrite2Hdfs";
/**
 * Supplies the (clientSpecifiedConcurrency, expectedConcurrency) argument pairs consumed by
 * the parameterized {@code testConcurrentWrite2Hdfs} test.
 */
private static Stream<Arguments> clientConcurrencyAndExpectedProvider() {
  // Case 1: a negative client value, expected to result in MAX_CONCURRENCY data files.
  final Arguments negativeClientValue = Arguments.of(-1, MAX_CONCURRENCY);

  // Case 2: a client value one above MAX_CONCURRENCY, expected to be used as-is.
  final Arguments explicitClientValue = Arguments.of(MAX_CONCURRENCY + 1, MAX_CONCURRENCY + 1);

  return Stream.of(negativeClientValue, explicitClientValue);
}

@ParameterizedTest
@MethodSource("clientConcurrencyAndExpectedProvider")
public void testConcurrentWrite2Hdfs(int clientSpecifiedConcurrency, int expectedConcurrency) throws Exception {
String appId = "testConcurrentWrite2Hdfs_" + new Random().nextInt();
String dataBasePath = HDFS_URI + "rss/test";
RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(
appId,
0,
Lists.newArrayList(new PartitionRange(0, 1)),
dataBasePath
new RemoteStorageInfo(dataBasePath),
StringUtils.EMPTY,
ShuffleDataDistributionType.NORMAL,
clientSpecifiedConcurrency
);
shuffleServerClient.registerShuffle(rrsr);

Expand Down Expand Up @@ -118,7 +136,7 @@ public void testConcurrentWrite2Hdfs() throws Exception {
.stream(fileStatuses)
.filter(x -> x.getPath().getName().endsWith(SHUFFLE_DATA_FILE_SUFFIX))
.count();
assertEquals(MAX_CONCURRENCY, actual);
assertEquals(expectedConcurrency, actual);

ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT);
Roaring64NavigableMap blocksBitmap = Roaring64NavigableMap.bitmapOf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public void clearResourceTest() throws Exception {
0,
Lists.newArrayList(new PartitionRange(0, 1)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL
ShuffleDataDistributionType.NORMAL,
-1
);
shuffleWriteClient.registerApplicationInfo("application_clearResourceTest1", 500L, "user");
shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", 500L);
Expand Down
Loading

0 comments on commit 8edefdf

Please sign in to comment.