Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Summer of code] Let broker send heartbeat to controller #4341

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
Expand Down Expand Up @@ -603,7 +604,7 @@ public void run() {
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable() && !messageStoreConfig.isStartupControllerMode()) {
if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable() && !brokerConfig.isStartupControllerMode()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= HA_ADDRESS_MIN_LENGTH) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
Expand Down Expand Up @@ -639,7 +640,7 @@ public void run() {
}
}

if (this.messageStoreConfig.isStartupControllerMode()) {
if (this.brokerConfig.isStartupControllerMode()) {
this.updateMasterHAServerAddrPeriodically = true;
}
}
Expand Down Expand Up @@ -699,6 +700,7 @@ public boolean initialize() throws CloneNotSupportedException {

if (result) {
try {
this.messageStoreConfig.setStartupControllerMode(this.brokerConfig.isStartupControllerMode());
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
defaultMessageStore.setTopicConfigTable(topicConfigManager.getTopicConfigTable());

Expand All @@ -711,7 +713,7 @@ public boolean initialize() throws CloneNotSupportedException {
MessageStorePluginContext context = new MessageStorePluginContext(this, messageStoreConfig, brokerStatsManager, messageArrivingListener);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
if (this.messageStoreConfig.isStartupControllerMode()) {
if (this.brokerConfig.isStartupControllerMode()) {
this.replicasManager = new ReplicasManager(this);
}
} catch (IOException e) {
Expand Down Expand Up @@ -1452,7 +1454,7 @@ public void start() throws Exception {

startBasicService();

if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable() && !this.messageStoreConfig.isStartupControllerMode()) {
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable() && !this.brokerConfig.isStartupControllerMode()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
Expand Down Expand Up @@ -1490,7 +1492,7 @@ public void run2() {
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}

if (this.messageStoreConfig.isStartupControllerMode()) {
if (this.brokerConfig.isStartupControllerMode()) {
scheduleSendHeartbeat();
}

Expand Down Expand Up @@ -1612,7 +1614,7 @@ protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isEnableSlaveActingMaster(),
this.brokerConfig.isCompressedRegister(),
(this.brokerConfig.isEnableSlaveActingMaster() || this.messageStoreConfig.isStartupControllerMode()) ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
(this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isStartupControllerMode()) ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
this.getBrokerIdentity());

handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
Expand All @@ -1637,6 +1639,21 @@ protected void sendHeartbeat() {
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer());
}
// If in controller mode and the controller is deployed independently
if (this.brokerConfig.isStartupControllerMode() && this.brokerConfig.isControllerDeployedStandAlone()) {
final String controllerLeaderAddress = this.replicasManager.getControllerLeaderAddress();
if (StringUtils.isNotEmpty(controllerLeaderAddress)) {
this.brokerOuterAPI.sendHeartbeatToController(
controllerLeaderAddress,
hzh0425 marked this conversation as resolved.
Show resolved Hide resolved
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer()
);
}
}
}

protected void syncBrokerMemberGroup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,8 @@ public String getMasterAddress() {
public int getMasterEpoch() {
return masterEpoch;
}

public String getControllerLeaderAddress() {
return controllerLeaderAddress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.SendResult;
Expand Down Expand Up @@ -1080,4 +1081,38 @@ public Pair<GetReplicaInfoResponseHeader, SyncStateSet> getReplicaInfo(final Str
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}

/**
* Send heartbeat to controller
*/
public void sendHeartbeatToController(final String controllerAddress,
final String clusterName,
final String brokerAddr,
final String brokerName,
final Long brokerId,
final int timeoutMills,
final boolean isInBrokerContainer) {
if (StringUtils.isEmpty(controllerAddress)) {
return;
}

final BrokerHeartbeatRequestHeader requestHeader = new BrokerHeartbeatRequestHeader();
requestHeader.setClusterName(clusterName);
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerName(brokerName);

brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
@Override
public void run2() {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);

try {
BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, timeoutMills);
} catch (Exception e) {
LOGGER.error("Error happen when send heartbeat to controller {}", controllerAddress, e);
}
}
});
}

}
50 changes: 40 additions & 10 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,24 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean lockInStrictMode = false;

private String controllerAddr = "";

private boolean compatibleWithOldNameSrv = true;

/**
* Is startup controller mode, which support auto switch broker's role.
*/
private boolean startupControllerMode = false;
hzh0425 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Whether the controller is deployed independently
*/
private boolean isControllerDeployedStandAlone = false;
hzh0425 marked this conversation as resolved.
Show resolved Hide resolved

/**
* If isControllerDeployedStandAlone = false, controllerAddr should be equal to namesrv's address.
hzh0425 marked this conversation as resolved.
Show resolved Hide resolved
* If isControllerDeployedStandAlone = true, controllerAddr should be controller's address.
*/
private String controllerAddr = "";

private long replicasManagerSyncBrokerMetadataPeriod = 5 * 1000;

private long replicasManagerCheckSyncStateSetPeriod = 8 * 1000;
Expand Down Expand Up @@ -1263,14 +1277,6 @@ public void setLockInStrictMode(boolean lockInStrictMode) {
this.lockInStrictMode = lockInStrictMode;
}

public String getControllerAddr() {
return controllerAddr;
}

public void setControllerAddr(String controllerAddr) {
this.controllerAddr = controllerAddr;
}

public boolean isIsolateLogEnable() {
return isolateLogEnable;
}
Expand All @@ -1287,6 +1293,30 @@ public void setCompatibleWithOldNameSrv(boolean compatibleWithOldNameSrv) {
this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
}

public boolean isStartupControllerMode() {
return startupControllerMode;
}

public void setStartupControllerMode(boolean startupControllerMode) {
this.startupControllerMode = startupControllerMode;
}

public boolean isControllerDeployedStandAlone() {
return isControllerDeployedStandAlone;
}

public void setControllerDeployedStandAlone(boolean controllerDeployedStandAlone) {
isControllerDeployedStandAlone = controllerDeployedStandAlone;
}

public String getControllerAddr() {
return controllerAddr;
}

public void setControllerAddr(String controllerAddr) {
this.controllerAddr = controllerAddr;
}

public long getReplicasManagerSyncBrokerMetadataPeriod() {
return replicasManagerSyncBrokerMetadataPeriod;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void run2() {
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}

if (this.messageStoreConfig.isStartupControllerMode()) {
if (this.brokerConfig.isStartupControllerMode()) {
scheduleSendHeartbeat();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public BrokerController startBroker(String namesrvAddress, int brokerId, int haP
brokerConfig.setControllerAddr(namesrvAddress);
brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(4 * 1000);
brokerConfig.setStartupControllerMode(true);

final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(nettyListenPort);
Expand Down Expand Up @@ -110,7 +111,6 @@ protected MessageStoreConfig buildMessageStoreConfig(final String brokerName, fi
storeConfig.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + File.separator + "EpochFileCache");
storeConfig.setTotalReplicas(3);
storeConfig.setInSyncReplicas(2);
storeConfig.setStartupControllerMode(true);

storeConfig.setMappedFileSizeCommitLog(mappedFileSize);
storeConfig.setMappedFileSizeConsumeQueue(1024 * 1024);
Expand Down