Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] move state listener out of GlobalStateMgr, and rename it to state change executor #8747

Merged
merged 11 commits into from Jul 20, 2022
13 changes: 12 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java
Expand Up @@ -29,6 +29,7 @@
import com.starrocks.common.ThreadPoolManager;
import com.starrocks.common.Version;
import com.starrocks.common.util.JdkUtils;
import com.starrocks.ha.StateChangeExecutor;
import com.starrocks.http.HttpServer;
import com.starrocks.journal.Journal;
import com.starrocks.journal.bdbje.BDBEnvironment;
Expand Down Expand Up @@ -108,8 +109,18 @@ public static void start(String starRocksDir, String pidDir, String[] args) {
FrontendOptions.init(args);
ExecuteEnv.setup();

// init globalStateMgr and wait it be ready
// init globalStateMgr
GlobalStateMgr.getCurrentState().initialize(args);
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved

StateChangeExecutor.getInstance().setMetaContext(
GlobalStateMgr.getCurrentState().getMetaContext());

StateChangeExecutor.getInstance().registerStateChangeExecution(
GlobalStateMgr.getCurrentState().getStateChangeExecution());
// start state change executor
StateChangeExecutor.getInstance().start();

// wait globalStateMgr to be ready
GlobalStateMgr.getCurrentState().waitForReady();

FrontendOptions.saveStartType();
Expand Down
Expand Up @@ -26,7 +26,6 @@
import com.sleepycat.je.rep.StateChangeListener;
import com.starrocks.common.util.Util;
import com.starrocks.persist.EditLog;
import com.starrocks.server.GlobalStateMgr;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -70,7 +69,7 @@ public synchronized void stateChange(StateChangeEvent sce) throws RuntimeExcepti
}
}
Preconditions.checkNotNull(newType);
GlobalStateMgr.getCurrentState().notifyNewFETypeTransfer(newType);
StateChangeExecutor.getInstance().notifyNewFETypeTransfer(newType);
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
}

}
@@ -0,0 +1,8 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.

package com.starrocks.ha;

public interface StateChangeExecution {
public void transferToLeader();
public void transferToNonLeader(FrontendNodeType newType);
}
165 changes: 165 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/ha/StateChangeExecutor.java
@@ -0,0 +1,165 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.

package com.starrocks.ha;

import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.starrocks.common.util.Daemon;
import com.starrocks.common.util.Util;
import com.starrocks.server.GlobalStateMgr;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class StateChangeExecutor extends Daemon {
private static final int STATE_CHANGE_CHECK_INTERVAL_MS = 100;
private static final Logger LOG = LogManager.getLogger(StateChangeExecutor.class);

private BlockingQueue<FrontendNodeType> typeTransferQueue;
private List<StateChangeExecution> executions;

private static class SingletonHolder {
private static final StateChangeExecutor INSTANCE = new StateChangeExecutor();
}

public static StateChangeExecutor getInstance() {
return SingletonHolder.INSTANCE;
}

public StateChangeExecutor() {
super("stateChangeExecutor", STATE_CHANGE_CHECK_INTERVAL_MS);
typeTransferQueue = Queues.newLinkedBlockingDeque();
executions = new ArrayList<>();
}

public void registerStateChangeExecution(StateChangeExecution execution) {
executions.add(execution);
}

public void notifyNewFETypeTransfer(FrontendNodeType newType) {
try {
String msg = "notify new FE type transfer: " + newType;
LOG.warn(msg);
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
Util.stdoutWithTime(msg);
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
typeTransferQueue.put(newType);
} catch (InterruptedException e) {
LOG.error("failed to put new FE type: {}, {}.", newType, e);
Thread.currentThread().interrupt();
}
}

@Override
protected void runOneCycle() {
while (true) {
FrontendNodeType newType = null;
try {
newType = typeTransferQueue.take();
} catch (InterruptedException e) {
LOG.error("got exception when take FE type from queue", e);
Thread.currentThread().interrupt();
Util.stdoutWithTime("got exception when take FE type from queue. " + e.getMessage());
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
System.exit(-1);
}
Preconditions.checkNotNull(newType);
FrontendNodeType feType = GlobalStateMgr.getCurrentState().getFeType();
LOG.info("begin to transfer FE type from {} to {}", feType, newType);
if (feType == newType) {
return;
starrocks-xupeng marked this conversation as resolved.
Show resolved Hide resolved
}

/*
* INIT -> LEADER: transferToLeader
* INIT -> FOLLOWER/OBSERVER: transferToNonLeader
* UNKNOWN -> LEADER: transferToLeader
* UNKNOWN -> FOLLOWER/OBSERVER: transferToNonLeader
* FOLLOWER -> LEADER: transferToLeader
* FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false
*/
switch (feType) {
case INIT: {
switch (newType) {
case LEADER: {
for (StateChangeExecution execution : executions) {
execution.transferToLeader();
}
break;
}
case FOLLOWER:
case OBSERVER: {
for (StateChangeExecution execution : executions) {
execution.transferToNonLeader(newType);
}
break;
}
case UNKNOWN:
break;
default:
break;
}
break;
}
case UNKNOWN: {
switch (newType) {
case LEADER: {
for (StateChangeExecution execution : executions) {
execution.transferToLeader();
}
break;
}
case FOLLOWER:
case OBSERVER: {
for (StateChangeExecution execution : executions) {
execution.transferToNonLeader(newType);
}
break;
}
default:
break;
}
break;
}
case FOLLOWER: {
switch (newType) {
case LEADER: {
for (StateChangeExecution execution : executions) {
execution.transferToLeader();
}
break;
}
case UNKNOWN: {
for (StateChangeExecution execution : executions) {
execution.transferToNonLeader(newType);
}
break;
}
default:
break;
}
break;
}
case OBSERVER: {
if (newType == FrontendNodeType.UNKNOWN) {
for (StateChangeExecution execution : executions) {
execution.transferToNonLeader(newType);
}
}
break;
}
case LEADER: {
// exit if leader changed to any other type
String msg = "transfer FE type from LEADER to " + newType.name() + ". exit";
LOG.error(msg);
Util.stdoutWithTime(msg);
System.exit(-1);
}
default:
break;
} // end switch formerFeType

LOG.info("finished to transfer FE type to {}", feType);
}
} // end runOneCycle
}