Skip to content

Commit

Permalink
[#477] feat(spark): Fix rss.resubmit.stage does not support dynamic…
Browse files Browse the repository at this point in the history
… client conf. (#1050)

### What changes were proposed in this pull request?

Fix `rss.resubmit.stage` so that it supports dynamic client conf.

### Why are the changes needed?

For #477.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT.

Co-authored-by: leixianming <leixianming@didiglobal.com>
  • Loading branch information
leixm and leixianming committed Jul 31, 2023
1 parent 1701d06 commit 9ef3747
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ public class RssSparkConfig {
.doc(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.description()))
.createWithDefault(-1);

// Client-side toggle for stage resubmission on shuffle-fetch failure.
// Declared as a ConfigEntry (instead of reading the raw RssClientConfig key
// directly) so the value can be supplied through the SparkConf, including
// entries populated from dynamic client conf — TODO confirm against the
// dynamic-conf fetch path in RssShuffleManager. Defaults to false (disabled).
public static final ConfigEntry<Boolean> RSS_RESUBMIT_STAGE =
createBooleanBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_RESUBMIT_STAGE)
.internal()
.doc("Whether to enable the resubmit stage."))
.createWithDefault(false);

// spark2 doesn't have this key defined
public static final String SPARK_SHUFFLE_COMPRESS_KEY = "spark.shuffle.compress";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
Expand Down Expand Up @@ -183,7 +182,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
if (sparkConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE)
&& RssSparkShuffleUtils.isStageResubmitSupported()) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
Expand Down Expand Up @@ -207,7 +206,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
if (sparkConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE)
&& RssSparkShuffleUtils.isStageResubmitSupported()) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ public void excludeNodesNoDelayTest() throws Exception {
assertEquals(4, scm.getNodesNum());
assertEquals(2, scm.getExcludeNodes().size());
}
File blacklistFile = new File(excludeNodesPath);
assertTrue(blacklistFile.delete());
}

private void writeExcludeHosts(String path, Set<String> values) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ public class RSSStageResubmitTest extends SparkIntegrationTestBase {

@BeforeAll
public static void setupServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
final CoordinatorConf coordinatorConf = getCoordinatorConf();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
dynamicConf.put(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_RESUBMIT_STAGE, "true");
addDynamicConf(coordinatorConf, dynamicConf);
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
Expand Down Expand Up @@ -79,8 +81,6 @@ protected SparkConf createSparkConf() {

@Override
public void updateSparkConfCustomer(SparkConf sparkConf) {
// NOTE(review): this is rendered from a diff hunk ("-79,8 +81,6"); the two
// rss.resubmit.stage lines below appear to be the ones REMOVED by this commit
// (the flag is now delivered via dynamic client conf in setupServers) —
// confirm against the rendered diff before relying on this text as-is.
sparkConf.set(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_RESUBMIT_STAGE, "true");
// Cap Spark task retries so the resubmit path is exercised deterministically.
sparkConf.set("spark.task.maxFailures", String.valueOf(maxTaskFailures));
}

Expand Down

0 comments on commit 9ef3747

Please sign in to comment.