diff --git a/fe/fe-core/src/main/java/com/starrocks/analysis/RedirectStatus.java b/fe/fe-core/src/main/java/com/starrocks/analysis/RedirectStatus.java index f5226951880a1..9773ab2ba932e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/analysis/RedirectStatus.java +++ b/fe/fe-core/src/main/java/com/starrocks/analysis/RedirectStatus.java @@ -18,25 +18,21 @@ package com.starrocks.analysis; public class RedirectStatus { - private boolean isForwardToMaster; + private final boolean isForwardToLeader; private boolean needToWaitJournalSync; public RedirectStatus() { - isForwardToMaster = true; + isForwardToLeader = true; needToWaitJournalSync = true; } - public RedirectStatus(boolean isForwardToMaster, boolean needToWaitJournalSync) { - this.isForwardToMaster = isForwardToMaster; + public RedirectStatus(boolean isForwardToLeader, boolean needToWaitJournalSync) { + this.isForwardToLeader = isForwardToLeader; this.needToWaitJournalSync = needToWaitJournalSync; } - public boolean isForwardToMaster() { - return isForwardToMaster; - } - - public void setForwardToMaster(boolean isForwardToMaster) { - this.isForwardToMaster = isForwardToMaster; + public boolean isForwardToLeader() { + return isForwardToLeader; } public boolean isNeedToWaitJournalSync() { diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/com/starrocks/catalog/TabletInvertedIndex.java index 22e4bdca2473a..3911db80fdf28 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/TabletInvertedIndex.java @@ -155,7 +155,7 @@ public void tabletReport(long backendId, Map backendTablets, } // check and set path - // path info of replica is only saved in Master FE + // path info of replica is only saved in Leader FE if (backendTabletInfo.isSetPath_hash() && replica.getPathHash() != backendTabletInfo.getPath_hash()) { replica.setPathHash(backendTabletInfo.getPath_hash()); diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index a325d45d30a15..86fae79c00c34 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -250,7 +250,7 @@ public class Config extends ConfigBase { public static int edit_log_port = 9010; /** - * Master FE will save image every *edit_log_roll_num* meta journals. + * Leader FE will save image every *edit_log_roll_num* meta journals. */ @ConfField(mutable = true) public static int edit_log_roll_num = 50000; @@ -272,7 +272,7 @@ public class Config extends ConfigBase { public static int meta_delay_toleration_second = 300; // 5 min /** - * Master FE sync policy of bdbje. + * Leader FE sync policy of bdbje. * If you only deploy one Follower FE, set this to 'SYNC'. If you deploy more than 3 Follower FE, * you can set this and the following 'replica_sync_policy' to WRITE_NO_SYNC. * more info, see: http://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.SyncPolicy.html @@ -316,8 +316,8 @@ public class Config extends ConfigBase { public static int bdbje_lock_timeout_second = 1; /** - * Set the maximum acceptable clock skew between non-master FE to Master FE host. - * This value is checked whenever a non-master FE establishes a connection to master FE via BDBJE. + * Set the maximum acceptable clock skew between non-leader FE to Leader FE host. + * This value is checked whenever a non-leader FE establishes a connection to leader FE via BDBJE. * The connection is abandoned if the clock skew is larger than this value. */ @ConfField @@ -399,7 +399,7 @@ public class Config extends ConfigBase { /** * If true, FE will reset bdbje replication group(that is, to remove all electable nodes info) - * and is supposed to start as Master. + * and is supposed to start as Leader. * If all the electable nodes can not start, we can copy the meta data * to another node and set this config to true to try to restart the FE. */ @@ -407,12 +407,12 @@ public class Config extends ConfigBase { public static String metadata_failure_recovery = "false"; /** - * If true, non-master FE will ignore the meta data delay gap between Master FE and its self, + * If true, non-leader FE will ignore the meta data delay gap between Leader FE and its self, * even if the metadata delay gap exceeds *meta_delay_toleration_second*. - * Non-master FE will still offer read service. + * Non-leader FE will still offer read service. *

- * This is helpful when you try to stop the Master FE for a relatively long time for some reason, - * but still wish the non-master FE can offer read service. + * This is helpful when you try to stop the Leader FE for a relatively long time for some reason, + * but still wish the non-leader FE can offer read service. */ @ConfField(mutable = true) public static boolean ignore_meta_check = false; diff --git a/fe/fe-core/src/main/java/com/starrocks/ha/BDBHA.java b/fe/fe-core/src/main/java/com/starrocks/ha/BDBHA.java index 78a18f8fd7a7d..ee256c61e755c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/ha/BDBHA.java +++ b/fe/fe-core/src/main/java/com/starrocks/ha/BDBHA.java @@ -183,16 +183,6 @@ public List getNoneLeaderNodes() { return ret; } - @Override - public void transferToMaster() { - - } - - @Override - public void transferToNonMaster() { - - } - @Override public boolean isLeader() { ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); diff --git a/fe/fe-core/src/main/java/com/starrocks/ha/HAProtocol.java b/fe/fe-core/src/main/java/com/starrocks/ha/HAProtocol.java index bcc767de5c355..1fbd2da28178f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/ha/HAProtocol.java +++ b/fe/fe-core/src/main/java/com/starrocks/ha/HAProtocol.java @@ -39,12 +39,6 @@ public interface HAProtocol { // get all the nodes except leader in the current group public List getNoneLeaderNodes(); - // transfer from nonMaster(unknown, follower or init) to master - public void transferToMaster(); - - // transfer to non-master - public void transferToNonMaster(); - // check if the current node is leader public boolean isLeader(); diff --git a/fe/fe-core/src/main/java/com/starrocks/http/HttpServer.java b/fe/fe-core/src/main/java/com/starrocks/http/HttpServer.java index 20be81d91078e..04b4a0c14f8de 100644 --- a/fe/fe-core/src/main/java/com/starrocks/http/HttpServer.java +++ b/fe/fe-core/src/main/java/com/starrocks/http/HttpServer.java @@ -162,7 +162,7 @@ private void registerActions() throws IllegalArgException { QueryDumpAction.registerAction(controller); // meta service action - File imageDir = MetaHelper.getMasterImageDir(); + File imageDir = MetaHelper.getLeaderImageDir(); ImageAction.registerAction(controller, imageDir); InfoAction.registerAction(controller, imageDir); VersionAction.registerAction(controller, imageDir); diff --git a/fe/fe-core/src/main/java/com/starrocks/http/meta/ColocateMetaService.java b/fe/fe-core/src/main/java/com/starrocks/http/meta/ColocateMetaService.java index 9e58d6110333e..d4a10b7d533d4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/http/meta/ColocateMetaService.java +++ b/fe/fe-core/src/main/java/com/starrocks/http/meta/ColocateMetaService.java @@ -96,11 +96,11 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response) return; } checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); - executeInMasterWithAdmin(request, response); + executeInLeaderWithAdmin(request, response); } // implement in derived classes - protected void executeInMasterWithAdmin(BaseRequest request, BaseResponse response) + protected void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response) throws DdlException { throw new DdlException("Not implemented"); } @@ -118,7 +118,7 @@ public static void registerAction(ActionController controller) throws IllegalArg } @Override - public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response) + public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response) throws DdlException { response.setContentType("application/json"); RestResult result = new RestResult(); @@ -139,7 +139,7 @@ public static void registerAction(ActionController controller) throws IllegalArg } @Override - public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response) + public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response) throws DdlException { GroupId groupId = checkAndGetGroupId(request); @@ -167,7 +167,7 @@ public static void registerAction(ActionController controller) throws IllegalArg } @Override - public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response) + public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response) throws DdlException { GroupId groupId = checkAndGetGroupId(request); @@ -197,7 +197,7 @@ public static void registerAction(ActionController controller) throws IllegalArg } @Override - public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response) + public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response) throws DdlException { GroupId groupId = checkAndGetGroupId(request); diff --git a/fe/fe-core/src/main/java/com/starrocks/http/meta/GlobalDictMetaService.java b/fe/fe-core/src/main/java/com/starrocks/http/meta/GlobalDictMetaService.java index ef14256b79d56..c587183277f97 100644 --- a/fe/fe-core/src/main/java/com/starrocks/http/meta/GlobalDictMetaService.java +++ b/fe/fe-core/src/main/java/com/starrocks/http/meta/GlobalDictMetaService.java @@ -44,11 +44,11 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response) return; } checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); - executeInMasterWithAdmin(request, response); + executeInLeaderWithAdmin(request, response); } // implement in derived classes - protected void executeInMasterWithAdmin(BaseRequest request, BaseResponse response) + protected void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response) throws DdlException { throw new DdlException("Not implemented"); } @@ -65,7 +65,7 @@ public static void registerAction(ActionController controller) throws IllegalArg } @Override - public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response) + public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response) throws DdlException { HttpMethod method = request.getRequest().method(); if (method.equals(HttpMethod.POST)) { diff --git a/fe/fe-core/src/main/java/com/starrocks/http/rest/ShowProcAction.java b/fe/fe-core/src/main/java/com/starrocks/http/rest/ShowProcAction.java index a4a692423e1ae..da3bf8615b9db 100644 --- a/fe/fe-core/src/main/java/com/starrocks/http/rest/ShowProcAction.java +++ b/fe/fe-core/src/main/java/com/starrocks/http/rest/ShowProcAction.java @@ -77,7 +77,7 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response) t ConnectContext context = ConnectContext.get(); LeaderOpExecutor leaderOpExecutor = new LeaderOpExecutor(new OriginStatement(showProcStmt, 0), context, RedirectStatus.FORWARD_NO_SYNC); - LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId()); + LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId()); try { leaderOpExecutor.execute(); diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/Checkpoint.java b/fe/fe-core/src/main/java/com/starrocks/leader/Checkpoint.java index 3c63356a03ce8..5b280deeb5a24 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/Checkpoint.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/Checkpoint.java @@ -198,7 +198,7 @@ protected void runAfterCatalogReady() { try { cleaner.clean(); } catch (IOException e) { - LOG.error("Master delete old image file fail.", e); + LOG.error("Leader delete old image file fail.", e); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/MetaHelper.java b/fe/fe-core/src/main/java/com/starrocks/leader/MetaHelper.java index d79cffe04d9ce..4f6c5416aaa5a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/MetaHelper.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/MetaHelper.java @@ -39,7 +39,7 @@ public class MetaHelper { private static final int BUFFER_BYTES = 8 * 1024; private static final int CHECKPOINT_LIMIT_BYTES = 30 * 1024 * 1024; - public static File getMasterImageDir() { + public static File getLeaderImageDir() { String metaDir = GlobalStateMgr.getCurrentState().getImageDir(); return new File(metaDir); } diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java b/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java index 253025dc2c5d0..9fc245d82c35a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java @@ -65,7 +65,7 @@ import com.starrocks.task.ClearTransactionTask; import com.starrocks.task.CreateReplicaTask; import com.starrocks.task.DropReplicaTask; -import com.starrocks.task.MasterTask; +import com.starrocks.task.LeaderTask; import com.starrocks.task.PublishVersionTask; import com.starrocks.task.StorageMediaMigrationTask; import com.starrocks.task.UpdateTabletMetaInfoTask; @@ -262,7 +262,7 @@ private Map buildTabletMap(List tabletList) { return tabletMap; } - private class ReportTask extends MasterTask { + private class ReportTask extends LeaderTask { public long beId; public ReportType type; diff --git a/fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java b/fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java index 9ef997ec4bf75..778c51f79a853 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java @@ -28,8 +28,8 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.task.ExportExportingTask; import com.starrocks.task.ExportPendingTask; -import com.starrocks.task.MasterTask; -import com.starrocks.task.MasterTaskExecutor; +import com.starrocks.task.LeaderTask; +import com.starrocks.task.LeaderTaskExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,10 +42,10 @@ public final class ExportChecker extends LeaderDaemon { // checkers for running job state private static Map checkers = Maps.newHashMap(); // executors for pending tasks - private static Map executors = Maps.newHashMap(); + private static Map executors = Maps.newHashMap(); private JobState jobState; - private static MasterTaskExecutor exportingSubTaskExecutor; + private static LeaderTaskExecutor exportingSubTaskExecutor; private ExportChecker(JobState jobState, long intervalMs) { super("export checker " + jobState.name().toLowerCase(), intervalMs); @@ -57,14 +57,14 @@ public static void init(long intervalMs) { checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs)); int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit; - MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize, true); + LeaderTaskExecutor pendingTaskExecutor = new LeaderTaskExecutor("export_pending_job", poolSize, true); executors.put(JobState.PENDING, pendingTaskExecutor); - MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize, true); + LeaderTaskExecutor exportingTaskExecutor = new LeaderTaskExecutor("export_exporting_job", poolSize, true); executors.put(JobState.EXPORTING, exportingTaskExecutor); // One export job will be split into multiple exporting sub tasks, the queue size is not determined, so set Integer.MAX_VALUE. - exportingSubTaskExecutor = new MasterTaskExecutor("export_exporting_sub_task", Config.export_task_pool_size, + exportingSubTaskExecutor = new LeaderTaskExecutor("export_exporting_sub_task", Config.export_task_pool_size, Integer.MAX_VALUE, true); } @@ -72,13 +72,13 @@ public static void startAll() { for (ExportChecker exportChecker : checkers.values()) { exportChecker.start(); } - for (MasterTaskExecutor masterTaskExecutor : executors.values()) { - masterTaskExecutor.start(); + for (LeaderTaskExecutor leaderTaskExecutor : executors.values()) { + leaderTaskExecutor.start(); } exportingSubTaskExecutor.start(); } - public static MasterTaskExecutor getExportingSubTaskExecutor() { + public static LeaderTaskExecutor getExportingSubTaskExecutor() { return exportingSubTaskExecutor; } @@ -123,7 +123,7 @@ private void runPendingJobs() { for (ExportJob job : pendingJobs) { try { - MasterTask task = new ExportPendingTask(job); + LeaderTask task = new ExportPendingTask(job); if (executors.get(JobState.PENDING).submit(task)) { LOG.info("run pending export job. job: {}", job); } @@ -138,7 +138,7 @@ private void runExportingJobs() { LOG.debug("exporting export job num: {}", jobs.size()); for (ExportJob job : jobs) { try { - MasterTask task = new ExportExportingTask(job); + LeaderTask task = new ExportExportingTask(job); if (executors.get(JobState.EXPORTING).submit(task)) { LOG.info("run exporting export job. job: {}", job); } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java index 41e4f94a45676..c4d13f89e2f94 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java @@ -63,7 +63,7 @@ import com.starrocks.qe.Coordinator; import com.starrocks.qe.QeProcessorImpl; import com.starrocks.server.GlobalStateMgr; -import com.starrocks.task.MasterTaskExecutor; +import com.starrocks.task.LeaderTaskExecutor; import com.starrocks.thrift.TEtlState; import com.starrocks.thrift.TUniqueId; import com.starrocks.transaction.AbstractTxnStateChangeCallback; @@ -429,7 +429,7 @@ public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransact } } - protected void submitTask(MasterTaskExecutor executor, LoadTask task) throws LoadException { + protected void submitTask(LeaderTaskExecutor executor, LoadTask task) throws LoadException { int retryNum = 0; while (!executor.submit(task)) { LOG.warn("submit load task failed. try to resubmit. job id: {}, task id: {}, retry: {}", diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadTask.java index e2387528d215e..049e439711fe6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadTask.java @@ -27,11 +27,11 @@ import com.starrocks.common.util.LogKey; import com.starrocks.load.FailMsg; import com.starrocks.server.GlobalStateMgr; -import com.starrocks.task.MasterTask; +import com.starrocks.task.LeaderTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public abstract class LoadTask extends MasterTask { +public abstract class LoadTask extends LeaderTask { public enum TaskType { PENDING, LOADING diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java index 3fce90f57fcf4..96f0eb749607d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java @@ -428,7 +428,7 @@ private ByteBuffer getResultPacket() { // use to return result packet to user private void finalizeCommand() throws IOException { ByteBuffer packet = null; - if (executor != null && executor.isForwardToMaster()) { + if (executor != null && executor.isForwardToLeader()) { // for ERR State, set packet to remote packet(executor.getOutputPacket()) // because remote packet has error detail // but for not ERR (OK or EOF) State, we should check whether stmt is ShowStmt, @@ -559,7 +559,7 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { // return error directly. TMasterOpResult result = new TMasterOpResult(); ctx.getState().setError( - "Missing current user identity. You need to upgrade this Frontend to the same version as Master Frontend."); + "Missing current user identity. You need to upgrade this Frontend to the same version as Leader Frontend."); result.setMaxJournalId(GlobalStateMgr.getCurrentState().getMaxJournalId()); result.setPacket(getResultPacket()); return result; diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java index 2b953069392db..e1981e2700810 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java @@ -131,7 +131,7 @@ private void forward() throws Exception { params.setQuery_options(queryOptions); params.setQueryId(UUIDUtil.toTUniqueId(ctx.getQueryId())); - LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress); + LOG.info("Forward statement {} to Leader {}", ctx.getStmtId(), thriftAddress); result = FrontendServiceProxy.call(thriftAddress, thriftTimeoutMs, diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index a4164640f1724..fdb3e94982eff 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -122,7 +122,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable { public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String ENABLE_SPILLING = "enable_spilling"; - // if set to true, some of stmt will be forwarded to master FE to get result + // if set to true, some of stmt will be forwarded to leader FE to get result public static final String FORWARD_TO_MASTER = "forward_to_master"; // user can set instance num after exchange, no need to be equal to nums of before exchange public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num"; diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 04e3d280464b7..963d15bfe006e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -235,7 +235,7 @@ public void initProfile(long beginTimeInNanoSecond) { } } - public boolean isForwardToMaster() { + public boolean isForwardToLeader() { if (GlobalStateMgr.getCurrentState().isLeader()) { return false; } @@ -250,7 +250,7 @@ public boolean isForwardToMaster() { if (redirectStatus == null) { return false; } else { - return redirectStatus.isForwardToMaster(); + return redirectStatus.isForwardToLeader(); } } @@ -329,7 +329,7 @@ public void execute() throws Exception { StatementPlanner.supportedByNewPlanner(parsedStmt)) { try (PlannerProfile.ScopedTimer _ = PlannerProfile.getScopedTimer("Total")) { redirectStatus = parsedStmt.getRedirectStatus(); - if (!isForwardToMaster()) { + if (!isForwardToLeader()) { context.getDumpInfo().reset(); context.getDumpInfo().setOriginStmt(parsedStmt.getOrigStmt().originStmt); if (parsedStmt instanceof ShowStmt) { @@ -369,11 +369,11 @@ public void execute() throws Exception { if (context.isQueryDump()) { return; } - if (isForwardToMaster()) { - forwardToMaster(); + if (isForwardToLeader()) { + forwardToLeader(); return; } else { - LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId()); + LOG.debug("no need to transfer to Leader. stmt: {}", context.getStmtId()); } // Only add the last running stmt for multi statement, @@ -573,9 +573,9 @@ private void dumpException(Exception e) { } } - private void forwardToMaster() throws Exception { + private void forwardToLeader() throws Exception { leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus); - LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId()); + LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId()); leaderOpExecutor.execute(); } @@ -605,7 +605,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException { // yiguolei: insertstmt's grammar analysis will write editlog, so that we check if the stmt should be forward to master here // if the stmt should be forward to master, then just return here and the master will do analysis again - if (isForwardToMaster()) { + if (isForwardToLeader()) { return; } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index 5027dc9060382..8acc7dcdee158 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -225,7 +225,7 @@ import com.starrocks.system.HeartbeatMgr; import com.starrocks.system.SystemInfoService; import com.starrocks.task.AgentBatchTask; -import com.starrocks.task.MasterTaskExecutor; +import com.starrocks.task.LeaderTaskExecutor; import com.starrocks.thrift.TNetworkAddress; import com.starrocks.thrift.TRefreshTableRequest; import com.starrocks.thrift.TRefreshTableResponse; @@ -363,8 +363,8 @@ public class GlobalStateMgr { private TabletChecker tabletChecker; // Thread pools for pending and loading task, separately - private MasterTaskExecutor pendingLoadTaskScheduler; - private MasterTaskExecutor loadingLoadTaskScheduler; + private LeaderTaskExecutor pendingLoadTaskScheduler; + private LeaderTaskExecutor loadingLoadTaskScheduler; private LoadJobScheduler loadJobScheduler; @@ -536,11 +536,11 @@ private GlobalStateMgr(boolean isCheckpointCatalog) { this.tabletChecker = new TabletChecker(this, nodeMgr.getClusterInfo(), tabletScheduler, stat); this.pendingLoadTaskScheduler = - new MasterTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size, + new LeaderTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size, Config.desired_max_waiting_jobs, !isCheckpointCatalog); // One load job will be split into multiple loading tasks, the queue size is not determined, so set Integer.MAX_VALUE. this.loadingLoadTaskScheduler = - new MasterTaskExecutor("loading_load_task_scheduler", Config.async_load_task_pool_size, + new LeaderTaskExecutor("loading_load_task_scheduler", Config.async_load_task_pool_size, Integer.MAX_VALUE, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); @@ -925,7 +925,7 @@ private void transferToLeader() { journalWriter.startDaemon(); - // Set the feType to MASTER before writing edit log, because the feType must be Master when writing edit log. + // Set the feType to LEADER before writing edit log, because the feType must be Leader when writing edit log. // It will be set to the old type if any error happens in the following procedure feType = FrontendNodeType.LEADER; try { @@ -957,9 +957,9 @@ private void transferToLeader() { nodeMgr.setLeaderInfo(); // start all daemon threads that only running on MASTER FE - startMasterOnlyDaemonThreads(); + startLeaderOnlyDaemonThreads(); // start other daemon threads that should running on all FE - startNonMasterDaemonThreads(); + startNonLeaderDaemonThreads(); insertOverwriteJobManager.cancelRunningJobs(); MetricRepo.init(); @@ -980,7 +980,7 @@ private void transferToLeader() { } // start all daemon threads only running on Master - private void startMasterOnlyDaemonThreads() { + private void startLeaderOnlyDaemonThreads() { if (Config.integrate_starmgr) { // register service to starMgr getStarOSAgent().registerAndBootstrapService(); @@ -1044,7 +1044,7 @@ private void startMasterOnlyDaemonThreads() { } // start threads that should running on all FE - private void startNonMasterDaemonThreads() { + private void startNonLeaderDaemonThreads() { tabletStatMgr.start(); // load and export job label cleaner thread labelCleaner.start(); @@ -1092,7 +1092,7 @@ private void transferToNonLeader(FrontendNodeType newType) { replayer.start(); } - startNonMasterDaemonThreads(); + startNonLeaderDaemonThreads(); MetricRepo.init(); @@ -2363,11 +2363,11 @@ public LoadManager getLoadManager() { return loadManager; } - public MasterTaskExecutor getPendingLoadTaskScheduler() { + public LeaderTaskExecutor getPendingLoadTaskScheduler() { return pendingLoadTaskScheduler; } - public MasterTaskExecutor getLoadingLoadTaskScheduler() { + public LeaderTaskExecutor getLoadingLoadTaskScheduler() { return loadingLoadTaskScheduler; } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java index 8cd331f55959b..76d81edd2de0f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java @@ -126,7 +126,7 @@ public void initialize(String[] args) throws Exception { } public void startHearbeat(long epoch) { - heartbeatMgr.setMaster(clusterId, token, epoch); + heartbeatMgr.setLeader(clusterId, token, epoch); heartbeatMgr.start(); } @@ -547,7 +547,7 @@ private void getHelperNodes(String[] args) throws AnalysisException { */ throw new AnalysisException( "Do not specify the helper node to FE itself. " - + "Please specify it to the existing running Master or Follower FE"); + + "Please specify it to the existing running Leader or Follower FE"); } helperNodes.add(helperHostPort); } diff --git a/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatMgr.java index f7579bb3cc69e..55a4894f29f29 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatMgr.java @@ -80,7 +80,7 @@ public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) { this.heartbeatFlags = new HeartbeatFlags(); } - public void setMaster(int clusterId, String token, long epoch) { + public void setLeader(int clusterId, String token, long epoch) { TMasterInfo tMasterInfo = new TMasterInfo( new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port), clusterId, epoch); tMasterInfo.setToken(token); diff --git a/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatResponse.java b/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatResponse.java index a8d4badbfb644..028e4e094c115 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatResponse.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/HeartbeatResponse.java @@ -53,7 +53,7 @@ public enum HbStatus { /** * msg and hbTime are no need to be synchronized to other Frontends, - * and only Master Frontend has these info + * and only Leader Frontend has these info */ protected String msg; protected long hbTime; diff --git a/fe/fe-core/src/main/java/com/starrocks/task/ExportExportingTask.java b/fe/fe-core/src/main/java/com/starrocks/task/ExportExportingTask.java index 24f4bafb252b2..01439e5e5889f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/ExportExportingTask.java @@ -48,7 +48,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; -public class ExportExportingTask extends MasterTask { +public class ExportExportingTask extends LeaderTask { private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class); private static final int RETRY_NUM = 2; @@ -90,7 +90,7 @@ protected void exec() { if (job.isReplayed()) { // If the job is created from replay thread, all plan info will be lost. // so the job has to be cancelled. - String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled"; + String failMsg = "FE restarted or Leader changed during exporting. Job must be cancelled"; job.cancelInternal(ExportFailMsg.CancelType.RUN_FAIL, failMsg); return; } @@ -252,7 +252,7 @@ private Status moveTmpFiles() { return Status.OK; } - private class ExportExportingSubTask extends MasterTask { + private class ExportExportingSubTask extends LeaderTask { private final Coordinator coord; private final int taskIdx; private final int coordSize; diff --git a/fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java b/fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java index 72d1ecc1f688b..6fe784dcb71fc 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/ExportPendingTask.java @@ -42,7 +42,7 @@ import java.util.List; -public class ExportPendingTask extends MasterTask { +public class ExportPendingTask extends LeaderTask { private static final Logger LOG = LogManager.getLogger(ExportPendingTask.class); protected final ExportJob job; @@ -70,7 +70,7 @@ protected void exec() { if (job.isReplayed()) { // If the job is created from replay thread, all plan info will be lost. // so the job has to be cancelled. - String failMsg = "FE restarted or Master changed during exporting. Job must be cancalled."; + String failMsg = "FE restarted or Leader changed during exporting. Job must be cancalled."; job.cancelInternal(ExportFailMsg.CancelType.RUN_FAIL, failMsg); return; } diff --git a/fe/fe-core/src/main/java/com/starrocks/task/MasterTask.java b/fe/fe-core/src/main/java/com/starrocks/task/LeaderTask.java similarity index 91% rename from fe/fe-core/src/main/java/com/starrocks/task/MasterTask.java rename to fe/fe-core/src/main/java/com/starrocks/task/LeaderTask.java index 91b4694e0844f..f9544137e4a9e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/MasterTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/LeaderTask.java @@ -20,8 +20,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public abstract class MasterTask implements Runnable { - private static final Logger LOG = LogManager.getLogger(MasterTask.class); +public abstract class LeaderTask implements Runnable { + private static final Logger LOG = LogManager.getLogger(LeaderTask.class); protected long signature; diff --git a/fe/fe-core/src/main/java/com/starrocks/task/MasterTaskExecutor.java b/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java similarity index 94% rename from fe/fe-core/src/main/java/com/starrocks/task/MasterTaskExecutor.java rename to fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java index 510ab04be963b..cc187d0a1fa52 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/MasterTaskExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java @@ -35,14 +35,14 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class MasterTaskExecutor { - private static final Logger LOG = LogManager.getLogger(MasterTaskExecutor.class); +public class LeaderTaskExecutor { + private static final Logger LOG = LogManager.getLogger(LeaderTaskExecutor.class); private ThreadPoolExecutor executor; private Map> runningTasks; public ScheduledThreadPoolExecutor scheduledThreadPool; - public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric) { + public LeaderTaskExecutor(String name, int threadNum, boolean needRegisterMetric) { executor = ThreadPoolManager .newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "_pool", needRegisterMetric); runningTasks = Maps.newHashMap(); @@ -50,7 +50,7 @@ public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); } - public MasterTaskExecutor(String name, int threadNum, int queueSize, boolean needRegisterMetric) { + public LeaderTaskExecutor(String name, int threadNum, int queueSize, boolean needRegisterMetric) { executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, queueSize, name + "_pool", needRegisterMetric); runningTasks = Maps.newHashMap(); scheduledThreadPool = @@ -71,7 +71,7 @@ public void start() { * @return true if submit success * false if task exists */ - public boolean submit(MasterTask task) { + public boolean submit(LeaderTask task) { long signature = task.getSignature(); synchronized (runningTasks) { if (runningTasks.containsKey(signature)) { diff --git a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java index 398bb4ee3f417..aad47074238ad 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java @@ -42,8 +42,8 @@ import com.starrocks.load.EtlStatus; import com.starrocks.metric.MetricRepo; import com.starrocks.server.GlobalStateMgr; -import com.starrocks.task.MasterTask; -import com.starrocks.task.MasterTaskExecutor; +import com.starrocks.task.LeaderTask; +import com.starrocks.task.LeaderTaskExecutor; import com.starrocks.transaction.TransactionState; import mockit.Expectations; import mockit.Injectable; @@ -211,10 +211,10 @@ public void testGetTableNames(@Injectable BrokerFileGroupAggInfo fileGroupAggInf } @Test - public void testExecuteJob(@Mocked MasterTaskExecutor masterTaskExecutor) throws LoadException { + public void testExecuteJob(@Mocked LeaderTaskExecutor leaderTaskExecutor) throws LoadException { new Expectations() { { - masterTaskExecutor.submit((MasterTask) any); + leaderTaskExecutor.submit((LeaderTask) any); minTimes = 0; result = true; } @@ -266,7 +266,7 @@ public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment at @Injectable BrokerFileGroup brokerFileGroup1, @Injectable BrokerFileGroup brokerFileGroup2, @Injectable BrokerFileGroup brokerFileGroup3, - @Mocked MasterTaskExecutor masterTaskExecutor, + @Mocked LeaderTaskExecutor leaderTaskExecutor, @Injectable OlapTable olapTable, @Mocked LoadingTaskPlanner loadingTaskPlanner) { BrokerLoadJob brokerLoadJob = new BrokerLoadJob(); @@ -309,7 +309,7 @@ public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment at result = 1L; result = 2L; result = 3L; - masterTaskExecutor.submit((MasterTask) any); + leaderTaskExecutor.submit((LeaderTask) any); minTimes = 0; result = true; } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadJobTest.java b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadJobTest.java index cf9852ddf2930..a9fa5a2413aad 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadJobTest.java @@ -36,8 +36,8 @@ import com.starrocks.metric.MetricRepo; import com.starrocks.persist.EditLog; import com.starrocks.server.GlobalStateMgr; -import com.starrocks.task.MasterTask; -import com.starrocks.task.MasterTaskExecutor; +import com.starrocks.task.LeaderTask; +import com.starrocks.task.LeaderTaskExecutor; import com.starrocks.thrift.TUniqueId; import com.starrocks.transaction.BeginTransactionException; import com.starrocks.transaction.GlobalTransactionMgr; @@ -111,7 +111,7 @@ public void testSetJobProperties() { @Test public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr, - @Mocked MasterTaskExecutor masterTaskExecutor) + @Mocked LeaderTaskExecutor leaderTaskExecutor) throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { LoadJob loadJob = new BrokerLoadJob(); new Expectations() { @@ -121,7 +121,7 @@ public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr, (TransactionState.LoadJobSourceType) any, anyLong, anyLong); minTimes = 0; result = 1; - masterTaskExecutor.submit((MasterTask) any); + leaderTaskExecutor.submit((LeaderTask) any); minTimes = 0; result = true; } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/SparkLoadJobTest.java index 9031764d6fae2..643cc08aa5f62 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/SparkLoadJobTest.java @@ -56,7 +56,7 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.task.AgentBatchTask; import com.starrocks.task.AgentTaskExecutor; -import com.starrocks.task.MasterTaskExecutor; +import com.starrocks.task.LeaderTaskExecutor; import com.starrocks.task.PushTask; import com.starrocks.thrift.TEtlState; import com.starrocks.transaction.GlobalTransactionMgr; @@ -199,7 +199,7 @@ public void testCreateFromLoadStmt(@Mocked GlobalStateMgr globalStateMgr, @Injec @Test public void testExecute(@Mocked GlobalStateMgr globalStateMgr, @Mocked SparkLoadPendingTask pendingTask, @Injectable String originStmt, @Injectable GlobalTransactionMgr transactionMgr, - @Injectable MasterTaskExecutor executor) throws Exception { + @Injectable LeaderTaskExecutor executor) throws Exception { new Expectations() { { GlobalStateMgr.getCurrentGlobalTransactionMgr(); diff --git a/fe/fe-core/src/test/java/com/starrocks/task/MasterTaskExecutorTest.java b/fe/fe-core/src/test/java/com/starrocks/task/LeaderTaskExecutorTest.java similarity index 84% rename from fe/fe-core/src/test/java/com/starrocks/task/MasterTaskExecutorTest.java rename to fe/fe-core/src/test/java/com/starrocks/task/LeaderTaskExecutorTest.java index 177cde13f51a6..6835673b566a2 100644 --- a/fe/fe-core/src/test/java/com/starrocks/task/MasterTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/task/LeaderTaskExecutorTest.java @@ -24,16 +24,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MasterTaskExecutorTest { - private static final Logger LOG = LoggerFactory.getLogger(MasterTaskExecutorTest.class); +public class LeaderTaskExecutorTest { + private static final Logger LOG = LoggerFactory.getLogger(LeaderTaskExecutorTest.class); private static final int THREAD_NUM = 1; private static final long SLEEP_MS = 10L; - private MasterTaskExecutor executor; + private LeaderTaskExecutor executor; @Before public void setUp() { - executor = new MasterTaskExecutor("master_task_executor_test", THREAD_NUM, false); + executor = new LeaderTaskExecutor("master_task_executor_test", THREAD_NUM, false); executor.start(); } @@ -47,7 +47,7 @@ public void tearDown() { @Test public void testSubmit() { // submit task - MasterTask task1 = new TestMasterTask(1L); + LeaderTask task1 = new TestLeaderTask(1L); Assert.assertTrue(executor.submit(task1)); Assert.assertEquals(1, executor.getTaskNum()); // submit same running task error @@ -55,7 +55,7 @@ public void testSubmit() { Assert.assertEquals(1, executor.getTaskNum()); // submit another task - MasterTask task2 = new TestMasterTask(2L); + LeaderTask task2 = new TestLeaderTask(2L); Assert.assertTrue(executor.submit(task2)); Assert.assertEquals(2, executor.getTaskNum()); @@ -70,9 +70,9 @@ public void testSubmit() { } } - private class TestMasterTask extends MasterTask { + private class TestLeaderTask extends LeaderTask { - public TestMasterTask(long signature) { + public TestLeaderTask(long signature) { this.signature = signature; } diff --git a/fe/fe-core/src/test/java/com/starrocks/utframe/MockJournal.java b/fe/fe-core/src/test/java/com/starrocks/utframe/MockJournal.java index 0968e108583e9..0467c7518ab7e 100644 --- a/fe/fe-core/src/test/java/com/starrocks/utframe/MockJournal.java +++ b/fe/fe-core/src/test/java/com/starrocks/utframe/MockJournal.java @@ -173,14 +173,6 @@ public List getNoneLeaderNodes() { return Lists.newArrayList(); } - @Override - public void transferToMaster() { - } - - @Override - public void transferToNonMaster() { - } - @Override public boolean isLeader() { return true;