diff --git a/fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java b/fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java index 46699356c1211..f2a553f9e68c1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java +++ b/fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java @@ -29,6 +29,7 @@ import com.starrocks.common.ThreadPoolManager; import com.starrocks.common.Version; import com.starrocks.common.util.JdkUtils; +import com.starrocks.ha.StateChangeExecutor; import com.starrocks.http.HttpServer; import com.starrocks.journal.Journal; import com.starrocks.journal.bdbje.BDBEnvironment; @@ -108,8 +109,18 @@ public static void start(String starRocksDir, String pidDir, String[] args) { FrontendOptions.init(args); ExecuteEnv.setup(); - // init globalStateMgr and wait it be ready + // init globalStateMgr GlobalStateMgr.getCurrentState().initialize(args); + + StateChangeExecutor.getInstance().setMetaContext( + GlobalStateMgr.getCurrentState().getMetaContext()); + + StateChangeExecutor.getInstance().registerStateChangeExecution( + GlobalStateMgr.getCurrentState().getStateChangeExecution()); + // start state change executor + StateChangeExecutor.getInstance().start(); + + // wait globalStateMgr to be ready GlobalStateMgr.getCurrentState().waitForReady(); FrontendOptions.saveStartType(); diff --git a/fe/fe-core/src/main/java/com/starrocks/ha/BDBStateChangeListener.java b/fe/fe-core/src/main/java/com/starrocks/ha/BDBStateChangeListener.java index 603943d75b652..bc67c4cf57a90 100644 --- a/fe/fe-core/src/main/java/com/starrocks/ha/BDBStateChangeListener.java +++ b/fe/fe-core/src/main/java/com/starrocks/ha/BDBStateChangeListener.java @@ -26,7 +26,6 @@ import com.sleepycat.je.rep.StateChangeListener; import com.starrocks.common.util.Util; import com.starrocks.persist.EditLog; -import com.starrocks.server.GlobalStateMgr; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -70,7 +69,7 @@ public synchronized void stateChange(StateChangeEvent sce) throws RuntimeExcepti } } Preconditions.checkNotNull(newType); - GlobalStateMgr.getCurrentState().notifyNewFETypeTransfer(newType); + StateChangeExecutor.getInstance().notifyNewFETypeTransfer(newType); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/ha/StateChangeExecution.java b/fe/fe-core/src/main/java/com/starrocks/ha/StateChangeExecution.java new file mode 100644 index 0000000000000..3cac282d6f01b --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/ha/StateChangeExecution.java @@ -0,0 +1,8 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. + +package com.starrocks.ha; + +public interface StateChangeExecution { + public void transferToLeader(); + public void transferToNonLeader(FrontendNodeType newType); +} diff --git a/fe/fe-core/src/main/java/com/starrocks/ha/StateChangeExecutor.java b/fe/fe-core/src/main/java/com/starrocks/ha/StateChangeExecutor.java new file mode 100644 index 0000000000000..27908404393b3 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/ha/StateChangeExecutor.java @@ -0,0 +1,165 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. + +package com.starrocks.ha; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Queues; +import com.starrocks.common.util.Daemon; +import com.starrocks.common.util.Util; +import com.starrocks.server.GlobalStateMgr; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +public class StateChangeExecutor extends Daemon { + private static final int STATE_CHANGE_CHECK_INTERVAL_MS = 100; + private static final Logger LOG = LogManager.getLogger(StateChangeExecutor.class); + + private BlockingQueue typeTransferQueue; + private List executions; + + private static class SingletonHolder { + private static final StateChangeExecutor INSTANCE = new StateChangeExecutor(); + } + + public static StateChangeExecutor getInstance() { + return SingletonHolder.INSTANCE; + } + + public StateChangeExecutor() { + super("stateChangeExecutor", STATE_CHANGE_CHECK_INTERVAL_MS); + typeTransferQueue = Queues.newLinkedBlockingDeque(); + executions = new ArrayList<>(); + } + + public void registerStateChangeExecution(StateChangeExecution execution) { + executions.add(execution); + } + + public void notifyNewFETypeTransfer(FrontendNodeType newType) { + try { + String msg = "notify new FE type transfer: " + newType; + LOG.warn(msg); + Util.stdoutWithTime(msg); + typeTransferQueue.put(newType); + } catch (InterruptedException e) { + LOG.error("failed to put new FE type: {}, {}.", newType, e); + Thread.currentThread().interrupt(); + } + } + + @Override + protected void runOneCycle() { + while (true) { + FrontendNodeType newType = null; + try { + newType = typeTransferQueue.take(); + } catch (InterruptedException e) { + LOG.error("got exception when take FE type from queue", e); + Thread.currentThread().interrupt(); + Util.stdoutWithTime("got exception when take FE type from queue. " + e.getMessage()); + System.exit(-1); + } + Preconditions.checkNotNull(newType); + FrontendNodeType feType = GlobalStateMgr.getCurrentState().getFeType(); + LOG.info("begin to transfer FE type from {} to {}", feType, newType); + if (feType == newType) { + return; + } + + /* + * INIT -> LEADER: transferToLeader + * INIT -> FOLLOWER/OBSERVER: transferToNonLeader + * UNKNOWN -> LEADER: transferToLeader + * UNKNOWN -> FOLLOWER/OBSERVER: transferToNonLeader + * FOLLOWER -> LEADER: transferToLeader + * FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false + */ + switch (feType) { + case INIT: { + switch (newType) { + case LEADER: { + for (StateChangeExecution execution : executions) { + execution.transferToLeader(); + } + break; + } + case FOLLOWER: + case OBSERVER: { + for (StateChangeExecution execution : executions) { + execution.transferToNonLeader(newType); + } + break; + } + case UNKNOWN: + break; + default: + break; + } + break; + } + case UNKNOWN: { + switch (newType) { + case LEADER: { + for (StateChangeExecution execution : executions) { + execution.transferToLeader(); + } + break; + } + case FOLLOWER: + case OBSERVER: { + for (StateChangeExecution execution : executions) { + execution.transferToNonLeader(newType); + } + break; + } + default: + break; + } + break; + } + case FOLLOWER: { + switch (newType) { + case LEADER: { + for (StateChangeExecution execution : executions) { + execution.transferToLeader(); + } + break; + } + case UNKNOWN: { + for (StateChangeExecution execution : executions) { + execution.transferToNonLeader(newType); + } + break; + } + default: + break; + } + break; + } + case OBSERVER: { + if (newType == FrontendNodeType.UNKNOWN) { + for (StateChangeExecution execution : executions) { + execution.transferToNonLeader(newType); + } + } + break; + } + case LEADER: { + // exit if leader changed to any other type + String msg = "transfer FE type from LEADER to " + newType.name() + ". exit"; + LOG.error(msg); + Util.stdoutWithTime(msg); + System.exit(-1); + } + default: + break; + } // end switch formerFeType + + LOG.info("finished to transfer FE type to {}", feType); + } + } // end runOneCycle +} diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index cee4d0b1cad58..5027dc9060382 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -27,7 +27,6 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Queues; import com.google.common.collect.Range; import com.starrocks.alter.Alter; import com.starrocks.alter.AlterJob; @@ -153,6 +152,7 @@ import com.starrocks.ha.FrontendNodeType; import com.starrocks.ha.HAProtocol; import com.starrocks.ha.LeaderInfo; +import com.starrocks.ha.StateChangeExecution; import com.starrocks.journal.Journal; import com.starrocks.journal.JournalCursor; import com.starrocks.journal.JournalEntity; @@ -268,7 +268,6 @@ public class GlobalStateMgr { private static final Logger LOG = LogManager.getLogger(GlobalStateMgr.class); // 0 ~ 9999 used for qe public static final long NEXT_ID_INIT_VALUE = 10000; - private static final int STATE_CHANGE_CHECK_INTERVAL_MS = 100; private static final int REPLAY_INTERVAL_MS = 1; private static final String IMAGE_DIR = "/image"; @@ -303,7 +302,6 @@ public class GlobalStateMgr { private JournalWriter journalWriter; // master only: write journal log private Daemon replayer; private Daemon timePrinter; - private Daemon listener; private EsRepository esRepository; // it is a daemon, so add it here private StarRocksRepository starRocksRepository; private HiveRepository hiveRepository; @@ -317,7 +315,6 @@ public class GlobalStateMgr { // canRead can be true even if isReady is false. // for example: OBSERVER transfer to UNKNOWN, then isReady will be set to false, but canRead can still be true private AtomicBoolean canRead = new AtomicBoolean(false); - private BlockingQueue typeTransferQueue; // false if default_cluster is not created. private boolean isDefaultClusterCreated = false; @@ -414,6 +411,8 @@ public class GlobalStateMgr { private ShardManager shardManager; + private StateChangeExecution execution; + public List getFrontends(FrontendNodeType nodeType) { return nodeMgr.getFrontends(nodeType); } @@ -499,7 +498,6 @@ private GlobalStateMgr(boolean isCheckpointCatalog) { this.replayedJournalId = new AtomicLong(0L); this.synchronizedTimeMs = 0; this.feType = FrontendNodeType.INIT; - this.typeTransferQueue = Queues.newLinkedBlockingDeque(); this.journalObservable = new JournalObservable(); @@ -574,6 +572,19 @@ private GlobalStateMgr(boolean isCheckpointCatalog) { this.taskManager = new TaskManager(); this.insertOverwriteJobManager = new InsertOverwriteJobManager(); this.shardManager = new ShardManager(); + + GlobalStateMgr gsm = this; + this.execution = new StateChangeExecution() { + @Override + public void transferToLeader() { + gsm.transferToLeader(); + } + + @Override + public void transferToNonLeader(FrontendNodeType newType) { + gsm.transferToNonLeader(newType); + } + }; } public static void destroyCheckpoint() { @@ -834,10 +845,6 @@ public void initialize(String[] args) throws Exception { // 6. start task cleaner thread createTaskCleaner(); - - // 7. start state listener thread - createStateListener(); - listener.start(); } protected void initJournal() throws JournalException, InterruptedException { @@ -878,7 +885,8 @@ public static String genFeNodeName(String host, int port, boolean isOldStyle) { } } - private void transferToLeader(FrontendNodeType oldType) { + private void transferToLeader() { + FrontendNodeType oldType = feType; // stop replayer if (replayer != null) { replayer.exit(); @@ -1062,6 +1070,7 @@ private void transferToNonLeader(FrontendNodeType newType) { // not set canRead here, leave canRead as what is was. // if meta out of date, canRead will be set to false in replayer thread. metaReplayState.setTransferToUnknown(); + feType = newType; return; } @@ -1086,6 +1095,8 @@ private void transferToNonLeader(FrontendNodeType newType) { startNonMasterDaemonThreads(); MetricRepo.init(); + + feType = newType; } public void loadImage(String imageDir) throws IOException, DdlException { @@ -1578,121 +1589,6 @@ private void setCanRead(boolean hasLog, boolean err) { } } - public void notifyNewFETypeTransfer(FrontendNodeType newType) { - try { - String msg = "notify new FE type transfer: " + newType; - LOG.warn(msg); - Util.stdoutWithTime(msg); - this.typeTransferQueue.put(newType); - } catch (InterruptedException e) { - LOG.error("failed to put new FE type: {}", newType, e); - } - } - - public void createStateListener() { - listener = new Daemon("stateListener", STATE_CHANGE_CHECK_INTERVAL_MS) { - @Override - protected synchronized void runOneCycle() { - - while (true) { - FrontendNodeType newType = null; - try { - newType = typeTransferQueue.take(); - } catch (InterruptedException e) { - LOG.error("got exception when take FE type from queue", e); - Util.stdoutWithTime("got exception when take FE type from queue. " + e.getMessage()); - System.exit(-1); - } - Preconditions.checkNotNull(newType); - LOG.info("begin to transfer FE type from {} to {}", feType, newType); - if (feType == newType) { - return; - } - - /* - * INIT -> MASTER: transferToLeader - * INIT -> FOLLOWER/OBSERVER: transferToNonLeader - * UNKNOWN -> MASTER: transferToLeader - * UNKNOWN -> FOLLOWER/OBSERVER: transferToNonLeader - * FOLLOWER -> MASTER: transferToLeader - * FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false - */ - switch (feType) { - case INIT: { - switch (newType) { - case LEADER: { - transferToLeader(feType); - break; - } - case FOLLOWER: - case OBSERVER: { - transferToNonLeader(newType); - break; - } - case UNKNOWN: - break; - default: - break; - } - break; - } - case UNKNOWN: { - switch (newType) { - case LEADER: { - transferToLeader(feType); - break; - } - case FOLLOWER: - case OBSERVER: { - transferToNonLeader(newType); - break; - } - default: - break; - } - break; - } - case FOLLOWER: { - switch (newType) { - case LEADER: { - transferToLeader(feType); - break; - } - case UNKNOWN: { - transferToNonLeader(newType); - break; - } - default: - break; - } - break; - } - case OBSERVER: { - if (newType == FrontendNodeType.UNKNOWN) { - transferToNonLeader(newType); - } - break; - } - case LEADER: { - // exit if master changed to any other type - String msg = "transfer FE type from LEADER to " + newType.name() + ". exit"; - LOG.error(msg); - Util.stdoutWithTime(msg); - System.exit(-1); - } - default: - break; - } // end switch formerFeType - - feType = newType; - LOG.info("finished to transfer FE type to {}", feType); - } - } // end runOneCycle - }; - - listener.setMetaContext(metaContext); - } - public synchronized boolean replayJournal(long toJournalId) throws JournalException { long newToJournalId = toJournalId; if (newToJournalId == -1) { @@ -3195,4 +3091,12 @@ public void doTaskBackgroundJob() { LOG.warn("task manager clean expire task runs history failed", t); } } + + public StateChangeExecution getStateChangeExecution() { + return execution; + } + + public MetaContext getMetaContext() { + return metaContext; + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/ha/StateChangeExecutorTest.java b/fe/fe-core/src/test/java/com/starrocks/ha/StateChangeExecutorTest.java new file mode 100644 index 0000000000000..654399498c8c4 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/ha/StateChangeExecutorTest.java @@ -0,0 +1,103 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. + +package com.starrocks.ha; + +import com.starrocks.server.GlobalStateMgr; +import mockit.Mock; +import mockit.MockUp; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class StateChangeExecutorTest { + private class StateChangeExecutionTest implements StateChangeExecution { + private FrontendNodeType type; + @Override + public void transferToLeader() { + type = FrontendNodeType.LEADER; + } + @Override + public void transferToNonLeader(FrontendNodeType newType) { + type = newType; + } + public FrontendNodeType getType() { + return type; + } + public void setType(FrontendNodeType newType) { + type = newType; + } + } + + private StateChangeExecutor executor = null; + + @Before + public void init() { + executor = new StateChangeExecutor(); + } + + @After + public void cleanup() { + executor.exit(); + } + + private void notifyAndCheck(FrontendNodeType newType, StateChangeExecutionTest execution) { + executor.notifyNewFETypeTransfer(newType); + int i = 0; + for (; i < 4; ++i) { + try { + Thread.sleep(500 /* 0.5 second */); + } catch (InterruptedException e) { + } + if (execution.getType() == newType) { + break; + } + } + if (i != 4) { // it's possible that consumer thread is too slow + Assert.assertEquals(newType, execution.getType()); + } + } + + @Test + public void testStateChangeExecutor() { + StateChangeExecutionTest execution = new StateChangeExecutionTest(); + executor.registerStateChangeExecution(execution); + + executor.start(); + + new MockUp() { + @Mock + public FrontendNodeType getFeType() { + return execution.getType(); + } + }; + // INIT -> LEADER + execution.setType(FrontendNodeType.INIT); + Assert.assertEquals(FrontendNodeType.INIT, execution.getType()); + notifyAndCheck(FrontendNodeType.LEADER, execution); + + // INIT -> FOLLOWER + execution.setType(FrontendNodeType.INIT); + notifyAndCheck(FrontendNodeType.FOLLOWER, execution); + + // UNKNOWN -> LEADER + execution.setType(FrontendNodeType.UNKNOWN); + notifyAndCheck(FrontendNodeType.LEADER, execution); + + // UNKNOWN -> FOLLOWER + execution.setType(FrontendNodeType.UNKNOWN); + notifyAndCheck(FrontendNodeType.FOLLOWER, execution); + + // FOLLOWER -> LEADER + execution.setType(FrontendNodeType.FOLLOWER); + notifyAndCheck(FrontendNodeType.LEADER, execution); + + // FOLLOWER -> UNKNOWN + execution.setType(FrontendNodeType.FOLLOWER); + notifyAndCheck(FrontendNodeType.UNKNOWN, execution); + + // OBSERVER -> UNKNOWN + execution.setType(FrontendNodeType.OBSERVER); + notifyAndCheck(FrontendNodeType.UNKNOWN, execution); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/utframe/MockedFrontend.java b/fe/fe-core/src/test/java/com/starrocks/utframe/MockedFrontend.java index b9accf7bb9bf6..9000a182eeda6 100644 --- a/fe/fe-core/src/test/java/com/starrocks/utframe/MockedFrontend.java +++ b/fe/fe-core/src/test/java/com/starrocks/utframe/MockedFrontend.java @@ -28,6 +28,8 @@ import com.starrocks.common.util.NetUtils; import com.starrocks.common.util.PrintableMap; import com.starrocks.ha.FrontendNodeType; +import com.starrocks.ha.StateChangeExecution; +import com.starrocks.ha.StateChangeExecutor; import com.starrocks.journal.Journal; import com.starrocks.journal.JournalException; import com.starrocks.journal.JournalFactory; @@ -243,7 +245,13 @@ public boolean isPortUsing(String host, int port) { }; GlobalStateMgr.getCurrentState().initialize(args); - GlobalStateMgr.getCurrentState().notifyNewFETypeTransfer(FrontendNodeType.LEADER); + StateChangeExecutor.getInstance().setMetaContext( + GlobalStateMgr.getCurrentState().getMetaContext()); + StateChangeExecutor.getInstance().registerStateChangeExecution( + GlobalStateMgr.getCurrentState().getStateChangeExecution()); + StateChangeExecutor.getInstance().start(); + StateChangeExecutor.getInstance().notifyNewFETypeTransfer(FrontendNodeType.LEADER); + GlobalStateMgr.getCurrentState().waitForReady(); while (true) {