Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] [Step 1/2] Change the name from master to leader #8953

Merged
merged 6 commits into from Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,25 +18,21 @@
package com.starrocks.analysis;

public class RedirectStatus {
private boolean isForwardToMaster;
private final boolean isForwardToLeader;
private boolean needToWaitJournalSync;

public RedirectStatus() {
isForwardToMaster = true;
isForwardToLeader = true;
needToWaitJournalSync = true;
}

public RedirectStatus(boolean isForwardToMaster, boolean needToWaitJournalSync) {
this.isForwardToMaster = isForwardToMaster;
public RedirectStatus(boolean isForwardToLeader, boolean needToWaitJournalSync) {
this.isForwardToLeader = isForwardToLeader;
this.needToWaitJournalSync = needToWaitJournalSync;
}

public boolean isForwardToMaster() {
return isForwardToMaster;
}

public void setForwardToMaster(boolean isForwardToMaster) {
this.isForwardToMaster = isForwardToMaster;
public boolean isForwardToLeader() {
return isForwardToLeader;
}

public boolean isNeedToWaitJournalSync() {
Expand Down
Expand Up @@ -155,7 +155,7 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
}

// check and set path
// path info of replica is only saved in Master FE
// path info of replica is only saved in Leader FE
if (backendTabletInfo.isSetPath_hash() &&
replica.getPathHash() != backendTabletInfo.getPath_hash()) {
replica.setPathHash(backendTabletInfo.getPath_hash());
Expand Down
18 changes: 9 additions & 9 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Expand Up @@ -250,7 +250,7 @@ public class Config extends ConfigBase {
public static int edit_log_port = 9010;

/**
* Master FE will save image every *edit_log_roll_num* meta journals.
* Leader FE will save image every *edit_log_roll_num* meta journals.
*/
@ConfField(mutable = true)
public static int edit_log_roll_num = 50000;
Expand All @@ -272,7 +272,7 @@ public class Config extends ConfigBase {
public static int meta_delay_toleration_second = 300; // 5 min

/**
* Master FE sync policy of bdbje.
* Leader FE sync policy of bdbje.
* If you only deploy one Follower FE, set this to 'SYNC'. If you deploy more than 3 Follower FE,
* you can set this and the following 'replica_sync_policy' to WRITE_NO_SYNC.
* more info, see: http://docs.oracle.com/cd/E17277_02/html/java/com/sleepycat/je/Durability.SyncPolicy.html
Expand Down Expand Up @@ -316,8 +316,8 @@ public class Config extends ConfigBase {
public static int bdbje_lock_timeout_second = 1;

/**
* Set the maximum acceptable clock skew between non-master FE to Master FE host.
* This value is checked whenever a non-master FE establishes a connection to master FE via BDBJE.
* Set the maximum acceptable clock skew between non-leader FE to Leader FE host.
* This value is checked whenever a non-leader FE establishes a connection to leader FE via BDBJE.
* The connection is abandoned if the clock skew is larger than this value.
*/
@ConfField
Expand Down Expand Up @@ -399,20 +399,20 @@ public class Config extends ConfigBase {

/**
* If true, FE will reset bdbje replication group(that is, to remove all electable nodes info)
* and is supposed to start as Master.
* and is supposed to start as Leader.
* If all the electable nodes can not start, we can copy the meta data
* to another node and set this config to true to try to restart the FE.
*/
@ConfField
public static String metadata_failure_recovery = "false";

/**
* If true, non-master FE will ignore the meta data delay gap between Master FE and its self,
* If true, non-leader FE will ignore the meta data delay gap between Leader FE and its self,
* even if the metadata delay gap exceeds *meta_delay_toleration_second*.
* Non-master FE will still offer read service.
* Non-leader FE will still offer read service.
* <p>
* This is helpful when you try to stop the Master FE for a relatively long time for some reason,
* but still wish the non-master FE can offer read service.
* This is helpful when you try to stop the Leader FE for a relatively long time for some reason,
* but still wish the non-leader FE can offer read service.
*/
@ConfField(mutable = true)
public static boolean ignore_meta_check = false;
Expand Down
10 changes: 0 additions & 10 deletions fe/fe-core/src/main/java/com/starrocks/ha/BDBHA.java
Expand Up @@ -183,16 +183,6 @@ public List<InetSocketAddress> getNoneLeaderNodes() {
return ret;
}

@Override
public void transferToMaster() {

}

@Override
public void transferToNonMaster() {

}

@Override
public boolean isLeader() {
ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
Expand Down
6 changes: 0 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/ha/HAProtocol.java
Expand Up @@ -39,12 +39,6 @@ public interface HAProtocol {
// get all the nodes except leader in the current group
public List<InetSocketAddress> getNoneLeaderNodes();

// transfer from nonMaster(unknown, follower or init) to master
public void transferToMaster();

// transfer to non-master
public void transferToNonMaster();

// check if the current node is leader
public boolean isLeader();

Expand Down
Expand Up @@ -162,7 +162,7 @@ private void registerActions() throws IllegalArgException {
QueryDumpAction.registerAction(controller);

// meta service action
File imageDir = MetaHelper.getMasterImageDir();
File imageDir = MetaHelper.getLeaderImageDir();
ImageAction.registerAction(controller, imageDir);
InfoAction.registerAction(controller, imageDir);
VersionAction.registerAction(controller, imageDir);
Expand Down
Expand Up @@ -96,11 +96,11 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response)
return;
}
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
executeInMasterWithAdmin(request, response);
executeInLeaderWithAdmin(request, response);
}

// implement in derived classes
protected void executeInMasterWithAdmin(BaseRequest request, BaseResponse response)
protected void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response)
throws DdlException {
throw new DdlException("Not implemented");
}
Expand All @@ -118,7 +118,7 @@ public static void registerAction(ActionController controller) throws IllegalArg
}

@Override
public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response)
public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response)
throws DdlException {
response.setContentType("application/json");
RestResult result = new RestResult();
Expand All @@ -139,7 +139,7 @@ public static void registerAction(ActionController controller) throws IllegalArg
}

@Override
public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response)
public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response)
throws DdlException {
GroupId groupId = checkAndGetGroupId(request);

Expand Down Expand Up @@ -167,7 +167,7 @@ public static void registerAction(ActionController controller) throws IllegalArg
}

@Override
public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response)
public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response)
throws DdlException {
GroupId groupId = checkAndGetGroupId(request);

Expand Down Expand Up @@ -197,7 +197,7 @@ public static void registerAction(ActionController controller) throws IllegalArg
}

