Skip to content

Commit

Permalink
[Followup] Use asList method in some existing configOptions (#18)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Use asList method in some existing configOptions

### Why are the changes needed?
Directly use the asList method in ConfigOptions to get the config list values, and then avoid splitting values by users.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UTs.
  • Loading branch information
zuston committed Jul 5, 2022
1 parent 9de136d commit 8256765
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.uniffle.common.config;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;

/**
* {@code ConfigOptions} are used to build a {@link ConfigOption}.
* The option is typically built in one of the following pattern:
Expand Down Expand Up @@ -237,7 +240,11 @@ public ListConfigOptionBuilder(String key, Class<E> clazz, Function<Object, E> a
if (v instanceof List) {
return (List<E>) v;
} else {
return Arrays.stream(v.toString().split(LIST_SPILTTER))
String trimmedVal = v.toString().trim();
if (StringUtils.isEmpty(trimmedVal)) {
return Collections.emptyList();
}
return Arrays.stream(trimmedVal.split(LIST_SPILTTER))
.map(s -> atomicConverter.apply(s)).collect(Collectors.toList());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@ public void testListTypes() {
} catch (IllegalArgumentException illegalArgumentException) {
fail();
}

// test the empty list
final ConfigOption<List<String>> emptyListStringOption = ConfigOptions
.key("rss.key5")
.stringType()
.asList()
.noDefaultValue()
.withDescription("List config key5");

List<String> key5Val = conf.get(emptyListStringOption);
assertNull(key5Val);

conf.setString(emptyListStringOption.key(), "");
assertEquals(conf.get(emptyListStringOption).size(), 0);
conf.setString(emptyListStringOption.key(), ", ");
assertEquals(conf.get(emptyListStringOption).size(), 0);
conf.setString(emptyListStringOption.key(), " ");
assertEquals(conf.get(emptyListStringOption).size(), 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
package org.apache.uniffle.coordinator;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,14 +47,13 @@ public AccessManager(
}

private void init() throws RuntimeException {
String checkers = coordinatorConf.get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS);
if (StringUtils.isEmpty(checkers)) {
List<String> checkers = coordinatorConf.get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS);
if (CollectionUtils.isEmpty(checkers)) {
LOG.warn("Access checkers is empty, will not init any checkers.");
return;
}

String[] names = checkers.trim().split(",");
accessCheckers = RssUtils.loadExtensions(AccessChecker.class, Arrays.asList(names), this);
accessCheckers = RssUtils.loadExtensions(AccessChecker.class, checkers, this);
}

public AccessCheckResult handleAccessRequest(AccessInfo accessInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ public class CoordinatorConf extends RssBaseConf {
.intType()
.defaultValue(9)
.withDescription("The max number of shuffle server when do the assignment");
public static final ConfigOption<String> COORDINATOR_ACCESS_CHECKERS = ConfigOptions
public static final ConfigOption<List<String>> COORDINATOR_ACCESS_CHECKERS = ConfigOptions
.key("rss.coordinator.access.checkers")
.stringType()
.defaultValue("org.apache.uniffle.coordinator.AccessClusterLoadChecker")
.asList()
.defaultValues("org.apache.uniffle.coordinator.AccessClusterLoadChecker")
.withDescription("Access checkers");
public static final ConfigOption<Integer> COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC = ConfigOptions
.key("rss.coordinator.access.candidates.updateIntervalSec")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void test(@TempDir File tempDir) throws Exception {
getClass().getClassLoader().getResource("coordinator.conf")).getFile();
CoordinatorConf conf = new CoordinatorConf(filePath);
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, tempDir.toURI().toString());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker");

// file load checking at startup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void test() throws Exception {
final String filePath = Objects.requireNonNull(
getClass().getClassLoader().getResource("coordinator.conf")).getFile();
CoordinatorConf conf = new CoordinatorConf(filePath);
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessClusterLoadChecker");
AccessManager accessManager = new AccessManager(conf, clusterManager, new Configuration());
AccessClusterLoadChecker accessClusterLoadChecker =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public void clear() {
public void test() throws Exception {
// test init
CoordinatorConf conf = new CoordinatorConf();
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS, " , ");
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), " , ");
try {
new AccessManager(conf, null, new Configuration());
} catch (RuntimeException e) {
String expectedMessage = "Empty classes";
assertTrue(e.getMessage().startsWith(expectedMessage));
}
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"com.Dummy,org.apache.uniffle.coordinator.AccessManagerTest$MockAccessChecker");
try {
new AccessManager(conf, null, new Configuration());
Expand All @@ -62,21 +62,21 @@ public void test() throws Exception {
assertTrue(e.getMessage().startsWith(expectedMessage));
}
// test empty checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS, "");
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
AccessManager accessManager = new AccessManager(conf, null, new Configuration());
assertTrue(accessManager.handleAccessRequest(
new AccessInfo(String.valueOf(new Random().nextInt()),
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION)))
.isSuccess());
accessManager.close();
// test mock checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,");
accessManager = new AccessManager(conf, null, new Configuration());
assertEquals(1, accessManager.getAccessCheckers().size());
assertTrue(accessManager.handleAccessRequest(new AccessInfo("mock1")).isSuccess());
assertTrue(accessManager.handleAccessRequest(new AccessInfo("mock2")).isSuccess());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,"
+ "org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysFalse");
accessManager = new AccessManager(conf, null, new Configuration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void test() throws Exception {
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC, 1);
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, HDFS_URI);
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker");

// file load checking at startup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

public class SparkSQLWithDelegationShuffleManager extends SparkSQLTest {
public class SparkSQLWithDelegationShuffleManager extends org.apache.uniffle.test.SparkSQLTest {

@BeforeAll
public static void setupServers() throws Exception {
final String candidates = Objects.requireNonNull(
SparkSQLWithDelegationShuffleManager.class.getClassLoader().getResource("candidates")).getFile();
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, candidates);
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

public class SparkSQLWithDelegationShuffleManagerFallback extends SparkSQLTest {
public class SparkSQLWithDelegationShuffleManagerFallback extends org.apache.uniffle.test.SparkSQLTest {

@BeforeAll
public static void setupServers() throws Exception {
final String candidates = Objects.requireNonNull(
SparkSQLWithDelegationShuffleManager.class.getClassLoader().getResource("candidates")).getFile();
org.apache.uniffle.test.SparkSQLWithDelegationShuffleManager.class.getClassLoader().getResource("candidates")).getFile();
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, candidates);
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
Expand Down
11 changes: 5 additions & 6 deletions server/src/main/java/org/apache/uniffle/server/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,15 +47,14 @@ public class HealthCheck {
public HealthCheck(AtomicBoolean isHealthy, ShuffleServerConf conf, List<Checker> buildInCheckers) {
this.isHealthy = isHealthy;
this.checkIntervalMs = conf.getLong(ShuffleServerConf.HEALTH_CHECK_INTERVAL);
String checkersStr = conf.getString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES);
if (StringUtils.isEmpty(checkersStr) && buildInCheckers.isEmpty()) {
List<String> configuredCheckers = conf.get(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES);
if (CollectionUtils.isEmpty(configuredCheckers) && buildInCheckers.isEmpty()) {
throw new IllegalArgumentException("The checkers cannot be empty");
}
checkers.addAll(buildInCheckers);
if (!StringUtils.isEmpty(checkersStr)) {
String[] checkerNames = checkersStr.split(",");
if (CollectionUtils.isNotEmpty(configuredCheckers)) {
try {
for (String name : checkerNames) {
for (String name : configuredCheckers) {
Class<?> cls = Class.forName(name);
Constructor<?> cons = cls.getConstructor(ShuffleServerConf.class);
checkers.add((Checker) cons.newInstance(conf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,10 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(false)
.withDescription("The switch for the health check");

public static final ConfigOption<String> HEALTH_CHECKER_CLASS_NAMES = ConfigOptions
public static final ConfigOption<List<String>> HEALTH_CHECKER_CLASS_NAMES = ConfigOptions
.key("rss.server.health.checker.class.names")
.stringType()
.asList()
.noDefaultValue()
.withDescription("The list of the Checker's name");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public class HealthCheckTest {
public void buildInCheckerTest() {
ShuffleServerConf conf = new ShuffleServerConf();
assertConf(conf);
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES, "");
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), "");
assertConf(conf);
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES, "org.apache.uniffle.server.LocalStorageChecker");
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), "org.apache.uniffle.server.LocalStorageChecker");
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "s1");
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
assertConf(conf);
Expand All @@ -62,15 +62,15 @@ public void buildInCheckerTest() {
public void checkTest() {
AtomicBoolean healthy = new AtomicBoolean(false);
ShuffleServerConf conf = new ShuffleServerConf();
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES, HealthyMockChecker.class.getCanonicalName());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), HealthyMockChecker.class.getCanonicalName());
HealthCheck checker = new HealthCheck(healthy, conf, Lists.newArrayList());
checker.check();
assertTrue(healthy.get());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES, UnHealthyMockChecker.class.getCanonicalName());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), UnHealthyMockChecker.class.getCanonicalName());
checker = new HealthCheck(healthy, conf, Lists.newArrayList());
checker.check();
assertFalse(healthy.get());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES,
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(),
UnHealthyMockChecker.class.getCanonicalName() + "," + HealthyMockChecker.class.getCanonicalName());
checker = new HealthCheck(healthy, conf, Lists.newArrayList());
checker.check();
Expand Down

0 comments on commit 8256765

Please sign in to comment.