Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5844] Fix bug when examine pop consumer group info #5956

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
Expand All @@ -90,7 +91,6 @@
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
Expand Down Expand Up @@ -422,35 +422,20 @@ public ConsumeStats examineConsumeStats(
@Override
public ConsumeStats examineConsumeStats(String consumerGroup,
String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
TopicRouteData topicRouteData = null;
List<String> routeTopics = new ArrayList<>();
routeTopics.add(MixAll.getRetryTopic(consumerGroup));
if (topic != null) {
routeTopics.add(topic);
routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, consumerGroup));
}
for (int i = 0; i < routeTopics.size(); i++) {
try {
topicRouteData = this.examineTopicRouteInfo(routeTopics.get(i));
if (topicRouteData != null) {
break;
}
} catch (Throwable e) {
if (i == routeTopics.size() - 1) {
throw e;
}
}
}
ConsumeStats result = new ConsumeStats();

for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
result.getOffsetTable().putAll(consumeStats.getOffsetTable());
double value = result.getConsumeTps() + consumeStats.getConsumeTps();
result.setConsumeTps(value);
for (BrokerData brokerData : this.examineBrokerClusterInfo().getBrokerAddrTable().values()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scanning all clusters is not a good choice. This way is inefficient and easily leads to timeout, especially since the number of clusters is very large.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, old code also scan all clusters, because consumers send heartbeat and create retry topics on all brokers.
Do you have any suggestions for improvement here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, old code also scan all clusters, because consumers send heartbeat and create retry topics on all brokers. Do you have any suggestions for improvement here?

That's not exactly. If we only have one cluster, these two ways do not have differences. But in production practice, there are many broker clusters registered to the same nameserver cluster. So that your modification will access unnecessary clusters.

if (brokerData.getBrokerAddrs() == null) {
continue;
}
String addr = brokerData.selectBrokerAddr();
if (StringUtils.isEmpty(addr) || StringUtils.isBlank(addr)) {
continue;
}
final ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl()
.getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
result.getOffsetTable().putAll(consumeStats.getOffsetTable());
final double value = result.getConsumeTps() + consumeStats.getConsumeTps();
result.setConsumeTps(value);
}

