Skip to content

Commit

Permalink
[INLONG-5694][Manager] Fix the problem that gets the inlong group err…
Browse files Browse the repository at this point in the history
…or (#5695)
  • Loading branch information
fuweng11 committed Aug 26, 2022
1 parent defa542 commit a133995
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;

Expand Down Expand Up @@ -88,8 +88,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
log.info("inlong group ext info: {}", extList);

Map<String, String> kvConf = extList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
Map<String, String> kvConf = new HashMap<>();
extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;

Expand Down Expand Up @@ -89,8 +89,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
Map<String, String> kvConf = new HashMap<>();
groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
streamExtList.forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;

Expand Down Expand Up @@ -89,8 +89,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
log.info("inlong group ext info: {}", extList);

Map<String, String> kvConf = extList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
Map<String, String> kvConf = new HashMap<>();
extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;

Expand Down Expand Up @@ -90,8 +90,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
Map<String, String> kvConf = new HashMap<>();
groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
streamExtList.stream().forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;

Expand Down Expand Up @@ -97,8 +97,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
return ListenerResult.success();
}

Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
Map<String, String> kvConf = new HashMap<>();
groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
streamExtList.forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;

Expand Down Expand Up @@ -88,8 +88,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
log.info("inlong group ext info: {}", extList);

Map<String, String> kvConf = extList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
Map<String, String> kvConf = new HashMap<>();
extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;

Expand Down Expand Up @@ -89,8 +89,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
Map<String, String> kvConf = new HashMap<>();
groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
streamExtList.stream().forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.common.CountInfo;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
Expand All @@ -62,6 +61,7 @@

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -98,9 +98,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {

@Override
public ConsumptionSummary getSummary(ConsumptionQuery query) {
Map<String, Integer> countMap = consumptionMapper.countByQuery(query)
.stream()
.collect(Collectors.toMap(CountInfo::getKey, CountInfo::getValue));
Map<String, Integer> countMap = new HashMap<>();
consumptionMapper.countByQuery(query)
.forEach(countInfo -> countMap.put(countInfo.getKey(), countInfo.getValue()));

return ConsumptionSummary.builder()
.totalCount(countMap.values().stream().mapToInt(c -> c).sum())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -235,11 +236,11 @@ private void reloadAllSourceConfig() {

task2Group.forEach((task, groupList) -> {
// get topic properties under this cluster and task, group them by group id.
Map<String, Map<String, String>> group2topicProp = allStreamInfos.stream()
.filter(stream -> stream.getSortTaskName().equals(task)
&& stream.getSortClusterName().equals(clusterName))
.collect(Collectors.toMap(SortSourceStreamInfo::getGroupId,
SortSourceStreamInfo::getExtParamsMap));
Map<String, Map<String, String>> group2topicProp = new HashMap<>();
allStreamInfos.stream().filter(stream -> stream.getSortTaskName().equals(task)
&& stream.getSortClusterName().equals(clusterName)).forEach(
sortSourceStreamInfo -> group2topicProp.put(sortSourceStreamInfo.getGroupId(),
sortSourceStreamInfo.getExtParamsMap()));

Map<String, CacheZone> cacheZones;
try {
Expand Down Expand Up @@ -317,9 +318,9 @@ private List<CacheZone> getCacheZoneListByTag(
List<String> tags = new ArrayList<>(tag2GroupInfos.keySet());

// Clusters that related to these tags
Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = allTag2ClusterInfos.entrySet().stream()
.filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = new HashMap<>();
allTag2ClusterInfos.entrySet().stream().filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
.forEach(entry -> tag2ClusterInfos.put(entry.getKey(), entry.getValue()));

// get CacheZone list
return tags.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -422,8 +423,8 @@ public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> exts) {
}

private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
Map<String, String> extMap = extInfos.stream()
.collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
Map<String, String> extMap = new HashMap<>();
extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), extInfo.getKeyValue()));
String type = extMap.get(InlongConstants.SORT_TYPE);
if (StringUtils.isBlank(type)) {
return null;
Expand Down
Loading

0 comments on commit a133995

Please sign in to comment.