Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Summer of code] Let broker send heartbeat to controller #4341

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
Expand Down Expand Up @@ -603,7 +604,7 @@ public void run() {
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable() && !messageStoreConfig.isStartupControllerMode()) {
if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable() && !brokerConfig.isStartupControllerMode()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= HA_ADDRESS_MIN_LENGTH) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
Expand Down Expand Up @@ -639,7 +640,7 @@ public void run() {
}
}

if (this.messageStoreConfig.isStartupControllerMode()) {
if (this.brokerConfig.isStartupControllerMode()) {
this.updateMasterHAServerAddrPeriodically = true;
}
}
Expand Down Expand Up @@ -711,7 +712,7 @@ public boolean initialize() throws CloneNotSupportedException {
MessageStorePluginContext context = new MessageStorePluginContext(this, messageStoreConfig, brokerStatsManager, messageArrivingListener);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
if (this.messageStoreConfig.isStartupControllerMode()) {
if (this.brokerConfig.isStartupControllerMode()) {
this.replicasManager = new ReplicasManager(this);
}
} catch (IOException e) {
Expand Down Expand Up @@ -1452,7 +1453,7 @@ public void start() throws Exception {

startBasicService();

if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable() && !this.messageStoreConfig.isStartupControllerMode()) {
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable() && !this.brokerConfig.isStartupControllerMode()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
Expand Down Expand Up @@ -1490,7 +1491,7 @@ public void run2() {
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
}

if (this.messageStoreConfig.isStartupControllerMode()) {
if (this.brokerConfig.isStartupControllerMode()) {
scheduleSendHeartbeat();
}

Expand Down Expand Up @@ -1600,6 +1601,10 @@ protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
return;
}
Long heartbeatTimeoutMillis = (this.brokerConfig.isEnableSlaveActingMaster() ||
(this.brokerConfig.isStartupControllerMode() && !this.brokerConfig.isControllerDeployedStandAlone())) ?
this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null;

List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
Expand All @@ -1612,30 +1617,51 @@ protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isEnableSlaveActingMaster(),
this.brokerConfig.isCompressedRegister(),
(this.brokerConfig.isEnableSlaveActingMaster() || this.messageStoreConfig.isStartupControllerMode()) ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
heartbeatTimeoutMillis,
this.getBrokerIdentity());

handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
}

protected void sendHeartbeat() {
if (this.brokerConfig.isCompatibleWithOldNameSrv()) {
this.brokerOuterAPI.sendHeartbeatViaDataVersion(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.getTopicConfigManager().getDataVersion(),
this.brokerConfig.isInBrokerContainer());
} else {
this.brokerOuterAPI.sendHeartbeat(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer());
boolean shouldSendHeartbeatToController = this.brokerConfig.isStartupControllerMode() && this.brokerConfig.isControllerDeployedStandAlone();
if (shouldSendHeartbeatToController) {
final List<String> controllerAddresses = this.replicasManager.getControllerAddresses();
for (String controllerAddress : controllerAddresses) {
if (StringUtils.isNotEmpty(controllerAddress)) {
this.brokerOuterAPI.sendHeartbeatToController(
controllerAddress,
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer()
);
}
}
}

boolean shouldSendHeartbeatToNameSrv = this.brokerConfig.isEnableSlaveActingMaster() || !shouldSendHeartbeatToController;
if (shouldSendHeartbeatToNameSrv) {
if (this.brokerConfig.isCompatibleWithOldNameSrv()) {
this.brokerOuterAPI.sendHeartbeatViaDataVersion(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.getTopicConfigManager().getDataVersion(),
this.brokerConfig.isInBrokerContainer());
} else {
this.brokerOuterAPI.sendHeartbeat(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class ReplicasManager {
private final List<String> controllerAddresses;

private volatile String controllerLeaderAddress = "";
private volatile State state = State.INITIAL;

private ScheduledFuture<?> checkSyncStateSetTaskFuture;
private ScheduledFuture<?> slaveSyncFuture;
Expand All @@ -76,7 +77,7 @@ public ReplicasManager(final BrokerController brokerController) {
this.brokerController = brokerController;
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
this.executorService = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
Expand All @@ -89,19 +90,56 @@ public ReplicasManager(final BrokerController brokerController) {
this.haService.setLocalAddress(this.localAddress);
}

enum State {
INITIAL,
FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
RUNNING,
}

public void start() {
if (!schedulingSyncControllerMetadata()) {
return;
if (!startBasicService()) {
LOGGER.error("Failed to start replicasManager");
this.executorService.submit(() -> {
int tryTimes = 1;
while (!startBasicService()) {
tryTimes++;
LOGGER.error("Failed to start replicasManager, try times:{}, current state:{}, try it again", tryTimes, this.state);
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
}
LOGGER.info("Start replicasManager success, try times:{}", tryTimes);
});
}
}

private boolean startBasicService() {
if (this.state == State.INITIAL) {
if (schedulingSyncControllerMetadata()) {
LOGGER.info("First time sync controller metadata success");
this.state = State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE;
} else {
return false;
}
}

if (!registerBroker()) {
return;
if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
if (registerBroker()) {
LOGGER.info("First time register broker success");
this.state = State.RUNNING;
} else {
return false;
}
}

schedulingSyncBrokerMetadata();
return true;
}

public void shutdown() {
this.state = State.INITIAL;
this.executorService.shutdown();
this.scheduledService.shutdown();
}

Expand Down Expand Up @@ -143,7 +181,7 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch
}
}

public void changeToSlave(final String newMasterAddress, final int newMasterEpoch) {
public void changeToSlave(final String newMasterAddress, final int newMasterEpoch, long brokerId) {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to slave, brokerName={}, replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getBrokerId());
Expand All @@ -156,6 +194,7 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc
// Change config
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE);
this.brokerController.changeSpecialServiceStatus(false);
this.brokerConfig.setBrokerId(brokerId);

// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
Expand Down Expand Up @@ -214,12 +253,11 @@ private boolean registerBroker() {
try {
final BrokerRegisterResponseHeader registerResponse = this.brokerOuterAPI.registerBroker(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress);
final String newMasterAddress = registerResponse.getMasterAddress();
this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
if (StringUtils.isNoneEmpty(newMasterAddress)) {
if (StringUtils.equals(newMasterAddress, this.localAddress)) {
changeToMaster(registerResponse.getMasterEpoch(), registerResponse.getSyncStateSetEpoch());
} else {
changeToSlave(newMasterAddress, registerResponse.getMasterEpoch());
changeToSlave(newMasterAddress, registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
}
}
return true;
Expand All @@ -235,18 +273,24 @@ private boolean registerBroker() {
private void schedulingSyncBrokerMetadata() {
this.scheduledService.scheduleAtFixedRate(() -> {
try {
final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result = this.brokerOuterAPI.getReplicaInfo(this.controllerLeaderAddress, this.brokerConfig.getBrokerName());
final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result = this.brokerOuterAPI.getReplicaInfo(this.controllerLeaderAddress, this.brokerConfig.getBrokerName(), this.localAddress);
final GetReplicaInfoResponseHeader info = result.getObject1();
final SyncStateSet syncStateSet = result.getObject2();
final String newMasterAddress = info.getMasterAddress();
final int newMasterEpoch = info.getMasterEpoch();
final long brokerId = info.getBrokerId();
synchronized (this) {
// Check if master changed
if (StringUtils.isNoneEmpty(newMasterAddress) && !StringUtils.equals(this.masterAddress, newMasterAddress) && newMasterEpoch > this.masterEpoch) {
if (StringUtils.equals(newMasterAddress, this.localAddress)) {
changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch());
} else {
changeToSlave(newMasterAddress, newMasterEpoch);
if (brokerId > 0) {
changeToSlave(newMasterAddress, newMasterEpoch, brokerId);
} else if (brokerId < 0) {
// If the brokerId is no existed, we should try register again.
registerBroker();
}
}
} else {
// Check if sync state set changed
Expand Down Expand Up @@ -340,7 +384,7 @@ public boolean isMasterState() {
}

public SyncStateSet getSyncStateSet() {
return new SyncStateSet(new HashSet<>(this.syncStateSet), this.syncStateSetEpoch);
return new SyncStateSet(this.syncStateSet, this.syncStateSetEpoch);
}

public String getLocalAddress() {
Expand All @@ -354,4 +398,8 @@ public String getMasterAddress() {
public int getMasterEpoch() {
return masterEpoch;
}

public List<String> getControllerAddresses() {
return controllerAddresses;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.SendResult;
Expand Down Expand Up @@ -1062,8 +1063,8 @@ public BrokerRegisterResponseHeader registerBroker(
* Get broker replica info
*/
public Pair<GetReplicaInfoResponseHeader, SyncStateSet> getReplicaInfo(final String controllerAddress,
final String brokerName) throws Exception {
final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(brokerName);
final String brokerName, final String brokerAddress) throws Exception {
final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(brokerName, brokerAddress);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
Expand All @@ -1080,4 +1081,38 @@ public Pair<GetReplicaInfoResponseHeader, SyncStateSet> getReplicaInfo(final Str
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}

/**
* Send heartbeat to controller
*/
public void sendHeartbeatToController(final String controllerAddress,
final String clusterName,
final String brokerAddr,
final String brokerName,
final Long brokerId,
final int timeoutMills,
final boolean isInBrokerContainer) {
if (StringUtils.isEmpty(controllerAddress)) {
return;
}

final BrokerHeartbeatRequestHeader requestHeader = new BrokerHeartbeatRequestHeader();
requestHeader.setClusterName(clusterName);
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerName(brokerName);

brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
@Override
public void run2() {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);

try {
BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, timeoutMills);
} catch (Exception e) {
LOGGER.error("Error happen when send heartbeat to controller {}", controllerAddress, e);
}
}
});
}

}
Loading