Skip to content

Commit

Permalink
[ISSUE apache#6023] Add a unit test to verify new register process in…
Browse files Browse the repository at this point in the history
… broker with controller mode (apache#6024)

* refactor: simplify getPID (apache#5962)

* [ISSUE apache#5923] Add example tiered storage backend service provider (apache#5926)

* implement example file segment

* add metrics

* add readme

* fix license

* fix tests

* fix links in README.md

* add comment to PosixFileSegment and mark as experimental

* fix test

* optimize image quality

* Remove the useless exception class: MQRedirectException apache#5963

* [ISSUE apache#5965] Fix lmqTopicQueueTable initialization (apache#5968)

* [ISSUE apache#5965] Fix lmqTopicQueueTable initialization

* [ISSUE apache#5965] Fix lmqTopicQueueTable initialization

* [ISSUE apache#5890] Fix dledger logging (apache#5959)

* Fix dledger logging

* Add bridge into store module

* [ISSUE apache#5860] Set the value of order when create or update topic (apache#5861)

* [ISSUE apache#5939]Adjust the MQClientInstance#sendHeartbeatToAllBroker catch code block log print level from info to warn (apache#5940)

* [ISSUE apache#5924] Optimize UtilAll#sleep method (apache#5925)

* [ISSUE apache#5924]Optimize UtilAll#sleep method

* polish code

* [ISSUE apache#5986] optimize the BrokerOuterAPITest class code

Co-authored-by: zhouyunpeng <2474138779@qq.com>

* [ISSUE apache#5971] Make the internal logs related to the dledger in the controller print to a file separately (apache#5972)

* Make the internal logs related to the dledger in the controller print to a file separately

* Make the internal logs related to the dledger in the controller print to a file separately

* [ISSUE apache#5969] Remvoe duplicate deleteUnusedStats in admin processor (apache#5973)

* [ISSUE apache#5847] Add checkBlock for hasMsgFromQueue

* [ISSUE apache#5983] Make consumer support flow control code better (apache#5984)

* When encountering the flow control code, pull it after 20ms instead of 3s

* When encountering the flow control code, pull it after 20ms instead of 3s

* [ISSUE apache#5896] feat:add pop consumer example (apache#5991)

* feat:add pop consumer

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* Update example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java

Co-authored-by: Oliver <wqdyxnbd@163.com>

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

* feat:fix

---------

Co-authored-by: mahaitao617 <mahaitao617@mahaitao617deMacBook-Pro.local>
Co-authored-by: Oliver <wqdyxnbd@163.com>

* [ISSUE apache#5942] Fix the produce count include the quantity of the system topic(apache#5943)

* [ISSUE apache#5999] Fix the TopicQueueMappingUtils comments typo (apache#6000)

* [ISSUE apache#5996] Optimize the RemotingSerializable class code (apache#5998)

* simplified RemotingSerializable null check

* optimize the RemotingSerializable class code

* [ISSUE apache#5994] [RIP-46] add pop and timer metrics (apache#5995)

* add pop and timer metrics
* fix according to review comment

* test(broker): add ReplicasManagerRegisterTest to test the register process

1. add ReplicasManagerRegisterTest to test the register process

* chore(pom): modify pom.xml to replace mockito with powermock

1. modify pom.xml to replace mockito with powermock

* build(bazel): export powermock in bazel

1. export powermock in bazel

---------

Co-authored-by: Xinda <xdshent@gmail.com>
Co-authored-by: SSpirits <admin@lv5.moe>
Co-authored-by: loboxu <loboxu@tencent.com>
Co-authored-by: pingww <pingw002@gmail.com>
Co-authored-by: Aaron Ai <yangkun.ayk@gmail.com>
Co-authored-by: Slideee <yechun@corp.netease.com>
Co-authored-by: mxsm <ljbmxsm@gmail.com>
Co-authored-by: hardyfish <85128645+hardyfish@users.noreply.github.com>
Co-authored-by: zhouyunpeng <2474138779@qq.com>
Co-authored-by: rongtong <jinrongtong5@163.com>
Co-authored-by: zhiliatom <87265072+zhiliatom@users.noreply.github.com>
Co-authored-by: zhouxiang <zhouxiang.zzx@alibaba-inc.com>
Co-authored-by: mahaitao <15828010639@163.com>
Co-authored-by: mahaitao617 <mahaitao617@mahaitao617deMacBook-Pro.local>
Co-authored-by: Oliver <wqdyxnbd@163.com>
  • Loading branch information
16 people committed Mar 13, 2023
1 parent 077cb36 commit 4594574
Show file tree
Hide file tree
Showing 9 changed files with 381 additions and 44 deletions.
2 changes: 2 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ java_library(
"@maven//:org_assertj_assertj_core",
"@maven//:org_hamcrest_hamcrest_library",
"@maven//:org_mockito_mockito_core",
"@maven//:org_powermock_powermock_module_junit4",
"@maven//:org_powermock_powermock_api_mockito2",
"@maven//:org_hamcrest_hamcrest_core",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:org_awaitility_awaitility",
Expand Down
3 changes: 3 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ maven_install(
"io.netty:netty-all:4.1.65.Final",
"org.assertj:assertj-core:3.22.0",
"org.mockito:mockito-core:3.10.0",
"org.powermock:powermock-module-junit4:2.0.9",
"org.powermock:powermock-api-mockito2:2.0.9",

"com.github.luben:zstd-jni:1.5.2-2",
"org.lz4:lz4-java:1.8.0",
"commons-validator:commons-validator:1.7",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private boolean startBasicService() {
}
}
// register 5 times but still unsuccessful
if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
if (this.state != State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE) {
return false;
}
}
Expand All @@ -208,6 +208,7 @@ private boolean startBasicService() {

public void shutdown() {
this.state = State.SHUTDOWN;
this.registerState = RegisterState.INITIAL;
this.executorService.shutdown();
this.scheduledService.shutdown();
}
Expand Down Expand Up @@ -373,37 +374,6 @@ private boolean brokerElect() {
}
}

// private boolean registerBrokerToController() {
// // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file.
// try {
// final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
// this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
// this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
// final String newMasterAddress = registerResponse.getMasterAddress();
// if (StringUtils.isNoneEmpty(newMasterAddress)) {
// if (StringUtils.equals(newMasterAddress, this.localAddress)) {
// changeToMaster(registerResponse.getMasterEpoch(), registerResponse.getSyncStateSetEpoch());
// } else {
// changeToSlave(newMasterAddress, registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
// }
// // Set isolated to false, make broker can register to namesrv regularly
// brokerController.setIsolated(false);
// } else {
// // if master address is empty, just apply the brokerId
// if (registerResponse.getBrokerId() <= 0) {
// // wrong broker id
// LOGGER.error("Register to controller but receive a invalid broker id = {}", registerResponse.getBrokerId());
// return false;
// }
// this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
// }
// return true;
// } catch (final Exception e) {
// LOGGER.error("Failed to register broker to controller", e);
// return false;
// }
// }

/**
* Register broker to controller, and persist the metadata to file
* @return whether registering process succeeded
Expand Down Expand Up @@ -493,7 +463,7 @@ private boolean applyBrokerId() {
return true;

} catch (Exception e) {
LOGGER.error("fail to apply broker id", e);
LOGGER.error("fail to apply broker id: {}", e, tempBrokerMetadata.getBrokerId());
return false;
}
}
Expand Down Expand Up @@ -780,4 +750,20 @@ public List<String> getAvailableControllerAddresses() {
public Long getBrokerId() {
return brokerId;
}

public RegisterState getRegisterState() {
return registerState;
}

public State getState() {
return state;
}

public BrokerMetadata getBrokerMetadata() {
return brokerMetadata;
}

public TempBrokerMetadata getTempBrokerMetadata() {
return tempBrokerMetadata;
}
}

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
<junit.version>4.13.2</junit.version>
<assertj-core.version>3.22.0</assertj-core.version>
<mockito-core.version>3.10.0</mockito-core.version>
<powermock-version>2.0.9</powermock-version>
<awaitility.version>4.1.0</awaitility.version>
<truth.version>0.30</truth.version>

Expand Down Expand Up @@ -981,5 +982,31 @@
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock-version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
</exclusion>
<exclusion>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
</exclusion>
<exclusion>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>${powermock-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
public ElectMasterResponseHeader() {
}

public ElectMasterResponseHeader(Long masterBrokerId, String masterAddress, Integer masterEpoch, Integer syncStateSetEpoch) {
this.masterBrokerId = masterBrokerId;
this.masterAddress = masterAddress;
this.masterEpoch = masterEpoch;
this.syncStateSetEpoch = syncStateSetEpoch;
}

public String getMasterAddress() {
return masterAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.commons.lang3.StringUtils;

import java.util.Objects;

public class BrokerMetadata extends MetadataFile {

private String clusterName;
Expand Down Expand Up @@ -79,4 +81,17 @@ public Long getBrokerId() {
public String getClusterName() {
return clusterName;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BrokerMetadata that = (BrokerMetadata) o;
return Objects.equals(clusterName, that.clusterName) && Objects.equals(brokerName, that.brokerName) && Objects.equals(brokerId, that.brokerId);
}

@Override
public int hashCode() {
return Objects.hash(clusterName, brokerName, brokerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.store.ha.autoswitch;

import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;

import java.io.File;

Expand All @@ -34,7 +35,7 @@ public abstract class MetadataFile {
public abstract void clearInMem();

public void writeToFile() throws Exception {
deleteFile();
UtilAll.deleteFile(new File(filePath));
MixAll.string2File(encodeToStr(), this.filePath);
}

Expand All @@ -47,14 +48,12 @@ public boolean fileExists() {
return file.exists();
}

public void deleteFile() {
File file = new File(filePath);
file.deleteOnExit();
}

public void clear() {
clearInMem();
deleteFile();
UtilAll.deleteFile(new File(filePath));
}

public String getFilePath() {
return filePath;
}
}
4 changes: 0 additions & 4 deletions test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down

0 comments on commit 4594574

Please sign in to comment.