Skip to content

Commit

Permalink
[ISSUE #5844] Fix bug when examine pop consumer group info
Browse files Browse the repository at this point in the history
  • Loading branch information
HScarb committed Jan 30, 2023
1 parent 0ea9301 commit 4f21e62
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
Expand All @@ -86,7 +87,6 @@
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
Expand Down Expand Up @@ -418,35 +418,20 @@ public ConsumeStats examineConsumeStats(
@Override
public ConsumeStats examineConsumeStats(String consumerGroup,
String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
TopicRouteData topicRouteData = null;
List<String> routeTopics = new ArrayList<>();
routeTopics.add(MixAll.getRetryTopic(consumerGroup));
if (topic != null) {
routeTopics.add(topic);
routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, consumerGroup));
}
for (int i = 0; i < routeTopics.size(); i++) {
try {
topicRouteData = this.examineTopicRouteInfo(routeTopics.get(i));
if (topicRouteData != null) {
break;
}
} catch (Throwable e) {
if (i == routeTopics.size() - 1) {
throw e;
}
}
}
ConsumeStats result = new ConsumeStats();

for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
result.getOffsetTable().putAll(consumeStats.getOffsetTable());
double value = result.getConsumeTps() + consumeStats.getConsumeTps();
result.setConsumeTps(value);
for (BrokerData brokerData : this.examineBrokerClusterInfo().getBrokerAddrTable().values()) {
if (brokerData.getBrokerAddrs() == null) {
continue;
}
String addr = brokerData.selectBrokerAddr();
if (StringUtils.isEmpty(addr) || StringUtils.isBlank(addr)) {
continue;
}
final ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl()
.getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
result.getOffsetTable().putAll(consumeStats.getOffsetTable());
final double value = result.getConsumeTps() + consumeStats.getConsumeTps();
result.setConsumeTps(value);
}

Set<String> topics = new HashSet<>();
Expand Down Expand Up @@ -613,14 +598,18 @@ public ConsumerConnection examineConsumerConnectionInfo(
String consumerGroup) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
ConsumerConnection result = new ConsumerConnection();
String topic = MixAll.getRetryTopic(consumerGroup);
List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
String addr = null;
if (brokerData != null) {
for (BrokerData brokerData : this.examineBrokerClusterInfo().getBrokerAddrTable().values()) {
if (brokerData.getBrokerAddrs() == null) {
continue;
}
addr = brokerData.selectBrokerAddr();
if (StringUtils.isNotBlank(addr)) {
result = this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(addr, consumerGroup, timeoutMillis);
if (StringUtils.isEmpty(addr) || StringUtils.isBlank(addr)) {
continue;
}
result = this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(addr, consumerGroup, timeoutMillis);
if (result != null) {
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@
package org.apache.rocketmq.tools.command.consumer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.server.NameServerMocker;
import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class ConsumerProgressSubCommandTest {
Expand All @@ -41,7 +43,7 @@ public class ConsumerProgressSubCommandTest {
@Before
public void before() {
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
nameServerMocker = startNameServer();
}

@After
Expand All @@ -50,7 +52,6 @@ public void after() {
nameServerMocker.shutdown();
}

@Ignore
@Test
public void testExecute() throws SubCommandException {
ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand();
Expand All @@ -63,13 +64,36 @@ public void testExecute() throws SubCommandException {
cmd.execute(commandLine, options, null);
}

private ServerResponseMocker startNameServer() {
ClusterInfo clusterInfo = new ClusterInfo();

HashMap<String, BrokerData> brokerAddressTable = new HashMap<>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("mockBrokerName");
HashMap<Long, String> brokerAddress = new HashMap<>();
brokerAddress.put(1L, "127.0.0.1:" + brokerMocker.listenPort());
brokerData.setBrokerAddrs(brokerAddress);
brokerData.setCluster("mockCluster");
brokerAddressTable.put("mockBrokerName", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddressTable);

HashMap<String, Set<String>> clusterAddressTable = new HashMap<>();
Set<String> brokerNames = new HashSet<>();
brokerNames.add("mockBrokerName");
clusterAddressTable.put("mockCluster", brokerNames);
clusterInfo.setClusterAddrTable(clusterAddressTable);

// start name server
return ServerResponseMocker.startServer(clusterInfo.encode());
}

private ServerResponseMocker startOneBroker() {
ConsumeStats consumeStats = new ConsumeStats();
HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<>();
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName("mockBrokerName");
messageQueue.setQueueId(1);
messageQueue.setBrokerName("mockTopicName");
messageQueue.setTopic("mockTopicName");

OffsetWrapper offsetWrapper = new OffsetWrapper();
offsetWrapper.setBrokerOffset(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@
*/
package org.apache.rocketmq.tools.command.consumer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.server.NameServerMocker;
import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.After;
import org.junit.Before;
Expand All @@ -41,7 +44,7 @@ public class ConsumerStatusSubCommandTest {
@Before
public void before() {
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
nameServerMocker = startNameServer();
}

@After
Expand All @@ -62,6 +65,29 @@ public void testExecute() throws SubCommandException {
cmd.execute(commandLine, options, null);
}

private ServerResponseMocker startNameServer() {
ClusterInfo clusterInfo = new ClusterInfo();

HashMap<String, BrokerData> brokerAddressTable = new HashMap<>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("mockBrokerName");
HashMap<Long, String> brokerAddress = new HashMap<>();
brokerAddress.put(1L, "127.0.0.1:" + brokerMocker.listenPort());
brokerData.setBrokerAddrs(brokerAddress);
brokerData.setCluster("mockCluster");
brokerAddressTable.put("mockBrokerName", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddressTable);

HashMap<String, Set<String>> clusterAddressTable = new HashMap<>();
Set<String> brokerNames = new HashSet<>();
brokerNames.add("mockBrokerName");
clusterAddressTable.put("mockCluster", brokerNames);
clusterInfo.setClusterAddrTable(clusterAddressTable);

// start name server
return ServerResponseMocker.startServer(clusterInfo.encode());
}

private ServerResponseMocker startOneBroker() {
ConsumerConnection consumerConnection = new ConsumerConnection();
HashSet<Connection> connectionSet = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
Expand Down Expand Up @@ -159,6 +160,24 @@ public static void init() throws NoSuchFieldException, IllegalAccessException, R
properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, System.currentTimeMillis());
consumerRunningInfo.setProperties(properties);
when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);

ClusterInfo clusterInfo = new ClusterInfo();
HashMap<String, BrokerData> brokerAddressTable = new HashMap<>();
brokerData = new BrokerData();
brokerData.setBrokerName("mockBrokerName");
HashMap<Long, String> brokerAddress = new HashMap<>();
brokerAddress.put(1L, "127.0.0.1:" + 10911);
brokerData.setBrokerAddrs(brokerAddress);
brokerData.setCluster("mockCluster");
brokerAddressTable.put("mockBrokerName", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddressTable);

HashMap<String, Set<String>> clusterAddressTable = new HashMap<>();
Set<String> brokerNames = new HashSet<>();
brokerNames.add("mockBrokerName");
clusterAddressTable.put("mockCluster", brokerNames);
clusterInfo.setClusterAddrTable(clusterAddressTable);
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
}

@AfterClass
Expand Down

0 comments on commit 4f21e62

Please sign in to comment.