[#1608][part-7] improvement(doc): add doc and optimize reassign config options #1693

Merged
merged 2 commits on May 15, 2024
@@ -211,8 +211,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetry =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
this.taskBlockSendFailureRetry = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
taskBlockSendFailureRetry || rssResubmitStage || blockIdSelfManagedEnabled;
@@ -229,7 +229,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetryEnabled =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);

// The feature of partition reassign is exclusive with multiple replicas and stage retry.
if (taskBlockSendFailureRetryEnabled) {
@@ -208,8 +208,8 @@ private RssShuffleWriter(
this.blockFailSentRetryEnabled =
sparkConf.getBoolean(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
+ RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.key(),
RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.defaultValue());
this.blockFailSentRetryMaxTimes =
RssSparkConfig.toRssConf(sparkConf).get(RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES);
}
@@ -204,8 +204,8 @@ public class RssClientConf {
"This option is only valid when the remote storage path is specified. If ture, "
+ "the remote storage conf will use the client side hadoop configuration loaded from the classpath.");

public static final ConfigOption<Boolean> RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED =
ConfigOptions.key("rss.client.blockSendFailureRetry.enabled")
public static final ConfigOption<Boolean> RSS_CLIENT_REASSIGN_ENABLED =
ConfigOptions.key("rss.client.reassign.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
18 changes: 17 additions & 1 deletion docs/client_guide/spark_client_guide.md
@@ -149,4 +149,20 @@ Other configuration:
|---|---|---|
|spark.rss.access.timeout.ms|10000|The timeout to access Uniffle coordinator|
|spark.rss.client.access.retry.interval.ms|20000|The interval between retries before falling back to SortShuffleManager|
|spark.rss.client.access.retry.times|0|The number of retries before falling back to SortShuffleManager|

### Partition reassign in one shuffle attempt

To improve task stability, a partition reassign mechanism is introduced: within a single shuffle attempt, the client requests replacement shuffle servers for a partition when the assigned servers become unhealthy or come under high memory pressure. At the current stage, this feature is not compatible with stage retry or the multiple-replicas mechanism (more tests are needed).

Use the following configs to enable this feature:

```bash
# Whether to enable the partition reassign mechanism
spark.rss.client.reassign.enabled true
# The maximum number of replacement servers for one partition when partition reassign is enabled
spark.rss.client.reassign.maxReassignServerNum 10
# The maximum number of block send retries when partition reassign is enabled
spark.rss.client.reassign.blockRetryMaxTimes 1
```
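As a usage illustration (a minimal sketch, not part of this PR: the class name and option values are hypothetical examples, and it assumes the Uniffle Spark client jar and `RssShuffleManager` are on the classpath as described earlier in this guide), the same options can also be set programmatically on a `SparkConf` before the session is created:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class ReassignConfigExample {
  public static void main(String[] args) {
    // Route shuffle through Uniffle and enable the partition reassign mechanism.
    // The keys mirror the config list above; the values here are only examples.
    SparkConf conf = new SparkConf()
        .setAppName("uniffle-reassign-example")
        .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
        .set("spark.rss.client.reassign.enabled", "true")
        .set("spark.rss.client.reassign.maxReassignServerNum", "10")
        .set("spark.rss.client.reassign.blockRetryMaxTimes", "1");

    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    // ... run the shuffle-heavy job as usual ...
    spark.stop();
  }
}
```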
@@ -37,7 +37,7 @@

import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;

/** This class is a basic test of the partition block data reassignment mechanism. */
public class PartitionBlockDataReassignBasicTest extends SparkSQLTest {
@@ -105,7 +105,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) {
sparkConf.set(
"spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
String.valueOf(grpcShuffleServers.size()));
sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
}

@Override
@@ -38,7 +38,7 @@
import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY;

/** This class tests the partition reassign mechanism with multiple retries. */
@@ -86,7 +86,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) {
sparkConf.set("spark.sql.shuffle.partitions", "4");
sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "10");

// simulate that the grpc servers have different free memory