[#1711][FOLLOWUP] feat(coordinator): refactor the reconfigurable conf (#1741)

### What changes were proposed in this pull request?

Refactor the reconfigurable conf handling on the coordinator side.

### Why are the changes needed?

Follow up: #1711 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests
zuston committed May 29, 2024
1 parent 417674d commit a3a49f0
Showing 11 changed files with 160 additions and 267 deletions.
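
At a glance, the change replaces the hand-rolled `Reconfigurable`/`ReconfigurableBase` callbacks with config values registered through `ReconfigurableConfManager`: the server calls `init(conf, configFile)` once at startup, and each component holds a `ReconfigurableConfManager.Reconfigurable<T>` handle whose `get()` returns the current value. The sketch below only illustrates that usage pattern as it appears in the diff; the wrapper class, config file path, and `main` method are hypothetical and not part of this PR, while the `init`/`register`/`get` calls mirror the changed code.

```java
import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.coordinator.CoordinatorConf;

// Hypothetical component showing the new pattern: no Reconfigurable interface,
// no reconfigure()/isPropertyReconfigurable() boilerplate in the component itself.
public class ShuffleNodesMaxHolder {

  // A live handle to the option instead of a cached volatile int.
  private final ReconfigurableConfManager.Reconfigurable<Integer> shuffleNodesMax;

  public ShuffleNodesMaxHolder(CoordinatorConf conf) {
    // Register the option once; later conf-file updates are picked up by the
    // manager, so callers always read the latest value via get().
    this.shuffleNodesMax =
        ReconfigurableConfManager.register(conf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
  }

  public int currentShuffleNodesMax() {
    return shuffleNodesMax.get();
  }

  public static void main(String[] args) throws Exception {
    String configFile = args.length > 0 ? args[0] : "conf/coordinator.conf"; // assumed path
    CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
    // One-time initialization at startup, as CoordinatorServer#main now does.
    ReconfigurableConfManager.init(coordinatorConf, configFile);
    System.out.println(new ShuffleNodesMaxHolder(coordinatorConf).currentShuffleNodesMax());
  }
}
```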

2 files deleted (file names not shown).

AccessManager.java
@@ -26,16 +26,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.common.config.Reconfigurable;
-import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
 import org.apache.uniffle.coordinator.access.checker.AccessChecker;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 
-public class AccessManager implements Reconfigurable {
+public class AccessManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(AccessManager.class);
 
@@ -110,22 +108,4 @@ public void close() throws IOException {
       checker.close();
     }
   }
-
-  public boolean isPropertyReconfigurable(String property) {
-    for (AccessChecker checker : accessCheckers) {
-      if (checker instanceof Reconfigurable
-          && ((Reconfigurable) checker).isPropertyReconfigurable(property)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void reconfigure(RssConf conf) {
-    for (AccessChecker checker : accessCheckers) {
-      if (checker instanceof Reconfigurable) {
-        ((Reconfigurable) checker).reconfigure(conf);
-      }
-    }
-  }
 }

ClusterManager.java
@@ -21,9 +21,7 @@
 import java.util.List;
 import java.util.Set;
 
-import org.apache.uniffle.common.config.Reconfigurable;
-
-public interface ClusterManager extends Closeable, Reconfigurable {
+public interface ClusterManager extends Closeable {
 
   /**
    * Add a server to the cluster.

CoordinatorServer.java
@@ -26,8 +26,7 @@
 import picocli.CommandLine;
 
 import org.apache.uniffle.common.Arguments;
-import org.apache.uniffle.common.config.ReconfigurableBase;
-import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
@@ -55,7 +54,7 @@
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE;
 
 /** The main entrance of coordinator service */
-public class CoordinatorServer extends ReconfigurableBase {
+public class CoordinatorServer {
 
   private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServer.class);
 
@@ -72,7 +71,6 @@ public class CoordinatorServer extends ReconfigurableBase {
   private String id;
 
   public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
-    super(coordinatorConf);
     this.coordinatorConf = coordinatorConf;
     try {
       initialization();
@@ -91,8 +89,7 @@ public static void main(String[] args) throws Exception {
 
     // Load configuration from config files
     final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
-
-    coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);
+    ReconfigurableConfManager.init(coordinatorConf, configFile);
 
     // Start the coordinator service
     final CoordinatorServer coordinatorServer = new CoordinatorServer(coordinatorConf);
@@ -102,7 +99,6 @@ public static void main(String[] args) throws Exception {
   }
 
   public void start() throws Exception {
-    startReconfigureThread();
     jettyServer.start();
     server.start();
     if (metricReporter != null) {
@@ -145,7 +141,6 @@ public void stopServer() throws Exception {
       metricReporter.stop();
       LOG.info("Metric Reporter Stopped!");
     }
-    stopReconfigureThread();
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
   }
@@ -272,27 +267,4 @@ public GRPCMetrics getGrpcMetrics() {
   protected void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
-
-  @Override
-  public void reconfigure(RssConf conf) {
-    clusterManager.reconfigure(conf);
-    accessManager.reconfigure(conf);
-  }
-
-  @Override
-  public boolean isPropertyReconfigurable(String property) {
-    if (clusterManager.isPropertyReconfigurable(property)) {
-      return true;
-    }
-    if (accessManager.isPropertyReconfigurable(property)) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public RssConf reloadConfiguration() {
-    return new CoordinatorConf(
-        coordinatorConf.getString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, ""));
-  }
 }

SimpleClusterManager.java
@@ -48,8 +48,8 @@
 import org.apache.uniffle.client.impl.grpc.ShuffleServerInternalGrpcClient;
 import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
 import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.ServerStatus;
-import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.InvalidRequestException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
@@ -72,7 +72,7 @@ public class SimpleClusterManager implements ClusterManager {
   private Map<String, Set<ServerNode>> tagToNodes = JavaUtils.newConcurrentMap();
   private AtomicLong excludeLastModify = new AtomicLong(0L);
   private long heartbeatTimeout;
-  private volatile int shuffleNodesMax;
+  private ReconfigurableConfManager.Reconfigurable<Integer> shuffleNodesMax;
   private ScheduledExecutorService scheduledExecutorService;
   private ScheduledExecutorService checkNodesExecutorService;
   private FileSystem hadoopFileSystem;
@@ -86,7 +86,8 @@ public class SimpleClusterManager implements ClusterManager {
   private boolean readyForServe = false;
 
   public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws Exception {
-    this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+    this.shuffleNodesMax =
+        ReconfigurableConfManager.register(conf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
     scheduledExecutorService =
@@ -311,7 +312,7 @@ public void clear() {
 
   @Override
   public int getShuffleNodesMax() {
-    return shuffleNodesMax;
+    return shuffleNodesMax.get();
   }
 
   @Override
@@ -392,18 +393,4 @@ public void setStartupSilentPeriodEnabled(boolean startupSilentPeriodEnabled) {
   public Map<String, ServerNode> getServers() {
     return servers;
   }
-
-  @Override
-  public void reconfigure(RssConf conf) {
-    int nodeMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
-    if (nodeMax != shuffleNodesMax) {
-      LOG.warn("Coordinator update new shuffleNodesMax {}", nodeMax);
-      shuffleNodesMax = nodeMax;
-    }
-  }
-
-  @Override
-  public boolean isPropertyReconfigurable(String property) {
-    return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
-  }
 }

AccessClusterLoadChecker.java
@@ -24,9 +24,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.ServerStatus;
-import org.apache.uniffle.common.config.Reconfigurable;
-import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.AccessManager;
 import org.apache.uniffle.coordinator.ClusterManager;
@@ -42,15 +41,16 @@
  * AccessClusterLoadChecker use the cluster load metrics including memory and healthy to filter and
  * count available nodes numbers and reject if the number do not reach the threshold.
  */
-public class AccessClusterLoadChecker extends AbstractAccessChecker implements Reconfigurable {
+public class AccessClusterLoadChecker extends AbstractAccessChecker {
 
   private static final Logger LOG = LoggerFactory.getLogger(AccessClusterLoadChecker.class);
 
   private final ClusterManager clusterManager;
   private final double memoryPercentThreshold;
   // The hard constraint number of available shuffle servers
   private final int availableServerNumThreshold;
-  private volatile int defaultRequiredShuffleServerNumber;
+  private volatile ReconfigurableConfManager.Reconfigurable<Integer>
+      defaultRequiredShuffleServerNumber;
 
   public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
     super(accessManager);
@@ -61,7 +61,7 @@ public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
     this.availableServerNumThreshold =
         conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD, -1);
     this.defaultRequiredShuffleServerNumber =
-        conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+        ReconfigurableConfManager.register(conf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
   }
 
   @Override
@@ -85,7 +85,7 @@ public AccessCheckResult check(AccessInfo accessInfo) {
     if (availableServerNumThreshold == -1) {
       String requiredNodesNumRaw =
           accessInfo.getExtraProperties().get(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM);
-      int requiredNodesNum = defaultRequiredShuffleServerNumber;
+      int requiredNodesNum = defaultRequiredShuffleServerNumber.get();
       if (StringUtils.isNotEmpty(requiredNodesNumRaw)
          && Integer.parseInt(requiredNodesNumRaw) > 0) {
        requiredNodesNum = Integer.parseInt(requiredNodesNumRaw);
@@ -126,18 +126,4 @@ public int getAvailableServerNumThreshold() {
   }
 
   public void close() {}
-
-  @Override
-  public void reconfigure(RssConf conf) {
-    int nodeMax = conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
-    if (nodeMax != defaultRequiredShuffleServerNumber) {
-      LOG.warn("Coordinator update new defaultRequiredShuffleServerNumber {}.", nodeMax);
-      defaultRequiredShuffleServerNumber = nodeMax;
-    }
-  }
-
-  @Override
-  public boolean isPropertyReconfigurable(String property) {
-    return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
-  }
 }

CoordinatorTestServer.java
@@ -20,7 +20,7 @@
 import picocli.CommandLine;
 
 import org.apache.uniffle.common.Arguments;
-import org.apache.uniffle.common.config.ReconfigurableBase;
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -50,8 +50,7 @@ public static void main(String[] args) throws Exception {
 
     // Load configuration from config files
     final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
-
-    coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);
+    ReconfigurableConfManager.init(coordinatorConf, configFile);
 
     // Start the coordinator service
     final CoordinatorTestServer coordinatorServer = new CoordinatorTestServer(coordinatorConf);