Skip to content

Commit

Permalink
[INLONG-10074][Manager] Fix the problem of Error in querying audit in…
Browse files Browse the repository at this point in the history
…formation based on IP address (#10075)

(cherry picked from commit 721d61a)
  • Loading branch information
fuweng11 authored and baomingyu committed Apr 25, 2024
1 parent e8c81b4 commit 8378617
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ List<Map<String, Object>> sumByLogTsAndIp(@Param(value = "ip") String ip,

List<Map<String, Object>> sumGroupByIp(@Param(value = "groupId") String groupId,
@Param(value = "streamId") String streamId,
@Param(value = "ip") String ip,
@Param(value = "auditId") String auditId,
@Param(value = "startDate") String startDate,
@Param(value = "endData") String endData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@
import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo.Field;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo.Sum;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchQueryInfo;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchQueryInfo.QueryBool;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo.SortValue;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchRequest;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
import org.apache.inlong.manager.service.audit.AuditRunnable;
Expand All @@ -61,11 +53,8 @@
import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
Expand Down Expand Up @@ -369,9 +358,8 @@ public List<AuditVO> listAll(AuditRequest request) throws Exception {
// Support min agg at now
DateTime endDate = SECOND_DATE_FORMATTER.parseDateTime(request.getEndDate());
String endDateStr = endDate.plusDays(1).toString(SECOND_DATE_FORMATTER);
List<Map<String, Object>> sumList = auditEntityMapper.sumGroupByIp(
request.getInlongGroupId(), request.getInlongStreamId(), auditId, request.getStartDate(),
endDateStr);
List<Map<String, Object>> sumList = auditEntityMapper.sumGroupByIp(request.getInlongGroupId(),
request.getInlongStreamId(), request.getIp(), auditId, request.getStartDate(), endDateStr);
List<AuditInfo> auditSet = sumList.stream().map(s -> {
AuditInfo vo = new AuditInfo();
vo.setInlongGroupId((String) s.get("inlongGroupId"));
Expand Down Expand Up @@ -433,64 +421,6 @@ private List<String> getAuditIds(String groupId, String streamId, String sourceN
return new ArrayList<>(auditSet);
}

/**
* Convert to elasticsearch search request json
*
* @param groupId The groupId of inlong
* @param streamId The streamId of inlong
* @return The search request of elasticsearch json
*/
public static JsonObject toAuditSearchRequestJson(String groupId, String streamId) {
Map<String, ElasticsearchQueryInfo.TermValue> groupIdMap = Maps.newHashMap();
groupIdMap.put(INLONG_GROUP_ID, new ElasticsearchQueryInfo.TermValue(groupId, DEFAULT_BOOST));
ElasticsearchQueryInfo.QueryTerm groupIdTerm = ElasticsearchQueryInfo.QueryTerm.builder().term(groupIdMap)
.build();
Map<String, ElasticsearchQueryInfo.TermValue> streamIdMap = Maps.newHashMap();
streamIdMap.put(INLONG_STREAM_ID, new ElasticsearchQueryInfo.TermValue(streamId, DEFAULT_BOOST));
ElasticsearchQueryInfo.QueryTerm streamIdTerm = ElasticsearchQueryInfo.QueryTerm.builder().term(streamIdMap)
.build();
QueryBool boolInfo = QueryBool.builder()
.must(Lists.newArrayList(groupIdTerm, streamIdTerm))
.boost(DEFAULT_BOOST)
.adjustPureNegative(ADJUST_PURE_NEGATIVE)
.build();
ElasticsearchQueryInfo queryInfo = ElasticsearchQueryInfo.builder().bool(boolInfo).build();

Map<String, SortValue> termValueInfoMap = Maps.newHashMap();
termValueInfoMap.put(TERM_FILED, new SortValue(SORT_ORDER));
List<Map<String, SortValue>> list = Lists.newArrayList(termValueInfoMap);
ElasticsearchQuerySortInfo sortInfo = ElasticsearchQuerySortInfo.builder().sort(list).build();

Sum countSum = Sum.builder()
.sum(new Field(COUNT))
.build();
Sum delaySum = Sum.builder()
.sum(new Field(DELAY))
.build();
Map<String, Sum> aggregations = Maps.newHashMap();
aggregations.put(COUNT, countSum);
aggregations.put(DELAY, delaySum);
ElasticsearchAggregationsTermsInfo termsInfo = ElasticsearchAggregationsTermsInfo.builder()
.field(TERM_FILED)
.size(Integer.MAX_VALUE)
.aggregations(aggregations)
.build();
Map<String, ElasticsearchAggregationsTermsInfo> terms = Maps.newHashMap();
terms.put(TERMS, termsInfo);
Map<String, Map<String, ElasticsearchAggregationsTermsInfo>> logTs = Maps.newHashMap();
logTs.put(TERM_FILED, terms);

ElasticsearchRequest request = ElasticsearchRequest.builder()
.from(QUERY_FROM)
.size(QUERY_SIZE)
.query(queryInfo)
.sort(sortInfo)
.aggregations(logTs)
.build();

return GSON.toJsonTree(request).getAsJsonObject();
}

/**
* Get clickhouse Statement
*
Expand Down

0 comments on commit 8378617

Please sign in to comment.