diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 3a023e362cb..e0ff02d7945 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -87,6 +87,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetClusterListRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; @@ -1163,10 +1164,18 @@ public Properties getBrokerConfig(final String addr, final long timeoutMillis) throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ClusterInfo getBrokerClusterInfo( + public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQBrokerException { + return this.getBrokerClusterInfo("", timeoutMillis); + } + + public ClusterInfo getBrokerClusterInfo(String cluster, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); + GetClusterListRequestHeader requestHeader = new GetClusterListRequestHeader(); + String pCluster = cluster != null ? cluster : ""; + requestHeader.setCluster(pCluster); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); assert response != null; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetClusterListRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetClusterListRequestHeader.java new file mode 100644 index 00000000000..72b72b2f544 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetClusterListRequestHeader.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNullable; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +public class GetClusterListRequestHeader implements CommandCustomHeader { + + @CFNullable + private String cluster; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } +} diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 467078c44f8..92d8be060b4 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -20,6 +20,7 @@ import java.io.UnsupportedEncodingException; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion.Version; @@ -35,6 +36,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.header.GetClusterListRequestHeader; import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteTopicInNamesrvRequestHeader; @@ -363,11 +365,17 @@ public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, return response; } - private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) { + private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); + GetClusterListRequestHeader requestHeader = (GetClusterListRequestHeader) request.decodeCommandCustomHeader(GetClusterListRequestHeader.class); + if (StringUtils.isBlank(requestHeader.getCluster())) { + byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo(); + response.setBody(content); + } else { + byte[] content = this.namesrvController.getRouteInfoManager().getOneClusterInfo(requestHeader.getCluster()); + response.setBody(content); + } - byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo(); - response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 00962ef246e..1fef54706fe 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -70,6 +71,27 @@ public byte[] getAllClusterInfo() { return clusterInfoSerializeWrapper.encode(); } + public byte[] getOneClusterInfo(String cluster) { + HashMap> clusterAddr = new HashMap<>(); + HashMap brokerAddr = new HashMap<>(); + + Set brokers = clusterAddrTable.get(cluster); + if (brokers != null) { + for (String broker : brokers) { + BrokerData brokerData = brokerAddrTable.get(broker); + if (brokerData != null) { + brokerAddr.put(broker, brokerData); + } + } + clusterAddr.put(cluster, brokers); + } + + ClusterInfo info = new ClusterInfo(); + info.setBrokerAddrTable(brokerAddr); + info.setClusterAddrTable(clusterAddr); + return info.encode(); + } + public void deleteTopic(final String topic) { try { try { diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java index d4a2f66f99f..b6285e450e1 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java @@ -30,7 +30,9 @@ import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.header.GetClusterListRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader; @@ -76,7 +78,9 @@ public void init() throws Exception { field.set(namesrvController, routeInfoManager); defaultRequestProcessor = new DefaultRequestProcessor(namesrvController); - registerRouteInfoManager(); + for (int i = 10; i < 15; i++) { + registerRouteInfoManager("a" + i, i); + } logger = mock(InternalLogger.class); setFinalStatic(DefaultRequestProcessor.class.getDeclaredField("log"), logger); @@ -256,20 +260,45 @@ private static void setFinalStatic(Field field, Object newValue) throws Exceptio field.set(null, newValue); } - private void registerRouteInfoManager() { + private void registerRouteInfoManager(String prefix, int id) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); ConcurrentHashMap topicConfigConcurrentHashMap = new ConcurrentHashMap<>(); TopicConfig topicConfig = new TopicConfig(); topicConfig.setWriteQueueNums(8); - topicConfig.setTopicName("unit-test"); + topicConfig.setTopicName("unit-test_" + prefix); topicConfig.setPerm(6); topicConfig.setReadQueueNums(8); topicConfig.setOrder(false); topicConfigConcurrentHashMap.put("unit-test", topicConfig); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap); + Channel channel = mock(Channel.class); - RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001", + RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster_" + prefix, "127.0.0.1:120" + id, "default-broker_" + prefix, 1236 + id, "127.0.0.1:100" + id, topicConfigSerializeWrapper, new ArrayList(), channel); } + + private static RemotingCommand genClusterListCmd(String cluster) { + GetClusterListRequestHeader header = new GetClusterListRequestHeader(); + header.setCluster(cluster); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, header); + return request; + } + + @Test + public void testClusterInfo() throws RemotingCommandException { + testClusterInfo(null, 5); + testClusterInfo("default-cluster_a10", 1); + } + + public void testClusterInfo(String cluster, int want) throws RemotingCommandException { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(null); + RemotingCommand response = defaultRequestProcessor.processRequest(ctx, genClusterListCmd(cluster)); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + assertThat(response.getRemark()).isNull(); + ClusterInfo info = ClusterInfo.decode(response.getBody(), ClusterInfo.class); + assertThat(info.getClusterAddrTable().size() == want); + } } \ No newline at end of file diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java index bd151d0561e..0f37d7ab100 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java @@ -107,7 +107,7 @@ public static ClusterInfo getCluster(String nameSrvAddr) { ClusterInfo clusterInfo = null; try { mqAdminExt.start(); - clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + clusterInfo = mqAdminExt.examineBrokerClusterInfo(null); } catch (Exception e) { e.printStackTrace(); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index dc829c1c1b5..73e2c300e4b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -217,8 +217,13 @@ public ConsumeStats examineConsumeStats(String consumerGroup, } @Override - public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, + public ClusterInfo examineBrokerClusterInfo(String cluster) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException { + return defaultMQAdminExtImpl.examineBrokerClusterInfo(cluster); + } + + @Override + public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { return defaultMQAdminExtImpl.examineBrokerClusterInfo(); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index bcd66669ce2..4a115482147 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -270,8 +270,13 @@ public ConsumeStats examineConsumeStats(String consumerGroup, } @Override - public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, + public ClusterInfo examineBrokerClusterInfo(String cluster) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(cluster,timeoutMillis); + } + + @Override + public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis); } @@ -626,7 +631,7 @@ public boolean cleanExpiredConsumerQueue( RemotingTimeoutException, MQClientException, InterruptedException { boolean result = false; try { - ClusterInfo clusterInfo = examineBrokerClusterInfo(); + ClusterInfo clusterInfo = examineBrokerClusterInfo(cluster); if (null == cluster || "".equals(cluster)) { for (String targetCluster : clusterInfo.retrieveAllClusterNames()) { result = cleanExpiredConsumerQueueByCluster(clusterInfo, targetCluster); @@ -666,7 +671,7 @@ public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingTimeoutException, MQClientException, InterruptedException { boolean result = false; try { - ClusterInfo clusterInfo = examineBrokerClusterInfo(); + ClusterInfo clusterInfo = examineBrokerClusterInfo(cluster); if (null == cluster || "".equals(cluster)) { for (String targetCluster : clusterInfo.retrieveAllClusterNames()) { result = cleanUnusedTopicByCluster(clusterInfo, targetCluster); @@ -832,7 +837,7 @@ public boolean consumed(final MessageExt msg, ConsumeStats cstats = this.examineConsumeStats(group); - ClusterInfo ci = this.examineBrokerClusterInfo(); + ClusterInfo ci = this.examineBrokerClusterInfo(null); Iterator> it = cstats.getOffsetTable().entrySet().iterator(); while (it.hasNext()) { @@ -894,7 +899,7 @@ public Set getTopicClusterList( final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException { Set clusterSet = new HashSet(); - ClusterInfo clusterInfo = examineBrokerClusterInfo(); + ClusterInfo clusterInfo = examineBrokerClusterInfo(null); TopicRouteData topicRouteData = examineTopicRouteInfo(topic); BrokerData brokerData = topicRouteData.getBrokerDatas().get(0); String brokerName = brokerData.getBrokerName(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 16b4427575f..35089f90244 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -97,6 +97,9 @@ ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + ClusterInfo examineBrokerClusterInfo(String cluster) throws InterruptedException, MQBrokerException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException; + ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java index 2e65f980848..04fb86b873f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java @@ -45,7 +45,7 @@ public class CommandUtil { MQBrokerException { Map> masterAndSlaveMap = new HashMap>(4); - ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(clusterName); Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); if (brokerNameSet == null) { @@ -80,7 +80,7 @@ public static Set fetchMasterAddrByClusterName(final MQAdminExt adminExt RemotingSendRequestException, MQBrokerException { Set masterSet = new HashSet(); - ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(clusterName); Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); @@ -125,7 +125,7 @@ public static Set fetchMasterAndSlaveAddrByClusterName(final MQAdminExt public static Set fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName) throws Exception { - ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(clusterName); Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); if (brokerNameSet.isEmpty()) { throw new Exception(ERROR_MESSAGE); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java index ecbe1f38a54..9bb877259ac 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java @@ -92,7 +92,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t defaultMQAdminExt.start(); producer.start(); - ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(null); HashMap> clusterAddr = clusterInfoSerializeWrapper .getClusterAddrTable(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java index 6a0cd71c177..7b6a7afa4b7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -57,6 +57,10 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("c", "cluster", true, "specify cluster to get only this cluster"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -74,6 +78,12 @@ public void execute(final CommandLine commandLine, final Options options, printInterval = Long.parseLong(commandLine.getOptionValue('i')) * 1000; } + String cluster = null; + boolean hasCluster = commandLine.hasOption('c'); + if (hasCluster) { + cluster = commandLine.getOptionValue('c'); + } + try { defaultMQAdminExt.start(); long i = 0; @@ -83,9 +93,9 @@ public void execute(final CommandLine commandLine, final Options options, Thread.sleep(printInterval); } if (commandLine.hasOption('m')) { - this.printClusterMoreStats(defaultMQAdminExt); + this.printClusterMoreStats(cluster, defaultMQAdminExt); } else { - this.printClusterBaseInfo(defaultMQAdminExt); + this.printClusterBaseInfo(cluster, defaultMQAdminExt); } } while (enableInterval); @@ -96,10 +106,10 @@ public void execute(final CommandLine commandLine, final Options options, } } - private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, + private void printClusterMoreStats(final String cluster,final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException { - ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(cluster); System.out.printf("%-16s %-32s %14s %14s %14s %14s%n", "#Cluster Name", @@ -165,11 +175,11 @@ private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) th } } - private void printClusterBaseInfo( + private void printClusterBaseInfo(final String cluster, final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException { - ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(cluster); System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", "#Cluster Name", diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java index 56aea2e8084..79cf7328709 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java @@ -65,7 +65,7 @@ public void execute(final CommandLine commandLine, final Options options, try { defaultMQAdminExt.start(); if (commandLine.hasOption('c')) { - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(null); System.out.printf("%-20s %-48s %-48s%n", "#Cluster Name", diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 78659800219..9ed7357bc53 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -144,7 +144,7 @@ public static void init() throws Exception { brokerAddrTable.put("broker-test", new BrokerData()); clusterInfo.setBrokerAddrTable(brokerAddrTable); clusterInfo.setClusterAddrTable(new HashMap>()); - when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo); + when(mQClientAPIImpl.getBrokerClusterInfo(anyString(), anyLong())).thenReturn(clusterInfo); when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); Set clusterList = new HashSet<>(); @@ -257,7 +257,7 @@ public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerE @Test public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(null); HashMap brokerList = clusterInfo.getBrokerAddrTable(); assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker"); assertThat(brokerList.containsKey("broker-test")).isTrue(); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java index 33b449768d1..072923f1003 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java @@ -81,7 +81,7 @@ public void setup() throws MQClientException, NoSuchFieldException, IllegalAcces clusterAddrTable.put("default-cluster", brokerSet); clusterInfo.setBrokerAddrTable(brokerAddrTable); clusterInfo.setClusterAddrTable(clusterAddrTable); - when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo); + when(mQClientAPIImpl.getBrokerClusterInfo(anyString(), anyLong())).thenReturn(clusterInfo); when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true); }