diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java index 48f61f5e4de..94bfa5d6300 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java @@ -169,6 +169,9 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch // Handle the slave synchronise handleSlaveSynchronize(BrokerRole.SYNC_MASTER); + // Notify ha service, change to master + this.haService.changeToMaster(newMasterEpoch); + this.executorService.submit(() -> { // Register broker to name-srv try { @@ -177,8 +180,6 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch LOGGER.error("Error happen when register broker to name-srv, Failed to change broker to master", e); return; } - // Notify ha service, change to master - this.haService.changeToMaster(newMasterEpoch); LOGGER.info("Change broker {} to master success, masterEpoch {}, syncStateSetEpoch:{}", this.localAddress, newMasterEpoch, syncStateSetEpoch); }); } @@ -193,6 +194,7 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc // Change record this.masterAddress = newMasterAddress; this.masterEpoch = newMasterEpoch; + stopCheckSyncStateSet(); // Change config @@ -203,6 +205,9 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc // Handle the slave synchronise handleSlaveSynchronize(BrokerRole.SLAVE); + // Notify ha service, change to slave + this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId()); + this.executorService.submit(() -> { // Register broker to name-srv try { @@ -212,8 +217,6 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc return; } - // Notify ha service, change to slave - this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId()); LOGGER.info("Change broker {} to slave, newMasterAddress:{}, newMasterEpoch:{}", this.localAddress, newMasterAddress, newMasterEpoch); }); } @@ -227,6 +230,8 @@ private void changeSyncStateSet(final Set newSyncStateSet, final int new this.syncStateSetEpoch = newSyncStateSetEpoch; this.syncStateSet = new HashSet<>(newSyncStateSet); this.haService.setSyncStateSet(newSyncStateSet); + } else { + LOGGER.info("Sync state set changed failed, newSyncStateSetEpoch is {} and syncStateSetEpoch is {}", newSyncStateSetEpoch, this.syncStateSetEpoch); } } } diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 5e718f9a998..e4b042dae99 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -49,10 +49,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; /** - * The manager that manages the replicas info for all brokers. - * We can think of this class as the controller's memory state machine - * It should be noted that this class is not thread safe, - * and the upper layer needs to ensure that it can be called sequentially + * The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory + * state machine It should be noted that this class is not thread safe, and the upper layer needs to ensure that it can + * be called sequentially */ public class ReplicasInfoManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME); @@ -131,7 +130,7 @@ public ControllerResult alterSyncStateSet( // Generate event int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; response.setNewSyncStateSetEpoch(epoch); - result.setBody(new org.apache.rocketmq.common.protocol.body.SyncStateSet(newSyncStateSet, epoch).encode()); + result.setBody(new SyncStateSet(newSyncStateSet, epoch).encode()); final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); result.addEvent(event); return result; @@ -273,7 +272,7 @@ public ControllerResult getReplicaInfo(final GetRe if (StringUtils.isNotEmpty(request.getBrokerAddress())) { response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress())); } - result.setBody(new org.apache.rocketmq.common.protocol.body.SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode()); + result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode()); return result; } result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, "Broker metadata is not existed"); @@ -369,8 +368,8 @@ private void handleElectMaster(final ElectMasterEvent event) { final String clusterName = event.getClusterName(); final BrokerInfo brokerInfo = new BrokerInfo(clusterName, brokerName); brokerInfo.addBroker(newMaster, 1L); - final SyncStateInfo replicasInfo = new SyncStateInfo(clusterName, brokerName, newMaster); - this.syncStateSetInfoTable.put(brokerName, replicasInfo); + final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName, newMaster); + this.syncStateSetInfoTable.put(brokerName, syncStateInfo); this.replicaInfoTable.put(brokerName, brokerInfo); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 5a96b5345f1..98b4fc9370e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -352,6 +352,8 @@ else if (!dispatchRequest.isSuccess()) { log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } + + } else { // Commitlog case files are deleted log.warn("The commitlog files are deleted, and delete the consume queue files"); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java index d8a26e20ede..dfd3215d29b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java @@ -328,7 +328,9 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) { slaveRequestOffset = slaveMaxOffset; } byteBufferRead.position(readSocketPos); - maybeExpandInSyncStateSet(slaveMaxOffset); + if (!haService.getSyncStateSet().contains(slaveAddress)) { + maybeExpandInSyncStateSet(slaveMaxOffset); + } AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset); LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset); break; diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index 84b047e2d84..38758d66339 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -100,9 +100,6 @@ public AutoSwitchHAService() { this.defaultMessageStore.recoverTopicQueueTable(); - final HashSet newSyncStateSet = new HashSet<>(); - newSyncStateSet.add(this.localAddress); - setSyncStateSet(newSyncStateSet); LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset()); return true; } @@ -122,6 +119,7 @@ public AutoSwitchHAService() { this.haClient.setLocalAddress(this.localAddress); this.haClient.updateSlaveId(slaveId); this.haClient.updateMasterAddress(newMasterAddr); + this.haClient.updateHaMasterAddress(null); this.haClient.start(); LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch); return true; diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index 7070db627ed..dfbc35f8128 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -21,6 +21,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -189,6 +191,7 @@ private void checkMessage(final DefaultMessageStore messageStore, int totalMsgs, public void testAsyncLearnerBrokerRole() throws Exception { init(defaultMappedFileSize); ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000"); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER); @@ -217,6 +220,7 @@ public void testOptionAllAckInSyncStateSet() throws Exception { init(defaultMappedFileSize, true); AtomicReference> syncStateSet = new AtomicReference<>(); ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000"); + ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000"))); ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); ((AutoSwitchHAService) this.messageStore1.getHaService()).registerSyncStateSetChangedListener((newSyncStateSet) -> { System.out.println("Get newSyncStateSet:" + newSyncStateSet);