From 459457468826aaace3575584ca5032b8a329f116 Mon Sep 17 00:00:00 2001 From: TheR1sing3un <87409330+TheR1sing3un@users.noreply.github.com> Date: Fri, 10 Feb 2023 13:41:58 +0800 Subject: [PATCH] [ISSUE #6023] Add a unit test to verify new register process in broker with controller mode (#6024) * refactor: simplify getPID (#5962) * [ISSUE #5923] Add example tiered storage backend service provider (#5926) * implement example file segment * add metrics * add readme * fix license * fix tests * fix links in README.md * add comment to PosixFileSegment and mark as experimental * fix test * optimize image quality * Remove the useless exception class: MQRedirectException #5963 * [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5968) * [ISSUE #5965] Fix lmqTopicQueueTable initialization * [ISSUE #5965] Fix lmqTopicQueueTable initialization * [ISSUE #5890] Fix dledger logging (#5959) * Fix dledger logging * Add bridge into store module * [ISSUE #5860] Set the value of order when create or update topic (#5861) * [ISSUE #5939]Adjust the MQClientInstance#sendHeartbeatToAllBroker catch code block log print level from info to warn (#5940) * [ISSUE #5924] Optimize UtilAll#sleep method (#5925) * [ISSUE #5924]Optimize UtilAll#sleep method * polish code * [ISSUE #5986] optimize the BrokerOuterAPITest class code Co-authored-by: zhouyunpeng <2474138779@qq.com> * [ISSUE #5971] Make the internal logs related to the dledger in the controller print to a file separately (#5972) * Make the internal logs related to the dledger in the controller print to a file separately * Make the internal logs related to the dledger in the controller print to a file separately * [ISSUE #5969] Remvoe duplicate deleteUnusedStats in admin processor (#5973) * [ISSUE #5847] Add checkBlock for hasMsgFromQueue * [ISSUE #5983] Make consumer support flow control code better (#5984) * When encountering the flow control code, pull it after 20ms instead of 3s * When encountering the flow control code, pull it after 20ms instead of 3s * [ISSUE #5896] feat:add pop consumer example (#5991) * feat:add pop consumer * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * Update example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java Co-authored-by: Oliver * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix * feat:fix --------- Co-authored-by: mahaitao617 Co-authored-by: Oliver * [ISSUE #5942] Fix the produce count include the quantity of the system topic(#5943) * [ISSUE #5999] Fix the TopicQueueMappingUtils comments typo (#6000) * [ISSUE #5996] Optimize the RemotingSerializable class code (#5998) * simplified RemotingSerializable null check * optimize the RemotingSerializable class code * [ISSUE #5994] [RIP-46] add pop and timer metrics (#5995) * add pop and timer metrics * fix according to review comment * test(broker): add ReplicasManagerRegisterTest to test the register process 1. add ReplicasManagerRegisterTest to test the register process * chore(pom): modify pom.xml to replace mockito with powermock 1. modify pom.xml to replace mockito with powermock * build(bazel): export powermock in bazel 1. export powermock in bazel --------- Co-authored-by: Xinda Co-authored-by: SSpirits Co-authored-by: loboxu Co-authored-by: pingww Co-authored-by: Aaron Ai Co-authored-by: Slideee Co-authored-by: mxsm Co-authored-by: hardyfish <85128645+hardyfish@users.noreply.github.com> Co-authored-by: zhouyunpeng <2474138779@qq.com> Co-authored-by: rongtong Co-authored-by: zhiliatom <87265072+zhiliatom@users.noreply.github.com> Co-authored-by: zhouxiang Co-authored-by: mahaitao <15828010639@163.com> Co-authored-by: mahaitao617 Co-authored-by: Oliver --- BUILD.bazel | 2 + WORKSPACE | 3 + .../broker/controller/ReplicasManager.java | 52 ++- .../ReplicasManagerRegisterTest.java | 302 ++++++++++++++++++ pom.xml | 27 ++ .../controller/ElectMasterResponseHeader.java | 7 + .../store/ha/autoswitch/BrokerMetadata.java | 15 + .../store/ha/autoswitch/MetadataFile.java | 13 +- test/pom.xml | 4 - 9 files changed, 381 insertions(+), 44 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java diff --git a/BUILD.bazel b/BUILD.bazel index 0663d5774f4..358527c3149 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -37,6 +37,8 @@ java_library( "@maven//:org_assertj_assertj_core", "@maven//:org_hamcrest_hamcrest_library", "@maven//:org_mockito_mockito_core", + "@maven//:org_powermock_powermock_module_junit4", + "@maven//:org_powermock_powermock_api_mockito2", "@maven//:org_hamcrest_hamcrest_core", "@maven//:ch_qos_logback_logback_classic", "@maven//:org_awaitility_awaitility", diff --git a/WORKSPACE b/WORKSPACE index aeef01e4a93..5a1b3562a8e 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -45,6 +45,9 @@ maven_install( "io.netty:netty-all:4.1.65.Final", "org.assertj:assertj-core:3.22.0", "org.mockito:mockito-core:3.10.0", + "org.powermock:powermock-module-junit4:2.0.9", + "org.powermock:powermock-api-mockito2:2.0.9", + "com.github.luben:zstd-jni:1.5.2-2", "org.lz4:lz4-java:1.8.0", "commons-validator:commons-validator:1.7", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index a3d62fc20c3..3e4a23c6005 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -185,7 +185,7 @@ private boolean startBasicService() { } } // register 5 times but still unsuccessful - if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) { + if (this.state != State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE) { return false; } } @@ -208,6 +208,7 @@ private boolean startBasicService() { public void shutdown() { this.state = State.SHUTDOWN; + this.registerState = RegisterState.INITIAL; this.executorService.shutdown(); this.scheduledService.shutdown(); } @@ -373,37 +374,6 @@ private boolean brokerElect() { } } -// private boolean registerBrokerToController() { -// // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file. -// try { -// final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress, -// this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, this.brokerConfig.getControllerHeartBeatTimeoutMills(), -// this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority()); -// final String newMasterAddress = registerResponse.getMasterAddress(); -// if (StringUtils.isNoneEmpty(newMasterAddress)) { -// if (StringUtils.equals(newMasterAddress, this.localAddress)) { -// changeToMaster(registerResponse.getMasterEpoch(), registerResponse.getSyncStateSetEpoch()); -// } else { -// changeToSlave(newMasterAddress, registerResponse.getMasterEpoch(), registerResponse.getBrokerId()); -// } -// // Set isolated to false, make broker can register to namesrv regularly -// brokerController.setIsolated(false); -// } else { -// // if master address is empty, just apply the brokerId -// if (registerResponse.getBrokerId() <= 0) { -// // wrong broker id -// LOGGER.error("Register to controller but receive a invalid broker id = {}", registerResponse.getBrokerId()); -// return false; -// } -// this.brokerConfig.setBrokerId(registerResponse.getBrokerId()); -// } -// return true; -// } catch (final Exception e) { -// LOGGER.error("Failed to register broker to controller", e); -// return false; -// } -// } - /** * Register broker to controller, and persist the metadata to file * @return whether registering process succeeded @@ -493,7 +463,7 @@ private boolean applyBrokerId() { return true; } catch (Exception e) { - LOGGER.error("fail to apply broker id", e); + LOGGER.error("fail to apply broker id: {}", e, tempBrokerMetadata.getBrokerId()); return false; } } @@ -780,4 +750,20 @@ public List getAvailableControllerAddresses() { public Long getBrokerId() { return brokerId; } + + public RegisterState getRegisterState() { + return registerState; + } + + public State getState() { + return state; + } + + public BrokerMetadata getBrokerMetadata() { + return brokerMetadata; + } + + public TempBrokerMetadata getTempBrokerMetadata() { + return tempBrokerMetadata; + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java new file mode 100644 index 00000000000..2148e222ca6 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.controller; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.out.BrokerOuterAPI; +import org.apache.rocketmq.broker.slave.SlaveSynchronize; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; +import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata; +import org.apache.rocketmq.store.ha.autoswitch.TempBrokerMetadata; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ReplicasManager.class) +public class ReplicasManagerRegisterTest { + + public static final String STORE_BASE_PATH = System.getProperty("user.home") + File.separator + "BrokerControllerRegisterTest" + File.separator + + UUID.randomUUID().toString().replace("-", ""); + + public static final String BROKER_NAME = "default-broker"; + + public static final String CLUSTER_NAME = "default-cluster"; + + public static final String NAME_SRV_ADDR = "127.0.0.1:9999"; + + public static final String CONTROLLER_ADDR = "127.0.0.1:8888"; + + public static final BrokerConfig BROKER_CONFIG; + + static { + BROKER_CONFIG = new BrokerConfig(); + BROKER_CONFIG.setListenPort(21030); + BROKER_CONFIG.setNamesrvAddr(NAME_SRV_ADDR); + BROKER_CONFIG.setControllerAddr(CONTROLLER_ADDR); + BROKER_CONFIG.setSyncControllerMetadataPeriod(2 * 1000); + BROKER_CONFIG.setEnableControllerMode(true); + BROKER_CONFIG.setBrokerName(BROKER_NAME); + BROKER_CONFIG.setBrokerClusterName(CLUSTER_NAME); + } + + private MessageStoreConfig buildMessageStoreConfig(int id) { + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathRootDir(STORE_BASE_PATH + File.separator + id); + config.setStorePathCommitLog(config.getStorePathRootDir() + File.separator + "commitLog"); + config.setStorePathEpochFile(config.getStorePathRootDir() + File.separator + "epochFileCache"); + config.setStorePathMetadata(config.getStorePathRootDir() + File.separator + "metadata"); + config.setStorePathTempMetadata(config.getStorePathRootDir() + File.separator + "tempMetadata"); + return config; + } + + @Mock + private BrokerController mockedBrokerController; + + @Mock + private DefaultMessageStore mockedMessageStore; + + @Mock + private BrokerOuterAPI mockedBrokerOuterAPI; + + @Mock + private AutoSwitchHAService mockedAutoSwitchHAService; + + @Before + public void setUp() throws Exception { + when(mockedBrokerController.getBrokerOuterAPI()).thenReturn(mockedBrokerOuterAPI); + when(mockedBrokerController.getMessageStore()).thenReturn(mockedMessageStore); + when(mockedBrokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG); + when(mockedMessageStore.getHaService()).thenReturn(mockedAutoSwitchHAService); + when(mockedBrokerController.getSlaveSynchronize()).thenReturn(new SlaveSynchronize(mockedBrokerController)); + + when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn( + new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR)); + when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true); + when(mockedBrokerController.getMessageStoreConfig()).thenReturn(buildMessageStoreConfig(0)); + } + + @Test + public void testBrokerRegisterSuccess() throws Exception { + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); + when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); + when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader()); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + + ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); + replicasManager0.start(); + await().atMost(Duration.ofMillis(1000)).until(() -> + replicasManager0.getState() == ReplicasManager.State.RUNNING + ); + Assert.assertEquals(ReplicasManager.RegisterState.REGISTERED, replicasManager0.getRegisterState()); + Assert.assertEquals(1L, replicasManager0.getBrokerId().longValue()); + checkMetadataFile(replicasManager0.getBrokerMetadata(), 1L); + Assert.assertFalse(replicasManager0.getTempBrokerMetadata().isLoaded()); + Assert.assertFalse(replicasManager0.getTempBrokerMetadata().fileExists()); + } + + @Test + public void testRegisterFailedAtGetNextBrokerId() throws Exception { + ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenThrow(new RuntimeException()); + + replicasManager.start(); + + Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, replicasManager.getState()); + Assert.assertEquals(ReplicasManager.RegisterState.INITIAL, replicasManager.getRegisterState()); + Assert.assertFalse(replicasManager.getTempBrokerMetadata().fileExists()); + Assert.assertFalse(replicasManager.getBrokerMetadata().fileExists()); + Assert.assertNull(replicasManager.getBrokerId()); + } + + @Test + public void testRegisterFailedAtCreateTempFile() throws Exception { + ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); + when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); + when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader()); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); + PowerMockito.doReturn(false).when(spyReplicasManager, "createTempMetadataFile", anyLong()); + + spyReplicasManager.start(); + + Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, spyReplicasManager.getState()); + Assert.assertEquals(ReplicasManager.RegisterState.INITIAL, spyReplicasManager.getRegisterState()); + Assert.assertFalse(spyReplicasManager.getTempBrokerMetadata().fileExists()); + Assert.assertFalse(spyReplicasManager.getBrokerMetadata().fileExists()); + Assert.assertNull(spyReplicasManager.getBrokerId()); + } + + @Test + public void testRegisterFailedAtApplyBrokerIdFailed() throws Exception { + ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); + when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); + when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader()); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + + replicasManager.start(); + + Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, replicasManager.getState()); + Assert.assertNotEquals(ReplicasManager.RegisterState.CREATE_METADATA_FILE_DONE, replicasManager.getRegisterState()); + Assert.assertNotEquals(ReplicasManager.RegisterState.REGISTERED, replicasManager.getRegisterState()); + + replicasManager.shutdown(); + + Assert.assertFalse(replicasManager.getBrokerMetadata().fileExists()); + Assert.assertNull(replicasManager.getBrokerId()); + } + + @Test + public void testRegisterFailedAtCreateMetadataFileAndDeleteTemp() throws Exception { + ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); + when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); + when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader()); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + + ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); + PowerMockito.doReturn(false).when(spyReplicasManager, "createMetadataFileAndDeleteTemp"); + + spyReplicasManager.start(); + + Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, spyReplicasManager.getState()); + Assert.assertEquals(ReplicasManager.RegisterState.CREATE_TEMP_METADATA_FILE_DONE, spyReplicasManager.getRegisterState()); + TempBrokerMetadata tempBrokerMetadata = spyReplicasManager.getTempBrokerMetadata(); + Assert.assertTrue(tempBrokerMetadata.fileExists()); + Assert.assertTrue(tempBrokerMetadata.isLoaded()); + Assert.assertFalse(spyReplicasManager.getBrokerMetadata().fileExists()); + Assert.assertNull(spyReplicasManager.getBrokerId()); + + spyReplicasManager.shutdown(); + + // restart, we expect that this replicasManager still keep the tempMetadata and still try to finish its registering + ReplicasManager replicasManagerNew = new ReplicasManager(mockedBrokerController); + // because apply brokerId: 1 has succeeded, so now next broker id is 2 + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 2L)); + + replicasManagerNew.start(); + + Assert.assertEquals(ReplicasManager.State.RUNNING, replicasManagerNew.getState()); + Assert.assertEquals(ReplicasManager.RegisterState.REGISTERED, replicasManagerNew.getRegisterState()); + // tempMetadata has been cleared + Assert.assertFalse(replicasManagerNew.getTempBrokerMetadata().fileExists()); + Assert.assertFalse(replicasManagerNew.getTempBrokerMetadata().isLoaded()); + // metadata has been persisted + Assert.assertTrue(replicasManagerNew.getBrokerMetadata().fileExists()); + Assert.assertTrue(replicasManagerNew.getBrokerMetadata().isLoaded()); + Assert.assertEquals(1L, replicasManagerNew.getBrokerMetadata().getBrokerId().longValue()); + Assert.assertEquals(1L, replicasManagerNew.getBrokerId().longValue()); + + } + + @Test + public void testRegisterFailedAtRegisterSuccess() throws Exception { + ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); + when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); + when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + + replicasManager.start(); + + Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, replicasManager.getState()); + Assert.assertEquals(ReplicasManager.RegisterState.CREATE_METADATA_FILE_DONE, replicasManager.getRegisterState()); + TempBrokerMetadata tempBrokerMetadata = replicasManager.getTempBrokerMetadata(); + // temp metadata has been cleared + Assert.assertFalse(tempBrokerMetadata.fileExists()); + Assert.assertFalse(tempBrokerMetadata.isLoaded()); + // metadata has been persisted + Assert.assertTrue(replicasManager.getBrokerMetadata().fileExists()); + Assert.assertTrue(replicasManager.getBrokerMetadata().isLoaded()); + Assert.assertEquals(1L, replicasManager.getBrokerMetadata().getBrokerId().longValue()); + Assert.assertEquals(1L, replicasManager.getBrokerId().longValue()); + + replicasManager.shutdown(); + + Mockito.reset(mockedBrokerOuterAPI); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn( + new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR)); + when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true); + + // restart, we expect that this replicasManager still keep the metadata and still try to finish its registering + ReplicasManager replicasManagerNew = new ReplicasManager(mockedBrokerController); + // because apply brokerId: 1 has succeeded, so now next broker id is 2 + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 2L)); + // because apply brokerId: 1 has succeeded, so next request which try to apply brokerId: 1 will be failed + when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), eq(1L), any(), any())).thenThrow(new RuntimeException()); + when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader()); + replicasManagerNew.start(); + + Assert.assertEquals(ReplicasManager.State.RUNNING, replicasManagerNew.getState()); + Assert.assertEquals(ReplicasManager.RegisterState.REGISTERED, replicasManagerNew.getRegisterState()); + // tempMetadata has been cleared + Assert.assertFalse(replicasManagerNew.getTempBrokerMetadata().fileExists()); + Assert.assertFalse(replicasManagerNew.getTempBrokerMetadata().isLoaded()); + // metadata has been persisted + Assert.assertTrue(replicasManagerNew.getBrokerMetadata().fileExists()); + Assert.assertTrue(replicasManagerNew.getBrokerMetadata().isLoaded()); + Assert.assertEquals(1L, replicasManagerNew.getBrokerMetadata().getBrokerId().longValue()); + Assert.assertEquals(1L, replicasManagerNew.getBrokerId().longValue()); + } + + + private void checkMetadataFile(BrokerMetadata brokerMetadata0 ,Long brokerId) throws Exception { + Assert.assertEquals(brokerId, brokerMetadata0.getBrokerId()); + Assert.assertTrue(brokerMetadata0.fileExists()); + BrokerMetadata brokerMetadata = new BrokerMetadata(brokerMetadata0.getFilePath()); + brokerMetadata.readFromFile(); + Assert.assertEquals(brokerMetadata0, brokerMetadata); + } + + @After + public void clear() { + File file = new File(STORE_BASE_PATH); + UtilAll.deleteFile(file); + } + + +} diff --git a/pom.xml b/pom.xml index 0048b971cac..6fbfb2ab827 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,7 @@ 4.13.2 3.22.0 3.10.0 + 2.0.9 4.1.0 0.30 @@ -981,5 +982,31 @@ awaitility ${awaitility.version} + + org.powermock + powermock-module-junit4 + ${powermock-version} + test + + + org.objenesis + objenesis + + + net.bytebuddy + byte-buddy + + + net.bytebuddy + byte-buddy-agent + + + + + org.powermock + powermock-api-mockito2 + ${powermock-version} + test + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java index 897b57c3fdf..d3c8975383f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java @@ -29,6 +29,13 @@ public class ElectMasterResponseHeader implements CommandCustomHeader { public ElectMasterResponseHeader() { } + public ElectMasterResponseHeader(Long masterBrokerId, String masterAddress, Integer masterEpoch, Integer syncStateSetEpoch) { + this.masterBrokerId = masterBrokerId; + this.masterAddress = masterAddress; + this.masterEpoch = masterEpoch; + this.syncStateSetEpoch = syncStateSetEpoch; + } + public String getMasterAddress() { return masterAddress; } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java index 747eb4944dd..ff48c0a7c43 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java @@ -19,6 +19,8 @@ import org.apache.commons.lang3.StringUtils; +import java.util.Objects; + public class BrokerMetadata extends MetadataFile { private String clusterName; @@ -79,4 +81,17 @@ public Long getBrokerId() { public String getClusterName() { return clusterName; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BrokerMetadata that = (BrokerMetadata) o; + return Objects.equals(clusterName, that.clusterName) && Objects.equals(brokerName, that.brokerName) && Objects.equals(brokerId, that.brokerId); + } + + @Override + public int hashCode() { + return Objects.hash(clusterName, brokerName, brokerId); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java index 2e3c3ba990c..e89aedbea14 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.store.ha.autoswitch; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; import java.io.File; @@ -34,7 +35,7 @@ public abstract class MetadataFile { public abstract void clearInMem(); public void writeToFile() throws Exception { - deleteFile(); + UtilAll.deleteFile(new File(filePath)); MixAll.string2File(encodeToStr(), this.filePath); } @@ -47,14 +48,12 @@ public boolean fileExists() { return file.exists(); } - public void deleteFile() { - File file = new File(filePath); - file.deleteOnExit(); - } - public void clear() { clearInMem(); - deleteFile(); + UtilAll.deleteFile(new File(filePath)); } + public String getFilePath() { + return filePath; + } } diff --git a/test/pom.xml b/test/pom.xml index 8816c42989e..76031533a67 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -64,10 +64,6 @@ com.google.truth truth - - org.mockito - mockito-core - junit junit