Skip to content

Commit

Permalink
Polish switching logic and auto switch ha code (#4406)
Browse files Browse the repository at this point in the history
* Polish switching logic and auto switch ha code

* Make UT can pass

* Polish the code
  • Loading branch information
RongtongJin committed Jun 4, 2022
1 parent 0584118 commit f27800a
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SYNC_MASTER);

// Notify ha service, change to master
this.haService.changeToMaster(newMasterEpoch);

this.executorService.submit(() -> {
// Register broker to name-srv
try {
Expand All @@ -177,8 +180,6 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch
LOGGER.error("Error happen when register broker to name-srv, Failed to change broker to master", e);
return;
}
// Notify ha service, change to master
this.haService.changeToMaster(newMasterEpoch);
LOGGER.info("Change broker {} to master success, masterEpoch {}, syncStateSetEpoch:{}", this.localAddress, newMasterEpoch, syncStateSetEpoch);
});
}
Expand All @@ -193,6 +194,7 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc
// Change record
this.masterAddress = newMasterAddress;
this.masterEpoch = newMasterEpoch;

stopCheckSyncStateSet();

// Change config
Expand All @@ -203,6 +205,9 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);

// Notify ha service, change to slave
this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());

this.executorService.submit(() -> {
// Register broker to name-srv
try {
Expand All @@ -212,8 +217,6 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc
return;
}

// Notify ha service, change to slave
this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());
LOGGER.info("Change broker {} to slave, newMasterAddress:{}, newMasterEpoch:{}", this.localAddress, newMasterAddress, newMasterEpoch);
});
}
Expand All @@ -227,6 +230,8 @@ private void changeSyncStateSet(final Set<String> newSyncStateSet, final int new
this.syncStateSetEpoch = newSyncStateSetEpoch;
this.syncStateSet = new HashSet<>(newSyncStateSet);
this.haService.setSyncStateSet(newSyncStateSet);
} else {
LOGGER.info("Sync state set changed failed, newSyncStateSetEpoch is {} and syncStateSetEpoch is {}", newSyncStateSetEpoch, this.syncStateSetEpoch);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@
import org.apache.rocketmq.logging.InternalLoggerFactory;

/**
* The manager that manages the replicas info for all brokers.
* We can think of this class as the controller's memory state machine
* It should be noted that this class is not thread safe,
* and the upper layer needs to ensure that it can be called sequentially
* The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory
* state machine It should be noted that this class is not thread safe, and the upper layer needs to ensure that it can
* be called sequentially
*/
public class ReplicasInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
Expand Down Expand Up @@ -131,7 +130,7 @@ public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
// Generate event
int epoch = syncStateInfo.getSyncStateSetEpoch() + 1;
response.setNewSyncStateSetEpoch(epoch);
result.setBody(new org.apache.rocketmq.common.protocol.body.SyncStateSet(newSyncStateSet, epoch).encode());
result.setBody(new SyncStateSet(newSyncStateSet, epoch).encode());
final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
result.addEvent(event);
return result;
Expand Down Expand Up @@ -273,7 +272,7 @@ public ControllerResult<GetReplicaInfoResponseHeader> getReplicaInfo(final GetRe
if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress()));
}
result.setBody(new org.apache.rocketmq.common.protocol.body.SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
return result;
}
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, "Broker metadata is not existed");
Expand Down Expand Up @@ -369,8 +368,8 @@ private void handleElectMaster(final ElectMasterEvent event) {
final String clusterName = event.getClusterName();
final BrokerInfo brokerInfo = new BrokerInfo(clusterName, brokerName);
brokerInfo.addBroker(newMaster, 1L);
final SyncStateInfo replicasInfo = new SyncStateInfo(clusterName, brokerName, newMaster);
this.syncStateSetInfoTable.put(brokerName, replicasInfo);
final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName, newMaster);
this.syncStateSetInfoTable.put(brokerName, syncStateInfo);
this.replicaInfoTable.put(brokerName, brokerInfo);
}
}
Expand Down
2 changes: 2 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ else if (!dispatchRequest.isSuccess()) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}


} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
slaveRequestOffset = slaveMaxOffset;
}
byteBufferRead.position(readSocketPos);
maybeExpandInSyncStateSet(slaveMaxOffset);
if (!haService.getSyncStateSet().contains(slaveAddress)) {
maybeExpandInSyncStateSet(slaveMaxOffset);
}
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ public AutoSwitchHAService() {

this.defaultMessageStore.recoverTopicQueueTable();

final HashSet<String> newSyncStateSet = new HashSet<>();
newSyncStateSet.add(this.localAddress);
setSyncStateSet(newSyncStateSet);
LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
return true;
}
Expand All @@ -122,6 +119,7 @@ public AutoSwitchHAService() {
this.haClient.setLocalAddress(this.localAddress);
this.haClient.updateSlaveId(slaveId);
this.haClient.updateMasterAddress(newMasterAddr);
this.haClient.updateHaMasterAddress(null);
this.haClient.start();
LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -189,6 +191,7 @@ private void checkMessage(final DefaultMessageStore messageStore, int totalMsgs,
public void testAsyncLearnerBrokerRole() throws Exception {
init(defaultMappedFileSize);
((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");

storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
Expand Down Expand Up @@ -217,6 +220,7 @@ public void testOptionAllAckInSyncStateSet() throws Exception {
init(defaultMappedFileSize, true);
AtomicReference<Set<String>> syncStateSet = new AtomicReference<>();
((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
((AutoSwitchHAService) this.messageStore1.getHaService()).registerSyncStateSetChangedListener((newSyncStateSet) -> {
System.out.println("Get newSyncStateSet:" + newSyncStateSet);
Expand Down

0 comments on commit f27800a

Please sign in to comment.