[#1711][FOLLOWUP] feat(coordinator): refactor the reconfigurable conf #1741

Merged: 3 commits, May 29, 2024
This file was deleted.

This file was deleted.

AccessManager.java
@@ -26,16 +26,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.common.config.Reconfigurable;
-import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
 import org.apache.uniffle.coordinator.access.checker.AccessChecker;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 
-public class AccessManager implements Reconfigurable {
+public class AccessManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(AccessManager.class);
 
@@ -110,22 +108,4 @@ public void close() throws IOException {
       checker.close();
     }
   }
-
-  public boolean isPropertyReconfigurable(String property) {
-    for (AccessChecker checker : accessCheckers) {
-      if (checker instanceof Reconfigurable
-          && ((Reconfigurable) checker).isPropertyReconfigurable(property)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void reconfigure(RssConf conf) {
-    for (AccessChecker checker : accessCheckers) {
-      if (checker instanceof Reconfigurable) {
-        ((Reconfigurable) checker).reconfigure(conf);
-      }
-    }
-  }
 }
ClusterManager.java
@@ -21,9 +21,7 @@
 import java.util.List;
 import java.util.Set;
 
-import org.apache.uniffle.common.config.Reconfigurable;
-
-public interface ClusterManager extends Closeable, Reconfigurable {
+public interface ClusterManager extends Closeable {
 
   /**
    * Add a server to the cluster.
CoordinatorServer.java
@@ -26,8 +26,7 @@
 import picocli.CommandLine;
 
 import org.apache.uniffle.common.Arguments;
-import org.apache.uniffle.common.config.ReconfigurableBase;
-import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
@@ -55,7 +54,7 @@
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE;
 
 /** The main entrance of coordinator service */
-public class CoordinatorServer extends ReconfigurableBase {
+public class CoordinatorServer {
 
   private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServer.class);
 
@@ -72,7 +71,6 @@ public class CoordinatorServer extends ReconfigurableBase {
   private String id;
 
   public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
-    super(coordinatorConf);
     this.coordinatorConf = coordinatorConf;
     try {
       initialization();
@@ -91,8 +89,7 @@ public static void main(String[] args) throws Exception {
 
     // Load configuration from config files
    final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
-
-    coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);
+    ReconfigurableConfManager.init(coordinatorConf, configFile);
 
     // Start the coordinator service
     final CoordinatorServer coordinatorServer = new CoordinatorServer(coordinatorConf);
@@ -102,7 +99,6 @@ public static void main(String[] args) throws Exception {
   }
 
   public void start() throws Exception {
-    startReconfigureThread();
     jettyServer.start();
     server.start();
     if (metricReporter != null) {
@@ -145,7 +141,6 @@ public void stopServer() throws Exception {
       metricReporter.stop();
       LOG.info("Metric Reporter Stopped!");
     }
-    stopReconfigureThread();
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
   }
@@ -272,27 +267,4 @@ public GRPCMetrics getGrpcMetrics() {
   protected void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
-
-  @Override
-  public void reconfigure(RssConf conf) {
-    clusterManager.reconfigure(conf);
-    accessManager.reconfigure(conf);
-  }
-
-  @Override
-  public boolean isPropertyReconfigurable(String property) {
-    if (clusterManager.isPropertyReconfigurable(property)) {
-      return true;
-    }
-    if (accessManager.isPropertyReconfigurable(property)) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public RssConf reloadConfiguration() {
-    return new CoordinatorConf(
-        coordinatorConf.getString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, ""));
-  }
 }
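
Taken together, the CoordinatorServer changes drop the ReconfigurableBase lifecycle (startReconfigureThread/stopReconfigureThread, reconfigure, isPropertyReconfigurable, reloadConfiguration) in favor of a single ReconfigurableConfManager. A minimal sketch of the resulting flow, using only the calls that appear in this diff; the exact signatures and reload semantics of ReconfigurableConfManager are assumptions:

    // 1. main() loads the conf file and initializes the manager once (as in the hunk above).
    CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
    ReconfigurableConfManager.init(coordinatorConf, configFile);

    // 2. A component registers the option it cares about and keeps the returned handle
    //    instead of caching the value in a volatile field.
    ReconfigurableConfManager.Reconfigurable<Integer> shuffleNodesMax =
        ReconfigurableConfManager.register(coordinatorConf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);

    // 3. Call sites read the current value on demand, so no reconfigure()/isPropertyReconfigurable()
    //    plumbing is needed between CoordinatorServer, ClusterManager, and AccessManager.
    int currentMax = shuffleNodesMax.get();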
SimpleClusterManager.java
@@ -48,8 +48,8 @@
 import org.apache.uniffle.client.impl.grpc.ShuffleServerInternalGrpcClient;
 import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
 import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.ServerStatus;
-import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.InvalidRequestException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
@@ -72,7 +72,7 @@ public class SimpleClusterManager implements ClusterManager {
   private Map<String, Set<ServerNode>> tagToNodes = JavaUtils.newConcurrentMap();
   private AtomicLong excludeLastModify = new AtomicLong(0L);
   private long heartbeatTimeout;
-  private volatile int shuffleNodesMax;
+  private ReconfigurableConfManager.Reconfigurable<Integer> shuffleNodesMax;
   private ScheduledExecutorService scheduledExecutorService;
   private ScheduledExecutorService checkNodesExecutorService;
   private FileSystem hadoopFileSystem;
@@ -86,7 +86,8 @@ public class SimpleClusterManager implements ClusterManager {
   private boolean readyForServe = false;
 
   public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws Exception {
-    this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+    this.shuffleNodesMax =
+        ReconfigurableConfManager.register(conf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
     scheduledExecutorService =
@@ -311,7 +312,7 @@ public void clear() {
 
   @Override
   public int getShuffleNodesMax() {
-    return shuffleNodesMax;
+    return shuffleNodesMax.get();
   }
 
   @Override
@@ -392,18 +393,4 @@ public void setStartupSilentPeriodEnabled(boolean startupSilentPeriodEnabled) {
   public Map<String, ServerNode> getServers() {
     return servers;
   }
-
-  @Override
-  public void reconfigure(RssConf conf) {
-    int nodeMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
-    if (nodeMax != shuffleNodesMax) {
-      LOG.warn("Coordinator update new shuffleNodesMax {}", nodeMax);
-      shuffleNodesMax = nodeMax;
-    }
-  }
-
-  @Override
-  public boolean isPropertyReconfigurable(String property) {
-    return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
-  }
 }
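
The SimpleClusterManager hunks show the net effect at a single option: instead of caching a volatile int that was refreshed by an externally driven reconfigure(conf) callback, the manager holds a ReconfigurableConfManager.Reconfigurable<Integer> handle and pulls the value at read time. A short sketch of the read path; that the handle reflects later changes to the conf file is an assumption based on the PR title rather than on code shown here:

    // Registered once in the constructor (see the hunk above), then read per call:
    @Override
    public int getShuffleNodesMax() {
      // returns whatever value ReconfigurableConfManager currently holds for the option
      return shuffleNodesMax.get();
    }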
AccessClusterLoadChecker.java
@@ -24,9 +24,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.common.ServerStatus;
-import org.apache.uniffle.common.config.Reconfigurable;
-import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.AccessManager;
 import org.apache.uniffle.coordinator.ClusterManager;
@@ -42,15 +41,16 @@
  * AccessClusterLoadChecker use the cluster load metrics including memory and healthy to filter and
  * count available nodes numbers and reject if the number do not reach the threshold.
  */
-public class AccessClusterLoadChecker extends AbstractAccessChecker implements Reconfigurable {
+public class AccessClusterLoadChecker extends AbstractAccessChecker {
 
   private static final Logger LOG = LoggerFactory.getLogger(AccessClusterLoadChecker.class);
 
   private final ClusterManager clusterManager;
   private final double memoryPercentThreshold;
   // The hard constraint number of available shuffle servers
   private final int availableServerNumThreshold;
-  private volatile int defaultRequiredShuffleServerNumber;
+  private volatile ReconfigurableConfManager.Reconfigurable<Integer>
+      defaultRequiredShuffleServerNumber;
 
   public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
     super(accessManager);
@@ -61,7 +61,7 @@ public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
     this.availableServerNumThreshold =
         conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD, -1);
     this.defaultRequiredShuffleServerNumber =
-        conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+        ReconfigurableConfManager.register(conf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
   }
 
   @Override
@@ -85,7 +85,7 @@ public AccessCheckResult check(AccessInfo accessInfo) {
     if (availableServerNumThreshold == -1) {
       String requiredNodesNumRaw =
           accessInfo.getExtraProperties().get(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM);
-      int requiredNodesNum = defaultRequiredShuffleServerNumber;
+      int requiredNodesNum = defaultRequiredShuffleServerNumber.get();
       if (StringUtils.isNotEmpty(requiredNodesNumRaw)
           && Integer.parseInt(requiredNodesNumRaw) > 0) {
         requiredNodesNum = Integer.parseInt(requiredNodesNumRaw);
@@ -126,18 +126,4 @@ public int getAvailableServerNumThreshold() {
   }
 
   public void close() {}
-
-  @Override
-  public void reconfigure(RssConf conf) {
-    int nodeMax = conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
-    if (nodeMax != defaultRequiredShuffleServerNumber) {
-      LOG.warn("Coordinator update new defaultRequiredShuffleServerNumber {}.", nodeMax);
-      defaultRequiredShuffleServerNumber = nodeMax;
-    }
-  }
-
-  @Override
-  public boolean isPropertyReconfigurable(String property) {
-    return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
-  }
 }
CoordinatorTestServer.java
@@ -20,7 +20,7 @@
 import picocli.CommandLine;
 
 import org.apache.uniffle.common.Arguments;
-import org.apache.uniffle.common.config.ReconfigurableBase;
+import org.apache.uniffle.common.ReconfigurableConfManager;
 import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -50,8 +50,7 @@ public static void main(String[] args) throws Exception {
 
     // Load configuration from config files
     final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
-
-    coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);
+    ReconfigurableConfManager.init(coordinatorConf, configFile);
 
     // Start the coordinator service
     final CoordinatorTestServer coordinatorServer = new CoordinatorTestServer(coordinatorConf);