Skip to content

Commit

Permalink
broker管理接口修改
Browse files Browse the repository at this point in the history
  • Loading branch information
江楠 committed Sep 7, 2020
1 parent dcf7feb commit d752dc5
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class BrokerTopicMonitorRecord {
private long retryCount;
private long retryTps;
private long backlog;
private long traffic;

public String getApp() {
return app;
Expand Down Expand Up @@ -101,4 +102,12 @@ public long getTps() {
public void setTps(long tps) {
this.tps = tps;
}

public long getTraffic() {
return traffic;
}

public void setTraffic(long traffic) {
this.traffic = traffic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class BrokerRestUrlMappingServiceImpl implements BrokerRestUrlMappingServ
private String appConnectionDetailPath = "/monitor/connections/detail";
// /manage/topic/:topic/partitionGroup/:partitionGroup/store/metric
private String partitiongroupIndexPath="/manage/topic/%s/partitionGroup/%s/store/metric";
private String consumerInfosPath = "/monitor/consumers?page=%s&pageSize=%s";
private String producerInfosPath = "/monitor/producers?page=%s&pageSize=%s";

/** offset management*/
private String removeProducersConnectionsPath= "/manage/topic/%s/app/%s/producers";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.joyqueue.service.impl;

import com.alibaba.fastjson.JSONObject;
import org.joyqueue.convert.CodeConverter;
import org.joyqueue.manage.PartitionGroupMetric;
import org.joyqueue.model.PageResult;
Expand Down Expand Up @@ -46,6 +47,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -135,35 +137,15 @@ public PageResult<Client> queryClientConnectionDetail(QPageQuery<QMonitor> qPage
*/
@Override
public PageResult<BrokerTopicMonitor> queryTopicsMointor(QPageQuery<QMonitor> qPageQuery) {

PageResult<BrokerTopicMonitor> pageResult = new PageResult<>();
try {
Pagination pagination = qPageQuery.getPagination();
QMonitor qMonitor = qPageQuery.getQuery();
Broker broker = brokerService.findById(Integer.valueOf(String.valueOf(qMonitor.getBrokerId())));
List<String> toplicList = queryTopicList(broker);
pagination.setTotalRecord(toplicList.size());
int fromIndx = pagination.getStart() + pagination.getSize();
if (fromIndx > pagination.getTotalRecord()) {
fromIndx = pagination.getTotalRecord();
}

List<BrokerTopicMonitor> brokerTopicMonitors = new ArrayList<>(pagination.getSize());

for (String topic : toplicList.subList(pagination.getStart(), fromIndx)) {
List<String> appList = getAppByTopic(qMonitor.getType(), topic);

BrokerTopicMonitor brokerTopicMonitor = getMonitorByAppAndTopic(topic, appList, broker, qMonitor.getType());
if (brokerTopicMonitor.getBrokerTopicMonitorRecordList().size() > 0) {
brokerTopicMonitors.add(brokerTopicMonitor);
}
}
pageResult.setPagination(pagination);
pageResult.setResult(brokerTopicMonitors);
return getMonitorByBrokerPage(broker, qMonitor.getType(), pagination.getPage(), pagination.getSize());
} catch (Exception e) {
logger.error("queryTopicsMointor exception", e);
}
return pageResult;
return new PageResult<>();
}

/**
Expand Down Expand Up @@ -222,6 +204,7 @@ private BrokerTopicMonitor getMonitorByAppAndTopic(String topic, List<String> ap
brokerTopicMonitorRecord.setConnections(consumerMonitorInfo.getConnections());
brokerTopicMonitorRecord.setCount(consumerMonitorInfo.getDeQueue().getCount());
brokerTopicMonitorRecord.setTotalSize(consumerMonitorInfo.getDeQueue().getTotalSize());
brokerTopicMonitorRecord.setTraffic(consumerMonitorInfo.getDeQueue().getTraffic());
}
} else if (type == SubscribeType.PRODUCER) {
ProducerMonitorInfo producerMonitorInfo = null;
Expand All @@ -234,17 +217,86 @@ private BrokerTopicMonitor getMonitorByAppAndTopic(String topic, List<String> ap
brokerTopicMonitorRecord.setConnections(producerMonitorInfo.getConnections());
brokerTopicMonitorRecord.setCount(producerMonitorInfo.getEnQueue().getCount());
brokerTopicMonitorRecord.setTotalSize(producerMonitorInfo.getEnQueue().getTotalSize());
brokerTopicMonitorRecord.setTraffic(producerMonitorInfo.getEnQueue().getTraffic());
brokerTopicMonitorRecord.setTps(producerMonitorInfo.getEnQueue().getTps());
}
}
brokerTopicMonitorRecord.setApp(app);
brokerMonitorRecordList.add(brokerTopicMonitorRecord);
}
brokerMonitorRecordList.sort(Comparator.comparingLong(BrokerTopicMonitorRecord::getBacklog));
brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList);
brokerTopicMonitor.setTopic(topic);
return brokerTopicMonitor;
}

private PageResult<BrokerTopicMonitor> getMonitorByBrokerPage(Broker broker, SubscribeType type, int page, int pageSize) throws Exception {
Pagination pagination = new Pagination();
pagination.setPage(page);
pagination.setSize(pageSize);
PageResult<BrokerTopicMonitor> pageResult = new PageResult<>();
List<BrokerTopicMonitor> brokerTopicMonitors = new ArrayList<>();
if (type == SubscribeType.CONSUMER) {
JSONObject map = queryMonitorConsumers(broker, page, pageSize);
pagination.setTotalRecord(Integer.parseInt(map.getOrDefault("total", 0).toString()));
List<ConsumerMonitorInfo> consumerMonitorInfos = map.getJSONArray("data").toJavaList(ConsumerMonitorInfo.class);
for (ConsumerMonitorInfo consumerMonitorInfo: consumerMonitorInfos) {
BrokerTopicMonitor brokerTopicMonitor = new BrokerTopicMonitor();
BrokerTopicMonitorRecord brokerTopicMonitorRecord = new BrokerTopicMonitorRecord();
if (consumerMonitorInfo.getRetry() != null) {
brokerTopicMonitorRecord.setRetryCount(consumerMonitorInfo.getRetry().getCount());
brokerTopicMonitorRecord.setRetryTps(consumerMonitorInfo.getRetry().getCurrent());
}
if (consumerMonitorInfo.getPending() != null) {
brokerTopicMonitorRecord.setBacklog(consumerMonitorInfo.getPending().getCount());
}
brokerTopicMonitorRecord.setConnections(consumerMonitorInfo.getConnections());
brokerTopicMonitorRecord.setCount(consumerMonitorInfo.getDeQueue().getCount());
brokerTopicMonitorRecord.setTotalSize(consumerMonitorInfo.getDeQueue().getTotalSize());
brokerTopicMonitorRecord.setTraffic(consumerMonitorInfo.getDeQueue().getTraffic());
brokerTopicMonitorRecord.setApp(consumerMonitorInfo.getApp());
List<BrokerTopicMonitorRecord> brokerMonitorRecordList = new ArrayList<>();
brokerMonitorRecordList.add(brokerTopicMonitorRecord);
brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList);
brokerTopicMonitor.setTopic(consumerMonitorInfo.getTopic());
brokerTopicMonitors.add(brokerTopicMonitor);
}
brokerTopicMonitors.sort((o1, o2) -> {
BrokerTopicMonitorRecord brokerTopicMonitorRecord1 = o1.getBrokerTopicMonitorRecordList().get(0);
BrokerTopicMonitorRecord brokerTopicMonitorRecord2 = o2.getBrokerTopicMonitorRecordList().get(0);
return Long.compare(brokerTopicMonitorRecord2.getBacklog(), brokerTopicMonitorRecord1.getBacklog());
});
} else if (type == SubscribeType.PRODUCER) {
JSONObject map = queryMonitorProducers(broker, page, pageSize);
pagination.setTotalRecord(Integer.parseInt(map.getOrDefault("total", 0).toString()));
List<ProducerMonitorInfo> producerMonitorInfos = map.getJSONArray("data").toJavaList(ProducerMonitorInfo.class);
for (ProducerMonitorInfo producerMonitorInfo: producerMonitorInfos) {
BrokerTopicMonitor brokerTopicMonitor = new BrokerTopicMonitor();
BrokerTopicMonitorRecord brokerTopicMonitorRecord = new BrokerTopicMonitorRecord();
brokerTopicMonitorRecord.setConnections(producerMonitorInfo.getConnections());
brokerTopicMonitorRecord.setCount(producerMonitorInfo.getEnQueue().getCount());
brokerTopicMonitorRecord.setTotalSize(producerMonitorInfo.getEnQueue().getTotalSize());
brokerTopicMonitorRecord.setTraffic(producerMonitorInfo.getEnQueue().getTraffic());
brokerTopicMonitorRecord.setTps(producerMonitorInfo.getEnQueue().getTps());
brokerTopicMonitorRecord.setApp(producerMonitorInfo.getApp());
List<BrokerTopicMonitorRecord> brokerMonitorRecordList = new ArrayList<>();
brokerMonitorRecordList.add(brokerTopicMonitorRecord);
brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList);
brokerTopicMonitor.setTopic(producerMonitorInfo.getTopic());
brokerTopicMonitors.add(brokerTopicMonitor);
}
brokerTopicMonitors.sort((o1, o2) -> {
BrokerTopicMonitorRecord brokerTopicMonitorRecord1 = o1.getBrokerTopicMonitorRecordList().get(0);
BrokerTopicMonitorRecord brokerTopicMonitorRecord2 = o2.getBrokerTopicMonitorRecordList().get(0);
return Long.compare(brokerTopicMonitorRecord2.getCount(), brokerTopicMonitorRecord1.getCount());
});
}
pageResult.setResult(brokerTopicMonitors);
pageResult.setPagination(pagination);
return pageResult;
}


private List<String> getAppByTopic(SubscribeType subscribeType, String topic) throws Exception {
if (subscribeType == SubscribeType.CONSUMER) {
List<Consumer> consumerList;
Expand Down Expand Up @@ -283,6 +335,34 @@ private List<PartitionGroupMetric> getPartitionGroup(String topic, Broker broker
return null;
}

private JSONObject queryMonitorConsumers(Broker broker, int page, int pageSize) {
String path = "consumerInfos";
String[] args = new String[4];
args[0] = broker.getIp();
args[1] = String.valueOf(broker.getMonitorPort());
args[2] = String.valueOf(page);
args[3] = String.valueOf(pageSize);
RestResponse<JSONObject> restResponse = httpRestService.get(path, JSONObject.class, false, args);
if (restResponse != null && restResponse.getData() != null) {
return restResponse.getData();
}
return new JSONObject();
}

private JSONObject queryMonitorProducers(Broker broker, int page, int pageSize) {
String path = "producerInfos";
String[] args = new String[4];
args[0] = broker.getIp();
args[1] = String.valueOf(broker.getMonitorPort());
args[2] = String.valueOf(page);
args[3] = String.valueOf(pageSize);
RestResponse<JSONObject> restResponse = httpRestService.get(path, JSONObject.class, false, args);
if (restResponse != null && restResponse.getData() != null) {
return restResponse.getData();
}
return new JSONObject();
}

/**
* 查询消费者详情
* @return
Expand Down
Loading

0 comments on commit d752dc5

Please sign in to comment.