@Override
public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response)
public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response)
throws DdlException {
GroupId groupId = checkAndGetGroupId(request);

Expand Down
Expand Up @@ -44,11 +44,11 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response)
return;
}
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
executeInMasterWithAdmin(request, response);
executeInLeaderWithAdmin(request, response);
}

// implement in derived classes
protected void executeInMasterWithAdmin(BaseRequest request, BaseResponse response)
protected void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response)
throws DdlException {
throw new DdlException("Not implemented");
}
Expand All @@ -65,7 +65,7 @@ public static void registerAction(ActionController controller) throws IllegalArg
}

@Override
public void executeInMasterWithAdmin(BaseRequest request, BaseResponse response)
public void executeInLeaderWithAdmin(BaseRequest request, BaseResponse response)
throws DdlException {
HttpMethod method = request.getRequest().method();
if (method.equals(HttpMethod.POST)) {
Expand Down
Expand Up @@ -77,7 +77,7 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response) t
ConnectContext context = ConnectContext.get();
LeaderOpExecutor leaderOpExecutor = new LeaderOpExecutor(new OriginStatement(showProcStmt, 0), context,
RedirectStatus.FORWARD_NO_SYNC);
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());

try {
leaderOpExecutor.execute();
Expand Down
Expand Up @@ -198,7 +198,7 @@ protected void runAfterCatalogReady() {
try {
cleaner.clean();
} catch (IOException e) {
LOG.error("Master delete old image file fail.", e);
LOG.error("Leader delete old image file fail.", e);
}

}
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class MetaHelper {
private static final int BUFFER_BYTES = 8 * 1024;
private static final int CHECKPOINT_LIMIT_BYTES = 30 * 1024 * 1024;

public static File getMasterImageDir() {
public static File getLeaderImageDir() {
String metaDir = GlobalStateMgr.getCurrentState().getImageDir();
return new File(metaDir);
}
Expand Down
Expand Up @@ -65,7 +65,7 @@
import com.starrocks.task.ClearTransactionTask;
import com.starrocks.task.CreateReplicaTask;
import com.starrocks.task.DropReplicaTask;
import com.starrocks.task.MasterTask;
import com.starrocks.task.LeaderTask;
import com.starrocks.task.PublishVersionTask;
import com.starrocks.task.StorageMediaMigrationTask;
import com.starrocks.task.UpdateTabletMetaInfoTask;
Expand Down Expand Up @@ -262,7 +262,7 @@ private Map<Long, TTablet> buildTabletMap(List<TTablet> tabletList) {
return tabletMap;
}

private class ReportTask extends MasterTask {
private class ReportTask extends LeaderTask {

public long beId;
public ReportType type;
Expand Down
24 changes: 12 additions & 12 deletions fe/fe-core/src/main/java/com/starrocks/load/ExportChecker.java
Expand Up @@ -28,8 +28,8 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.task.ExportExportingTask;
import com.starrocks.task.ExportPendingTask;
import com.starrocks.task.MasterTask;
import com.starrocks.task.MasterTaskExecutor;
import com.starrocks.task.LeaderTask;
import com.starrocks.task.LeaderTaskExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -42,10 +42,10 @@ public final class ExportChecker extends LeaderDaemon {
// checkers for running job state
private static Map<JobState, ExportChecker> checkers = Maps.newHashMap();
// executors for pending tasks
private static Map<JobState, MasterTaskExecutor> executors = Maps.newHashMap();
private static Map<JobState, LeaderTaskExecutor> executors = Maps.newHashMap();
private JobState jobState;

private static MasterTaskExecutor exportingSubTaskExecutor;
private static LeaderTaskExecutor exportingSubTaskExecutor;

private ExportChecker(JobState jobState, long intervalMs) {
super("export checker " + jobState.name().toLowerCase(), intervalMs);
Expand All @@ -57,28 +57,28 @@ public static void init(long intervalMs) {
checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs));

int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit;
MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize, true);
LeaderTaskExecutor pendingTaskExecutor = new LeaderTaskExecutor("export_pending_job", poolSize, true);
executors.put(JobState.PENDING, pendingTaskExecutor);

MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize, true);
LeaderTaskExecutor exportingTaskExecutor = new LeaderTaskExecutor("export_exporting_job", poolSize, true);
executors.put(JobState.EXPORTING, exportingTaskExecutor);

// One export job will be split into multiple exporting sub tasks, the queue size is not determined, so set Integer.MAX_VALUE.
exportingSubTaskExecutor = new MasterTaskExecutor("export_exporting_sub_task", Config.export_task_pool_size,
exportingSubTaskExecutor = new LeaderTaskExecutor("export_exporting_sub_task", Config.export_task_pool_size,
Integer.MAX_VALUE, true);
}

public static void startAll() {
for (ExportChecker exportChecker : checkers.values()) {
exportChecker.start();
}
for (MasterTaskExecutor masterTaskExecutor : executors.values()) {
masterTaskExecutor.start();
for (LeaderTaskExecutor leaderTaskExecutor : executors.values()) {
leaderTaskExecutor.start();
}
exportingSubTaskExecutor.start();
}

public static MasterTaskExecutor getExportingSubTaskExecutor() {
public static LeaderTaskExecutor getExportingSubTaskExecutor() {
return exportingSubTaskExecutor;
}

Expand Down Expand Up @@ -123,7 +123,7 @@ private void runPendingJobs() {

for (ExportJob job : pendingJobs) {
try {
MasterTask task = new ExportPendingTask(job);
LeaderTask task = new ExportPendingTask(job);
if (executors.get(JobState.PENDING).submit(task)) {
LOG.info("run pending export job. job: {}", job);
}
Expand All @@ -138,7 +138,7 @@ private void runExportingJobs() {
LOG.debug("exporting export job num: {}", jobs.size());
for (ExportJob job : jobs) {
try {
MasterTask task = new ExportExportingTask(job);
LeaderTask task = new ExportExportingTask(job);
if (executors.get(JobState.EXPORTING).submit(task)) {
LOG.info("run exporting export job. job: {}", job);
}
Expand Down
Expand Up @@ -63,7 +63,7 @@
import com.starrocks.qe.Coordinator;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.task.MasterTaskExecutor;
import com.starrocks.task.LeaderTaskExecutor;
import com.starrocks.thrift.TEtlState;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.AbstractTxnStateChangeCallback;
Expand Down Expand Up @@ -429,7 +429,7 @@ public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransact
}
}

protected void submitTask(MasterTaskExecutor executor, LoadTask task) throws LoadException {
protected void submitTask(LeaderTaskExecutor executor, LoadTask task) throws LoadException {
int retryNum = 0;
while (!executor.submit(task)) {
LOG.warn("submit load task failed. try to resubmit. job id: {}, task id: {}, retry: {}",
Expand Down
Expand Up @@ -27,11 +27,11 @@
import com.starrocks.common.util.LogKey;
import com.starrocks.load.FailMsg;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.task.MasterTask;
import com.starrocks.task.LeaderTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class LoadTask extends MasterTask {
public abstract class LoadTask extends LeaderTask {
public enum TaskType {
PENDING,
LOADING
Expand Down
Expand Up @@ -428,7 +428,7 @@ private ByteBuffer getResultPacket() {
// use to return result packet to user
private void finalizeCommand() throws IOException {
ByteBuffer packet = null;
if (executor != null && executor.isForwardToMaster()) {
if (executor != null && executor.isForwardToLeader()) {
// for ERR State, set packet to remote packet(executor.getOutputPacket())
// because remote packet has error detail
// but for not ERR (OK or EOF) State, we should check whether stmt is ShowStmt,
Expand Down Expand Up @@ -559,7 +559,7 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) {
// return error directly.
TMasterOpResult result = new TMasterOpResult();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about TMasterOpRequest..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the thrift class will cause the incompatibility problem. So we leave these kind of class unchanged.

ctx.getState().setError(
"Missing current user identity. You need to upgrade this Frontend to the same version as Master Frontend.");
"Missing current user identity. You need to upgrade this Frontend to the same version as Leader Frontend.");
result.setMaxJournalId(GlobalStateMgr.getCurrentState().getMaxJournalId());
result.setPacket(getResultPacket());
return result;
Expand Down
Expand Up @@ -131,7 +131,7 @@ private void forward() throws Exception {
params.setQuery_options(queryOptions);

params.setQueryId(UUIDUtil.toTUniqueId(ctx.getQueryId()));
LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress);
LOG.info("Forward statement {} to Leader {}", ctx.getStmtId(), thriftAddress);

result = FrontendServiceProxy.call(thriftAddress,
thriftTimeoutMs,
Expand Down
Expand Up @@ -122,7 +122,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String ENABLE_SPILLING = "enable_spilling";
// if set to true, some of stmt will be forwarded to master FE to get result
// if set to true, some of stmt will be forwarded to leader FE to get result
public static final String FORWARD_TO_MASTER = "forward_to_master";
Astralidea marked this conversation as resolved.
Show resolved Hide resolved
// user can set instance num after exchange, no need to be equal to nums of before exchange
public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num";
Expand Down