Skip to content

Commit

Permalink
APPNG-2481
Browse files Browse the repository at this point in the history
  • Loading branch information
madness-inc committed Jun 21, 2023
1 parent 1d8bfcf commit cd67200
Showing 1 changed file with 62 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,66 +53,82 @@ public void perform(Environment env, Site site) throws InvalidConfigurationExcep
if (isTargetNode(env)) {
logger.info("about to start site: {}", getSiteName());
FieldProcessor fp = new FieldProcessorImpl("start");
waitForClusterState(env, site, logger);
// check if site is present
// if not, it's a new site that has not been loaded on the current node
if (null != site) {
waitForClusterState(env, site, logger);
}
SiteImpl siteByName = getPlatformContext(env).getBean(CoreService.class).getSiteByName(getSiteName());
if (null == siteByName) {
throw new InvalidConfigurationException(null, null,
String.format("Site to load not found: %s", getSiteName()));
}
getInitializerService(env).loadSite(env, siteByName, false, fp, false);
} else {
logIgnoreMessage(logger);
}
}

public void waitForClusterState(Environment env, Site site, Logger logger) {
Properties cfg = env.getAttribute(Scope.PLATFORM, Platform.Environment.PLATFORM_CONFIG);
Integer maxReloadDelay = cfg.getInteger(Platform.Property.SITE_RELOAD_MAX_RANDOM_DELAY,
DEFAULT_MAX_RELOAD_DELAY);
long delayMillis = maxReloadDelay + (long) (Math.random() * maxReloadDelay);
try {
logger.info("Waiting {}ms before reloading site {} on node {}", delayMillis, site.getName(),
Messaging.getNodeId());
Thread.sleep(delayMillis);
} catch (InterruptedException e) {
//
}
if (cfg.getBoolean("waitForSitesStarted", true)) {
String nodeId = Messaging.getNodeId();
Map<String, NodeState> nodeStates = NodeEvent.clusterState(env, nodeId);
int numNodes = nodeStates.size();
int minActiveNodes = numNodes > 3 ? (numNodes + 1) / 2 : numNodes;
int waited = 0;
int waitTime = cfg.getInteger("waitForSitesStartedWaitTime", 5);
int maxWaittime = cfg.getInteger("waitForSitesStartedMaxWaitTime", 30);
int activeNodes;
String nodeId = Messaging.getNodeId();
Map<String, NodeState> nodeStates = NodeEvent.clusterState(env, nodeId);
int numNodes = nodeStates.size();

if (numNodes > 1) {
Properties cfg = env.getAttribute(Scope.PLATFORM, Platform.Environment.PLATFORM_CONFIG);
Integer maxReloadDelay = cfg.getInteger(Platform.Property.SITE_RELOAD_MAX_RANDOM_DELAY,
DEFAULT_MAX_RELOAD_DELAY);
long delayMillis = maxReloadDelay + (long) (Math.random() * maxReloadDelay);
try {
logger.info("Waiting {}ms before reloading site {} on node {}", delayMillis, site.getName(), nodeId);
Thread.sleep(delayMillis);
} catch (InterruptedException e) {
//
}

if (cfg.getBoolean("waitForSitesStarted", true)) {
int minActiveNodes = numNodes > 3 ? ((numNodes + 1) / 2) + 1 : numNodes;
int waited = 0;
int waitTime = cfg.getInteger("waitForSitesStartedWaitTime", 5);
int maxWaittime = cfg.getInteger("waitForSitesStartedMaxWaitTime", 30);
int activeNodes;

logger.info("Site {} must be {} on {} of {} nodes before reloading.", site.getName(), SiteState.STARTED,
minActiveNodes, numNodes);
logger.info("Site {} must be {} on {} of {} nodes before reloading.", site.getName(), SiteState.STARTED,
minActiveNodes, numNodes);

do {
activeNodes = 0;
for (Entry<String, NodeState> state : nodeStates.entrySet()) {
String otherNode = state.getKey();
SiteState siteState = state.getValue().getSiteStates().get(site.getName());
if (SiteState.STARTED.equals(siteState)) {
activeNodes++;
do {
activeNodes = 0;
for (Entry<String, NodeState> state : nodeStates.entrySet()) {
String otherNode = state.getKey();
SiteState siteState = state.getValue().getSiteStates().get(site.getName());
if (SiteState.STARTED.equals(siteState)) {
activeNodes++;
}
logger.debug("Site {} is {} on node {}", site.getName(), siteState, otherNode);
}
logger.debug("Site {} is {} on node {}", site.getName(), siteState, otherNode);
}
if (activeNodes < minActiveNodes) {
try {
logger.info("Site {} is active on {} of {} nodes, waiting {}s for site to start on {} nodes.",
site.getName(), activeNodes, numNodes, waitTime, minActiveNodes - activeNodes);
waited += waitTime;
Thread.sleep(TimeUnit.SECONDS.toMillis(waitTime));
} catch (InterruptedException e) {
//
if (activeNodes < minActiveNodes) {
try {
logger.info(
"Site {} is active on {} of {} nodes, waiting {}s for site to start on {} nodes.",
site.getName(), activeNodes, numNodes, waitTime, minActiveNodes - activeNodes);
waited += waitTime;
Thread.sleep(TimeUnit.SECONDS.toMillis(waitTime));
} catch (InterruptedException e) {
//
}
}
} while (activeNodes < minActiveNodes && waited < maxWaittime);
if (waited >= maxWaittime) {
logger.info("Reached maximum waiting time of {}s, now reloading site {}.", maxWaittime,
site.getName());
} else {
logger.info("Site {} is active on {} of {} nodes, reloading now.", site.getName(), activeNodes,
numNodes);
}
} while (activeNodes < minActiveNodes && waited < maxWaittime);
if (waited >= maxWaittime) {
logger.info("Reached maximum waiting time of {}s, now reloading site {}.", maxWaittime, site.getName());
} else {
logger.info("Site {} is active on {} of {} nodes, reloading now.", site.getName(), activeNodes,
numNodes);
}
} else {
logger.info("Node {} is single-instance, no need to wait for other cluster members. Now reloading site {}",
nodeId, site.getName());
}
}

Expand Down

0 comments on commit cd67200

Please sign in to comment.