Skip to content

Commit

Permalink
[Summer of Code] Support switch role for broker (#4272)
Browse files Browse the repository at this point in the history
* feature:
1.add replicasManager and controllerProxy

* feature:
1.add brokerHaAddress in controller.

* feature:
1.add replicasManager

* feature:
1.add brokerController to replicasManager.
2.change brokerController when change role.

* feature: add api message empty constructor

* feature: move set from header to remotingRequest body

* feature: modify autoSwitchHaClient's rpc protocol, add slaveId, slaveAddress.

* feature: review code

* feature: review code

* add some debug info

* feature: let ha service get masterHaAdress after register to name-srv

* feature: review code xxxx

* feature: let controller return err remark

* style: review

* style: review code

* feature: add more integrationTest

* style: review code

* style: review code

* fix: port already bind
  • Loading branch information
hzh0425 authored May 16, 2022
1 parent ce534ee commit ad8851b
Show file tree
Hide file tree
Showing 34 changed files with 1,461 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.io.FilenameUtils;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
Expand All @@ -56,6 +55,7 @@
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.hacontroller.ReplicasManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
Expand Down Expand Up @@ -114,6 +114,7 @@
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand All @@ -139,7 +140,6 @@
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
Expand Down Expand Up @@ -252,6 +252,7 @@ public class BrokerController {
protected volatile String minBrokerAddrInGroup = null;
private final Lock lock = new ReentrantLock();
protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
protected ReplicasManager replicasManager;

public BrokerController(
final BrokerConfig brokerConfig,
Expand Down Expand Up @@ -602,7 +603,7 @@ public void run() {
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable()) {
if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable() && !messageStoreConfig.isStartupControllerMode()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= HA_ADDRESS_MIN_LENGTH) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
Expand Down Expand Up @@ -637,6 +638,10 @@ public void run() {
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}

if (this.messageStoreConfig.isStartupControllerMode()) {
this.updateMasterHAServerAddrPeriodically = true;
}
}

protected void initializeScheduledTasks() {
Expand Down Expand Up @@ -706,7 +711,9 @@ public boolean initialize() throws CloneNotSupportedException {
MessageStorePluginContext context = new MessageStorePluginContext(this, messageStoreConfig, brokerStatsManager, messageArrivingListener);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));

if (this.messageStoreConfig.isStartupControllerMode()) {
this.replicasManager = new ReplicasManager(this);
}
} catch (IOException e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs", e);
Expand Down Expand Up @@ -1355,6 +1362,10 @@ protected void startBasicService() throws Exception {
this.messageStore.start();
}

if (this.replicasManager != null) {
this.replicasManager.start();
}

if (remotingServerStartLatch != null) {
remotingServerStartLatch.await();
}
Expand Down Expand Up @@ -1441,7 +1452,7 @@ public void start() throws Exception {

startBasicService();

if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable() && !this.messageStoreConfig.isStartupControllerMode()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
Expand All @@ -1466,20 +1477,7 @@ public void run2() {
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run2() {
if (isIsolated) {
return;
}
try {
BrokerController.this.sendHeartbeat();
} catch (Exception e) {
BrokerController.LOG.error("sendHeartbeat Exception", e);
}

}
}, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
scheduleSendHeartbeat();

scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override public void run2() {
Expand All @@ -1492,11 +1490,32 @@ public void run2() {
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}

if (this.messageStoreConfig.isStartupControllerMode()) {
scheduleSendHeartbeat();
}

if (brokerConfig.isSkipPreOnline()) {
startServiceWithoutCondition();
}
}

protected void scheduleSendHeartbeat() {
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run2() {
if (isIsolated) {
return;
}
try {
BrokerController.this.sendHeartbeat();
} catch (Exception e) {
BrokerController.LOG.error("sendHeartbeat Exception", e);
}

}
}, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
}

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion);
}
Expand Down Expand Up @@ -1593,7 +1612,7 @@ protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isEnableSlaveActingMaster(),
this.brokerConfig.isCompressedRegister(),
this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
(this.brokerConfig.isEnableSlaveActingMaster() || this.messageStoreConfig.isStartupControllerMode()) ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
this.getBrokerIdentity());

handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
Expand Down Expand Up @@ -1657,7 +1676,6 @@ protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBro
}

this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
Expand Down Expand Up @@ -2131,6 +2149,10 @@ public BrokerIdentity getBrokerIdentity() {
}
}

public ReplicasManager getReplicasManager() {
return replicasManager;
}

public boolean isIsolated() {
return this.isIsolated;
}
Expand Down
Loading

0 comments on commit ad8851b

Please sign in to comment.