Set<String> topics = new HashSet<>();
Expand Down Expand Up @@ -617,14 +602,18 @@ public ConsumerConnection examineConsumerConnectionInfo(
String consumerGroup) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
ConsumerConnection result = new ConsumerConnection();
String topic = MixAll.getRetryTopic(consumerGroup);
List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
String addr = null;
if (brokerData != null) {
for (BrokerData brokerData : this.examineBrokerClusterInfo().getBrokerAddrTable().values()) {
if (brokerData.getBrokerAddrs() == null) {
continue;
}
addr = brokerData.selectBrokerAddr();
if (StringUtils.isNotBlank(addr)) {
result = this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(addr, consumerGroup, timeoutMillis);
if (StringUtils.isEmpty(addr) || StringUtils.isBlank(addr)) {
continue;
}
result = this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(addr, consumerGroup, timeoutMillis);
if (result != null) {
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@
package org.apache.rocketmq.tools.command.consumer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.server.NameServerMocker;
import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class ConsumerProgressSubCommandTest {
Expand All @@ -41,7 +43,7 @@ public class ConsumerProgressSubCommandTest {
@Before
public void before() {
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
nameServerMocker = startNameServer();
}

@After
Expand All @@ -50,7 +52,6 @@ public void after() {
nameServerMocker.shutdown();
}

@Ignore
@Test
public void testExecute() throws SubCommandException {
ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand();
Expand All @@ -63,13 +64,36 @@ public void testExecute() throws SubCommandException {
cmd.execute(commandLine, options, null);
}

private ServerResponseMocker startNameServer() {
ClusterInfo clusterInfo = new ClusterInfo();

HashMap<String, BrokerData> brokerAddressTable = new HashMap<>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("mockBrokerName");
HashMap<Long, String> brokerAddress = new HashMap<>();
brokerAddress.put(1L, "127.0.0.1:" + brokerMocker.listenPort());
brokerData.setBrokerAddrs(brokerAddress);
brokerData.setCluster("mockCluster");
brokerAddressTable.put("mockBrokerName", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddressTable);

HashMap<String, Set<String>> clusterAddressTable = new HashMap<>();
Set<String> brokerNames = new HashSet<>();
brokerNames.add("mockBrokerName");
clusterAddressTable.put("mockCluster", brokerNames);
clusterInfo.setClusterAddrTable(clusterAddressTable);

// start name server
return ServerResponseMocker.startServer(clusterInfo.encode());
}

private ServerResponseMocker startOneBroker() {
ConsumeStats consumeStats = new ConsumeStats();
HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<>();
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName("mockBrokerName");
messageQueue.setQueueId(1);
messageQueue.setBrokerName("mockTopicName");
messageQueue.setTopic("mockTopicName");

OffsetWrapper offsetWrapper = new OffsetWrapper();
offsetWrapper.setBrokerOffset(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@
*/
package org.apache.rocketmq.tools.command.consumer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.server.NameServerMocker;
import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.After;
import org.junit.Before;
Expand All @@ -41,7 +44,7 @@ public class ConsumerStatusSubCommandTest {
@Before
public void before() {
brokerMocker = startOneBroker();
nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort());
nameServerMocker = startNameServer();
}

@After
Expand All @@ -62,6 +65,29 @@ public void testExecute() throws SubCommandException {
cmd.execute(commandLine, options, null);
}

private ServerResponseMocker startNameServer() {
ClusterInfo clusterInfo = new ClusterInfo();

HashMap<String, BrokerData> brokerAddressTable = new HashMap<>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("mockBrokerName");
HashMap<Long, String> brokerAddress = new HashMap<>();
brokerAddress.put(1L, "127.0.0.1:" + brokerMocker.listenPort());
brokerData.setBrokerAddrs(brokerAddress);
brokerData.setCluster("mockCluster");
brokerAddressTable.put("mockBrokerName", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddressTable);

HashMap<String, Set<String>> clusterAddressTable = new HashMap<>();
Set<String> brokerNames = new HashSet<>();
brokerNames.add("mockBrokerName");
clusterAddressTable.put("mockCluster", brokerNames);
clusterInfo.setClusterAddrTable(clusterAddressTable);

// start name server
return ServerResponseMocker.startServer(clusterInfo.encode());
}

private ServerResponseMocker startOneBroker() {
ConsumerConnection consumerConnection = new ConsumerConnection();
HashSet<Connection> connectionSet = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
Expand Down Expand Up @@ -159,6 +160,24 @@ public static void init() throws NoSuchFieldException, IllegalAccessException, R
properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, System.currentTimeMillis());
consumerRunningInfo.setProperties(properties);
when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);

ClusterInfo clusterInfo = new ClusterInfo();
HashMap<String, BrokerData> brokerAddressTable = new HashMap<>();
brokerData = new BrokerData();
brokerData.setBrokerName("mockBrokerName");
HashMap<Long, String> brokerAddress = new HashMap<>();
brokerAddress.put(1L, "127.0.0.1:" + 10911);
brokerData.setBrokerAddrs(brokerAddress);
brokerData.setCluster("mockCluster");
brokerAddressTable.put("mockBrokerName", brokerData);
clusterInfo.setBrokerAddrTable(brokerAddressTable);

HashMap<String, Set<String>> clusterAddressTable = new HashMap<>();
Set<String> brokerNames = new HashSet<>();
brokerNames.add("mockBrokerName");
clusterAddressTable.put("mockCluster", brokerNames);
clusterInfo.setClusterAddrTable(clusterAddressTable);
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
}

@AfterClass
Expand Down