Skip to content

Commit

Permalink
fix: port already bind
Browse files Browse the repository at this point in the history
  • Loading branch information
hzh0425 committed May 16, 2022
1 parent e758264 commit c41aefc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
Expand All @@ -49,22 +50,29 @@ public class AutoSwitchRoleBase {

private final String storePathRootParentDir = System.getProperty("user.home") + File.separator +
UUID.randomUUID().toString().replace("-", "");
private static final AtomicInteger PORT_COUNTER = new AtomicInteger(35000);
private final String storePathRootDir = storePathRootParentDir + File.separator + "store";
private final String StoreMessage = "Once, there was a chance for me!";
private final byte[] MessageBody = StoreMessage.getBytes();
private final AtomicInteger QueueId = new AtomicInteger(0);
protected List<BrokerController> brokerList = new ArrayList<>();
private static final Random random = new Random();
protected List<BrokerController> brokerList;
private SocketAddress BornHost;
private SocketAddress StoreHost;

protected void initialize() {
this.brokerList = new ArrayList<>();
try {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
} catch (Exception ignored) {
}
}

public int nextPort() {
return PORT_COUNTER.addAndGet(10 + random.nextInt(10));
}

public BrokerController startBroker(String namesrvAddress, int brokerId, int haPort, int brokerListenPort,
int nettyListenPort, BrokerRole expectedRole, int mappedFileSize) throws Exception {
final MessageStoreConfig storeConfig = buildMessageStoreConfig("broker" + brokerId, haPort, mappedFileSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.rocketmq.test.autoswitchrole;

import java.io.File;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.hacontroller.ReplicasManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.namesrv.ControllerConfig;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
Expand Down Expand Up @@ -52,18 +54,19 @@ public void init(int mappedFileSize) throws Exception {
super.initialize();

// Startup namesrv
final String peers = String.format("n0-localhost:%d", 30000);
final String peers = String.format("n0-localhost:%d", nextPort());
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(31000);
int namesrvPort = nextPort();
serverConfig.setListenPort(namesrvPort);

this.controllerConfig = buildControllerConfig("n0", peers);
this.namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig(), controllerConfig);
assertTrue(namesrvController.initialize());
namesrvController.start();
this.namesrvAddress = "127.0.0.1:31000;";
this.namesrvAddress = "127.0.0.1:" + namesrvPort + ";";

this.brokerController1 = startBroker(this.namesrvAddress, 1, 20000, 21000, 22000, BrokerRole.SYNC_MASTER, mappedFileSize);
this.brokerController2 = startBroker(this.namesrvAddress, 2, 20001, 21001, 22001, BrokerRole.SLAVE, mappedFileSize);
this.brokerController1 = startBroker(this.namesrvAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
this.brokerController2 = startBroker(this.namesrvAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);

// Wait slave connecting to master
assertTrue(waitSlaveReady(this.brokerController2.getMessageStore()));
Expand Down Expand Up @@ -93,12 +96,6 @@ public boolean waitSlaveReady(MessageStore messageStore) throws InterruptedExcep
return false;
}

@Test
public void test() throws Exception {
init(defaultFileSize);
mockData();
}

@Test
public void testCheckSyncStateSet() throws Exception {
init(defaultFileSize);
Expand All @@ -117,15 +114,15 @@ public void testChangeMaster() throws Exception {

// Let master shutdown
brokerController1.shutdown();
this.brokerList.remove(this.brokerController1);
Thread.sleep(5000);

// The slave should change to master
assertTrue(brokerController2.getReplicasManager().isMasterState());
assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);

// Restart old master, it should be slave
this.brokerList.remove(this.brokerController1);
brokerController1 = startBroker(this.namesrvAddress, 1, 20000, 21000, 22000, BrokerRole.SLAVE, defaultFileSize);
brokerController1 = startBroker(this.namesrvAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
waitSlaveReady(brokerController1.getMessageStore());

assertFalse(brokerController1.getReplicasManager().isMasterState());
Expand All @@ -144,7 +141,7 @@ public void testAddBroker() throws Exception {
init(defaultFileSize);
mockData();

BrokerController broker3 = startBroker(this.namesrvAddress, 3, 20005, 21005, 22005, BrokerRole.SLAVE, defaultFileSize);
BrokerController broker3 = startBroker(this.namesrvAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
waitSlaveReady(broker3.getMessageStore());
Thread.sleep(6000);

Expand All @@ -164,13 +161,14 @@ public void testTruncateEpochLogAndChangeMaster() throws Exception {

// Step2: shutdown broker1, broker2 as master
brokerController1.shutdown();
this.brokerList.remove(brokerController1);
Thread.sleep(5000);

assertTrue(brokerController2.getReplicasManager().isMasterState());
assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);

// Step3: add broker3
BrokerController broker3 = startBroker(this.namesrvAddress, 3, 20005, 21005, 22005, BrokerRole.SLAVE, 1700);
BrokerController broker3 = startBroker(this.namesrvAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
waitSlaveReady(broker3.getMessageStore());
Thread.sleep(6000);
checkMessage(broker3.getMessageStore(), 10, 0);
Expand Down Expand Up @@ -198,16 +196,18 @@ public void testTruncateEpochLogAndChangeMaster() throws Exception {
checkMessage(broker2MessageStore, 10, 10);

// Step6, start broker4, link to broker2, it should sync msg from epoch2(offset = 1700).
BrokerController broker4 = startBroker(this.namesrvAddress, 4, 20008, 21008, 22008, BrokerRole.SLAVE, 1700);
BrokerController broker4 = startBroker(this.namesrvAddress, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
waitSlaveReady(broker4.getMessageStore());
Thread.sleep(6000);
checkMessage(broker4.getMessageStore(), 10, 10);
}

@After
public void shutdown() {
public void shutdown() throws InterruptedException {
for (BrokerController controller : this.brokerList) {
controller.shutdown();
System.out.println("Shutdown broker " + controller.getBrokerConfig().getListenPort());
UtilAll.deleteFile(new File(controller.getMessageStoreConfig().getStorePathRootDir()));
}
if (this.namesrvController != null) {
this.namesrvController.shutdown();
Expand Down

0 comments on commit c41aefc

Please sign in to comment.