Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Followup] Use asList method in some existing configOptions #18

Merged
merged 5 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.uniffle.common.config;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;

/**
* {@code ConfigOptions} are used to build a {@link ConfigOption}.
* The option is typically built in one of the following pattern:
Expand Down Expand Up @@ -237,7 +240,11 @@ public ListConfigOptionBuilder(String key, Class<E> clazz, Function<Object, E> a
if (v instanceof List) {
return (List<E>) v;
} else {
return Arrays.stream(v.toString().split(LIST_SPILTTER))
String trimmedVal = v.toString().trim();
if (StringUtils.isEmpty(trimmedVal)) {
return Collections.emptyList();
}
return Arrays.stream(trimmedVal.split(LIST_SPILTTER))
.map(s -> atomicConverter.apply(s)).collect(Collectors.toList());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@ public void testListTypes() {
} catch (IllegalArgumentException illegalArgumentException) {
fail();
}

// test the empty list
final ConfigOption<List<String>> emptyListStringOption = ConfigOptions
.key("rss.key5")
.stringType()
.asList()
.noDefaultValue()
.withDescription("List config key5");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't set this option and call method conf.get(emptyListStringOption) directly, what will the result be?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will return null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the same behavior as Flink?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I have checked.


List<String> key5Val = conf.get(emptyListStringOption);
assertNull(key5Val);

conf.setString(emptyListStringOption.key(), "");
assertEquals(conf.get(emptyListStringOption).size(), 0);
conf.setString(emptyListStringOption.key(), ", ");
assertEquals(conf.get(emptyListStringOption).size(), 0);
conf.setString(emptyListStringOption.key(), " ");
assertEquals(conf.get(emptyListStringOption).size(), 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
package org.apache.uniffle.coordinator;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,14 +47,13 @@ public AccessManager(
}

private void init() throws RuntimeException {
String checkers = coordinatorConf.get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS);
if (StringUtils.isEmpty(checkers)) {
List<String> checkers = coordinatorConf.get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS);
if (CollectionUtils.isEmpty(checkers)) {
LOG.warn("Access checkers is empty, will not init any checkers.");
return;
}

String[] names = checkers.trim().split(",");
accessCheckers = RssUtils.loadExtensions(AccessChecker.class, Arrays.asList(names), this);
accessCheckers = RssUtils.loadExtensions(AccessChecker.class, checkers, this);
}

public AccessCheckResult handleAccessRequest(AccessInfo accessInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ public class CoordinatorConf extends RssBaseConf {
.intType()
.defaultValue(9)
.withDescription("The max number of shuffle server when do the assignment");
public static final ConfigOption<String> COORDINATOR_ACCESS_CHECKERS = ConfigOptions
public static final ConfigOption<List<String>> COORDINATOR_ACCESS_CHECKERS = ConfigOptions
.key("rss.coordinator.access.checkers")
.stringType()
.defaultValue("org.apache.uniffle.coordinator.AccessClusterLoadChecker")
.asList()
.defaultValues("org.apache.uniffle.coordinator.AccessClusterLoadChecker")
.withDescription("Access checkers");
public static final ConfigOption<Integer> COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC = ConfigOptions
.key("rss.coordinator.access.candidates.updateIntervalSec")
Expand All @@ -90,7 +91,7 @@ public class CoordinatorConf extends RssBaseConf {
.intType()
.checkValue(ConfigUtils.positiveIntegerValidator2, "load checker serverNum threshold must be positive")
.noDefaultValue()
.withDescription("Accessed candidates file path");
.withDescription("The minimal required number of healthy shuffle servers when being accessed by client");
public static final ConfigOption<Boolean> COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED = ConfigOptions
.key("rss.coordinator.dynamicClientConf.enabled")
.booleanType()
Expand All @@ -105,7 +106,7 @@ public class CoordinatorConf extends RssBaseConf {
.key("rss.coordinator.remote.storage.path")
.stringType()
.noDefaultValue()
.withDescription("all supported remote paths for RSS cluster, seperated by ','");
.withDescription("all supported remote paths for RSS cluster, separated by ','");
public static final ConfigOption<Integer> COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC = ConfigOptions
.key("rss.coordinator.dynamicClientConf.updateIntervalSec")
.intType()
Expand All @@ -116,7 +117,7 @@ public class CoordinatorConf extends RssBaseConf {
.key("rss.coordinator.remote.storage.cluster.conf")
.stringType()
.noDefaultValue()
.withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, sperated by ';'");
.withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");


public CoordinatorConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void test(@TempDir File tempDir) throws Exception {
getClass().getClassLoader().getResource("coordinator.conf")).getFile();
CoordinatorConf conf = new CoordinatorConf(filePath);
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, tempDir.toURI().toString());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker");

// file load checking at startup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void test() throws Exception {
final String filePath = Objects.requireNonNull(
getClass().getClassLoader().getResource("coordinator.conf")).getFile();
CoordinatorConf conf = new CoordinatorConf(filePath);
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessClusterLoadChecker");
AccessManager accessManager = new AccessManager(conf, clusterManager, new Configuration());
AccessClusterLoadChecker accessClusterLoadChecker =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public void clear() {
public void test() throws Exception {
// test init
CoordinatorConf conf = new CoordinatorConf();
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS, " , ");
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), " , ");
try {
new AccessManager(conf, null, new Configuration());
} catch (RuntimeException e) {
String expectedMessage = "Empty classes";
assertTrue(e.getMessage().startsWith(expectedMessage));
}
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"com.Dummy,org.apache.uniffle.coordinator.AccessManagerTest$MockAccessChecker");
try {
new AccessManager(conf, null, new Configuration());
Expand All @@ -62,21 +62,21 @@ public void test() throws Exception {
assertTrue(e.getMessage().startsWith(expectedMessage));
}
// test empty checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS, "");
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
AccessManager accessManager = new AccessManager(conf, null, new Configuration());
assertTrue(accessManager.handleAccessRequest(
new AccessInfo(String.valueOf(new Random().nextInt()),
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION)))
.isSuccess());
accessManager.close();
// test mock checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,");
accessManager = new AccessManager(conf, null, new Configuration());
assertEquals(1, accessManager.getAccessCheckers().size());
assertTrue(accessManager.handleAccessRequest(new AccessInfo("mock1")).isSuccess());
assertTrue(accessManager.handleAccessRequest(new AccessInfo("mock2")).isSuccess());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,"
+ "org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysFalse");
accessManager = new AccessManager(conf, null, new Configuration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void test() throws Exception {
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC, 1);
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, HDFS_URI);
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker");

// file load checking at startup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

public class SparkSQLWithDelegationShuffleManager extends SparkSQLTest {
public class SparkSQLWithDelegationShuffleManager extends org.apache.uniffle.test.SparkSQLTest {

@BeforeAll
public static void setupServers() throws Exception {
final String candidates = Objects.requireNonNull(
SparkSQLWithDelegationShuffleManager.class.getClassLoader().getResource("candidates")).getFile();
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, candidates);
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

public class SparkSQLWithDelegationShuffleManagerFallback extends SparkSQLTest {
public class SparkSQLWithDelegationShuffleManagerFallback extends org.apache.uniffle.test.SparkSQLTest {

@BeforeAll
public static void setupServers() throws Exception {
final String candidates = Objects.requireNonNull(
SparkSQLWithDelegationShuffleManager.class.getClassLoader().getResource("candidates")).getFile();
org.apache.uniffle.test.SparkSQLWithDelegationShuffleManager.class.getClassLoader().getResource("candidates")).getFile();
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, candidates);
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
Expand Down
11 changes: 5 additions & 6 deletions server/src/main/java/org/apache/uniffle/server/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,15 +47,14 @@ public class HealthCheck {
public HealthCheck(AtomicBoolean isHealthy, ShuffleServerConf conf, List<Checker> buildInCheckers) {
this.isHealthy = isHealthy;
this.checkIntervalMs = conf.getLong(ShuffleServerConf.HEALTH_CHECK_INTERVAL);
String checkersStr = conf.getString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES);
if (StringUtils.isEmpty(checkersStr) && buildInCheckers.isEmpty()) {
List<String> configuredCheckers = conf.get(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES);
if (CollectionUtils.isEmpty(configuredCheckers) && buildInCheckers.isEmpty()) {
throw new IllegalArgumentException("The checkers cannot be empty");
}
checkers.addAll(buildInCheckers);
if (!StringUtils.isEmpty(checkersStr)) {
String[] checkerNames = checkersStr.split(",");
if (CollectionUtils.isNotEmpty(configuredCheckers)) {
try {
for (String name : checkerNames) {
for (String name : configuredCheckers) {
Class<?> cls = Class.forName(name);
Constructor<?> cons = cls.getConstructor(ShuffleServerConf.class);
checkers.add((Checker) cons.newInstance(conf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,10 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(false)
.withDescription("The switch for the health check");

public static final ConfigOption<String> HEALTH_CHECKER_CLASS_NAMES = ConfigOptions
public static final ConfigOption<List<String>> HEALTH_CHECKER_CLASS_NAMES = ConfigOptions
.key("rss.server.health.checker.class.names")
.stringType()
.asList()
.noDefaultValue()
.withDescription("The list of the Checker's name");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public class HealthCheckTest {
public void buildInCheckerTest() {
ShuffleServerConf conf = new ShuffleServerConf();
assertConf(conf);
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES, "");
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), "");
assertConf(conf);
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES, "org.apache.uniffle.server.LocalStorageChecker");
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), "org.apache.uniffle.server.LocalStorageChecker");
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "s1");
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
assertConf(conf);
Expand All @@ -62,15 +62,15 @@ public void buildInCheckerTest() {
public void checkTest() {
AtomicBoolean healthy = new AtomicBoolean(false);
ShuffleServerConf conf = new ShuffleServerConf();
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES, HealthyMockChecker.class.getCanonicalName());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), HealthyMockChecker.class.getCanonicalName());
HealthCheck checker = new HealthCheck(healthy, conf, Lists.newArrayList());
checker.check();
assertTrue(healthy.get());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES, UnHealthyMockChecker.class.getCanonicalName());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), UnHealthyMockChecker.class.getCanonicalName());
checker = new HealthCheck(healthy, conf, Lists.newArrayList());
checker.check();
assertFalse(healthy.get());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES,
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(),
UnHealthyMockChecker.class.getCanonicalName() + "," + HealthyMockChecker.class.getCanonicalName());
checker = new HealthCheck(healthy, conf, Lists.newArrayList());
checker.check();
Expand Down