-
Notifications
You must be signed in to change notification settings - Fork 566
Description
Description
Bug Fix
- Incorrect logic: the check used `isNotBlank`, so the program printed the "input cannot be empty" error and exited precisely when the input was valid (non-blank).
- The fix: changed `isNotBlank` to `isBlank`.
// Before
if (StringUtils.isNotBlank(input)) {
System.err.println("input cannot be empty, for example: inlongGroupId:test_group");
return;
}
// After
if (StringUtils.isBlank(input)) {
System.err.println("input cannot be empty, for example: inlongGroupId:test_group");
return;
}
Make a condition filter to support OR and AND
I noticed a TODO in "LogCommand.java": "support OR and AND, make a condition filter." Could I try to implement this? I have attached my code, unit tests, and test results. My understanding of the requirements might not be fully accurate, so if you're willing to let me take this on, please point out any issues or let me know what improvements I should make.
Specification
Currently, I'm using "," to represent AND logic and "|" for OR logic, but it does not yet support the simultaneous use of AND and OR logic.
| Supported Cases | Description |
|---|---|
| groupId:g1,status:2 | AND logic for queries across different fields. |
| groupId:g1\|groupId:g2 | OR logic for multiple values within the same field. |
| status:1\|mqType:pulsar | OR logic for queries across different fields. |
| groupId:g1,groupId:g2 | Invalid input (the same key is repeated in an AND query); rejected with an error message. |
| groupId:g1\|status:1,keyword:k | Invalid input (AND and OR are mixed in one query); rejected with an error message. |
| unknownKey:v1 | Invalid input (unknown filter key); rejected with an error message. |
The modified LogCommand.java code
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.inlong.manager.client.cli;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
import org.apache.inlong.manager.client.cli.pojo.GroupInfo;
import org.apache.inlong.manager.client.cli.util.ClientUtils;
import org.apache.inlong.manager.client.cli.util.PrintUtils;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
/**
 * The log command is used to get log info for specified inlong groups.
 */
@Parameters(commandDescription = "Log resource")
public class LogCommand extends AbstractCommand {
@Parameter()
private List<String> params;
public LogCommand() {
super("log");
jcommander.addCommand("group", new CreateGroup());
}
@Parameters(commandDescription = "Log group")
private static class CreateGroup extends AbstractCommandRunner {
@Parameter()
private List<String> params;
@Parameter(names = {"--query"}, required = true, description = "condition filters")
private String input;
@Override
void run() {
final int MAX_LOG_SIZE = 100;
try {
// for now only filter by one condition. TODO:support OR and AND, make a condition filter.
// sample input: inlongGroupId:test_group
// isNotBlank -> isBlank if (StringUtils.isBlank(input)) {
System.err.println("input cannot be empty, for example: inlongGroupId:test_group");
return;
}
ClientUtils.initClientFactory();
InlongGroupClient groupClient = ClientUtils.clientFactory.getGroupClient();
InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
if (!buildQueryRequest(input, pageRequest, groupClient, MAX_LOG_SIZE)) {
// Case k1:v1 | k2:v2 这种情况是要构造多个pageRequest进行多次查询并返回的
// 目前下面这个只有一次就实现不了,然后目前是处理这种逻辑完直接在那里PrintUtils.print
// 然后返回false,跳过下面的这个查询
return;
}
// 只有and的时候buildQueryRequest就只是封装好pageRequest,然后到这里再查并打印
PageResult<InlongGroupBriefInfo> pageResult = groupClient.listGroups(pageRequest);
if (pageResult.getPageSize() > MAX_LOG_SIZE) {
System.err.println("the log is too large to print, please change the filter condition");
return;
}
PrintUtils.print(pageResult.getList(), GroupInfo.class);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Parses the query expression and prepares the request(s) needed to answer it.
 *
 * <ul>
 * <li>AND query ({@code k1:v1,k2:v2}): every condition is written into {@code pageRequest}
 * and {@code true} is returned; the caller is expected to run the query.</li>
 * <li>OR query ({@code k1:v1|k2:v2}): delegated to {@code handleOrQuery}, which queries and
 * prints by itself; its result is returned.</li>
 * <li>Blank input, a mix of ',' and '|', a malformed condition, an unknown key or a key
 * repeated in an AND query: an error is printed and {@code false} is returned.</li>
 * </ul>
 *
 * @param input raw value of the --query option
 * @param pageRequest request that receives the AND conditions
 * @param groupClient client used by the OR path to run its queries
 * @param maxLogSize size limit forwarded to the OR path
 * @return true only when {@code pageRequest} is ready to be queried by the caller
 */
private boolean buildQueryRequest(String input, InlongGroupPageRequest pageRequest,
        InlongGroupClient groupClient, int maxLogSize) {
    String trimmed = StringUtils.trim(input);
    if (StringUtils.isBlank(trimmed)) {
        System.err.println("input cannot be empty, for example: inlongGroupId:test_group");
        return false;
    }
    boolean hasOr = trimmed.contains("|");
    boolean hasAnd = trimmed.contains(",");
    // Only pure-OR or pure-AND expressions are supported for now.
    if (hasOr && hasAnd) {
        System.err.println("do not mix '|' and ',' in one query");
        return false;
    }
    if (hasOr) {
        return handleOrQuery(trimmed, groupClient, maxLogSize);
    }

    // AND path: collect every condition into the single pageRequest.
    List<String> groupIds = new ArrayList<>();
    List<Integer> statusList = new ArrayList<>();
    List<String> clusterTags = new ArrayList<>();
    // A key must not repeat in an AND query (e.g. groupId:1,groupId:2): both values would
    // be appended to the same list, which the server treats as OR. Aliases of the same
    // field are folded together first so that groupId:1,inlongGroupId:2 is caught as well.
    Set<String> seenKeys = new HashSet<>();
    for (String part : trimmed.split(",")) {
        String[] kv = splitKeyValue(part);
        if (kv == null) {
            return false;
        }
        if (!seenKeys.add(canonicalFilterKey(kv[0]))) {
            System.err.println("AND query cannot repeat the same key: " + kv[0]);
            return false;
        }
        if (!applyCondition(pageRequest, groupIds, statusList, clusterTags, kv[0], kv[1])) {
            return false;
        }
    }
    if (!groupIds.isEmpty()) {
        pageRequest.setGroupIdList(groupIds);
    }
    if (!statusList.isEmpty()) {
        pageRequest.setStatusList(statusList);
    }
    if (!clusterTags.isEmpty()) {
        pageRequest.setClusterTagList(clusterTags);
    }
    return true;
}

/**
 * Lower-cases a filter key (locale-independently) and maps the {@code groupId} alias onto
 * {@code inlongGroupId}, so both spellings count as the same key for duplicate detection.
 */
private String canonicalFilterKey(String key) {
    String lower = key == null ? null : key.toLowerCase(Locale.ROOT);
    return "groupid".equals(lower) ? "inlonggroupid" : lower;
}
/**
 * Runs an OR query ({@code k1:v1|k2:v2}), merges the results and prints them.
 *
 * <p>Values of a list-style key (see {@code isListKey}) are sent together in one request;
 * every value of any other key gets a request of its own. All requests are built and
 * validated before the first remote call, so an invalid condition never triggers a query.
 *
 * @param input trimmed query expression containing '|' separators
 * @param groupClient client used to list the groups
 * @param maxLogSize size limit applied to each query result
 * @return always {@code false}: the result has either been printed here or an error has
 *         been reported, so the caller must not run its own query
 */
private boolean handleOrQuery(String input, InlongGroupClient groupClient, int maxLogSize) {
    // Group the values by lower-cased key. LinkedHashMap keeps the keys in the order they
    // were typed, so the queries and the printed rows come out in a deterministic order.
    Map<String, List<String>> orGroups = new LinkedHashMap<>();
    for (String part : input.split("\\|")) {
        String[] kv = splitKeyValue(part);
        if (kv == null) {
            return false;
        }
        String keyLower = kv[0].toLowerCase(Locale.ROOT);
        orGroups.computeIfAbsent(keyLower, k -> new ArrayList<>()).add(kv[1]);
    }

    // Build every request first; stop before querying if any condition is invalid.
    List<InlongGroupPageRequest> requests = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : orGroups.entrySet()) {
        String key = entry.getKey();
        if (isListKey(key)) {
            InlongGroupPageRequest request = buildOrRequest(key, entry.getValue());
            if (request == null) {
                return false;
            }
            requests.add(request);
        } else {
            // Single-valued fields cannot express OR in one request: one request per value.
            for (String value : entry.getValue()) {
                List<String> single = new ArrayList<>(1);
                single.add(value);
                InlongGroupPageRequest request = buildOrRequest(key, single);
                if (request == null) {
                    return false;
                }
                requests.add(request);
            }
        }
    }

    List<InlongGroupBriefInfo> merged = new ArrayList<>();
    for (InlongGroupPageRequest request : requests) {
        if (!queryAndMerge(groupClient, request, merged, maxLogSize)) {
            return false;
        }
    }
    PrintUtils.print(merged, GroupInfo.class);
    return false;
}

/**
 * Builds one page request holding all {@code values} for {@code key}.
 *
 * @return the populated request, or {@code null} when a condition was rejected
 *         (the error has already been printed by {@code applyCondition})
 */
private InlongGroupPageRequest buildOrRequest(String key, List<String> values) {
    InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
    List<String> groupIds = new ArrayList<>();
    List<Integer> statusList = new ArrayList<>();
    List<String> clusterTags = new ArrayList<>();
    for (String value : values) {
        if (!applyCondition(pageRequest, groupIds, statusList, clusterTags, key, value)) {
            return null;
        }
    }
    if (!groupIds.isEmpty()) {
        pageRequest.setGroupIdList(groupIds);
    }
    if (!statusList.isEmpty()) {
        pageRequest.setStatusList(statusList);
    }
    if (!clusterTags.isEmpty()) {
        pageRequest.setClusterTagList(clusterTags);
    }
    return pageRequest;
}
/**
 * Tells whether the given lower-cased filter key is one of the list-style keys
 * (group id, status or cluster tag).
 *
 * @param keyLower filter key, already lower-cased
 * @return true for "inlonggroupid", "groupid", "status" and "clustertag"
 */
private boolean isListKey(String keyLower) {
    if (keyLower == null) {
        return false;
    }
    switch (keyLower) {
        case "inlonggroupid":
        case "groupid":
        case "status":
        case "clustertag":
            return true;
        default:
            return false;
    }
}
/**
 * Runs one group query and folds its rows into {@code merged}.
 *
 * @param groupClient client used to list the groups
 * @param pageRequest request to send
 * @param merged accumulator that receives the rows not already present
 * @param maxLogSize upper bound the page size of the result is compared against
 * @return false (after printing an error) when the result is considered too large,
 *         true when the rows were merged
 */
private boolean queryAndMerge(InlongGroupClient groupClient,
        InlongGroupPageRequest pageRequest,
        List<InlongGroupBriefInfo> merged,
        int maxLogSize) {
    PageResult<InlongGroupBriefInfo> page = groupClient.listGroups(pageRequest);
    boolean printable = page.getPageSize() <= maxLogSize;
    if (printable) {
        mergeResults(merged, page.getList());
    } else {
        System.err.println("the log is too large to print, please change the filter condition");
    }
    return printable;
}
/**
 * Appends to {@code merged} every entry of {@code incoming} whose inlong group id is not
 * already present, keeping the first occurrence of each id and the incoming order.
 *
 * @param merged accumulator, modified in place
 * @param incoming rows to add; a null or empty list is ignored
 */
private void mergeResults(List<InlongGroupBriefInfo> merged, List<InlongGroupBriefInfo> incoming) {
    if (incoming == null || incoming.isEmpty()) {
        return;
    }
    // Index the ids already merged so each incoming row needs a single lookup
    // instead of a scan over the whole accumulator.
    Set<String> knownIds = new HashSet<>();
    for (InlongGroupBriefInfo existing : merged) {
        knownIds.add(existing.getInlongGroupId());
    }
    for (InlongGroupBriefInfo info : incoming) {
        // add() returns false for an id that was seen before (null ids included)
        if (knownIds.add(info.getInlongGroupId())) {
            merged.add(info);
        }
    }
}
/**
 * Splits one {@code key:value} condition into its trimmed key and value.
 *
 * @param part a single condition taken from the query expression
 * @return a two-element array {key, value}, or {@code null} (after printing an error)
 *         when the condition does not yield both a key and a non-blank value
 */
private String[] splitKeyValue(String part) {
    String[] tokens = StringUtils.split(StringUtils.trim(part), ":", 2);
    boolean wellFormed = tokens != null && tokens.length >= 2 && StringUtils.isNotBlank(tokens[1]);
    if (!wellFormed) {
        System.err.println("the input must contain ':', for example: inlongGroupId:test_group");
        return null;
    }
    String key = StringUtils.trim(tokens[0]);
    String value = StringUtils.trim(tokens[1]);
    return new String[] {key, value};
}
/**
 * Stores one {@code key:value} condition in the right place of the query.
 *
 * <p>Group ids, statuses and cluster tags are appended to the lists passed in (the caller
 * copies them into the request afterwards); mqType, keyword and inlongGroupMode are set on
 * {@code pageRequest} directly. Keys are matched case-insensitively.
 *
 * @param pageRequest request that receives the single-valued fields
 * @param groupIds accumulator for "inlongGroupId" / "groupId" values
 * @param statusList accumulator for "status" values
 * @param clusterTags accumulator for "clusterTag" values
 * @param key filter key as typed by the user
 * @param value filter value
 * @return true when the condition was accepted; false (after printing an error) for an
 *         unknown key, a non-integer number, or a conflicting second mqType/keyword
 */
private boolean applyCondition(InlongGroupPageRequest pageRequest,
        List<String> groupIds,
        List<Integer> statusList,
        List<String> clusterTags,
        String key,
        String value) {
    if (key == null) {
        // switch on a null string would throw; report it like any other unknown key
        System.err.println("unknown filter key: " + key);
        return false;
    }
    // Lower-case before matching so that mis-cased keys are still accepted. Locale.ROOT
    // keeps the result independent of the JVM default locale (a Turkish locale would
    // otherwise turn 'I' into a dotless 'ı' and "inlongGroupId" would stop matching).
    String lowerKey = key.toLowerCase(Locale.ROOT);
    switch (lowerKey) {
        case "inlonggroupid":
        case "groupid":
            groupIds.add(value);
            return true;
        case "status":
            try {
                statusList.add(Integer.parseInt(value));
            } catch (NumberFormatException ex) {
                System.err.println("status must be an integer");
                return false;
            }
            return true;
        case "clustertag":
            clusterTags.add(value);
            return true;
        case "mqtype":
            if (StringUtils.isNotBlank(pageRequest.getMqType())
                    && !StringUtils.equals(pageRequest.getMqType(), value)) {
                System.err.println("mqType can only be set once");
                return false;
            }
            pageRequest.setMqType(value);
            return true;
        case "inlonggroupname":
        case "keyword":
            if (StringUtils.isNotBlank(pageRequest.getKeyword())
                    && !StringUtils.equals(pageRequest.getKeyword(), value)) {
                System.err.println("keyword can only be set once");
                return false;
            }
            pageRequest.setKeyword(value);
            return true;
        case "inlonggroupmode":
            try {
                pageRequest.setInlongGroupMode(Integer.parseInt(value));
            } catch (NumberFormatException ex) {
                System.err.println("inlongGroupMode must be an integer");
                return false;
            }
            return true;
        default:
            System.err.println("unknown filter key: " + key);
            return false;
    }
}
}
}
Test Result
Input : groupId:g1,status:2
The generated PageRequest :
{
"pageNum":1,
"pageSize":10,
"orderField":"create_time",
"orderType":"desc",
"keyword":null,
"groupIdList":["g1"],
"mqType":null,
"status":null,
"statusList":[2],
"clusterTagList":null,
"inlongGroupMode":null,
"currentUser":null,
"tenant":null,
"isAdminRole":null,
"listSources":false
}
Input : groupId:g1|groupId:g2
The generated PageRequest :
{
"pageNum":1,
"pageSize":10,
"orderField":"create_time",
"orderType":"desc",
"keyword":null,
"groupIdList":["g1","g2"],
"mqType":null,
"status":null,
"statusList":null,
"clusterTagList":null,
"inlongGroupMode":null,
"currentUser":null,
"tenant":null,
"isAdminRole":null,
"listSources":false
}
Input : status:1|mqType:pulsar
(In this case, we need to perform two separate queries and then merge the results.)
The generated PageRequest :
{
"pageNum":1,
"pageSize":10,
"orderField":"create_time",
"orderType":"desc",
"keyword":null,
"groupIdList":null,
"mqType":"pulsar",
"status":null,
"statusList":null,
"clusterTagList":null,
"inlongGroupMode":null,
"currentUser":null,
"tenant":null,
"isAdminRole":null,
"listSources":false
}
{
"pageNum":1,
"pageSize":10,
"orderField":"create_time",
"orderType":"desc",
"keyword":null,
"groupIdList":null,
"mqType":null,
"status":null,
"statusList":[1],
"clusterTagList":null,
"inlongGroupMode":null,
"currentUser":null,
"tenant":null,
"isAdminRole":null,
"listSources":false
}
Input : groupId:g1,groupId:g2
Error Log : AND query cannot repeat the same key: groupId
Input : groupId:g1|status:1,keyword:k
Error Log : do not mix '|' and ',' in one query
Input : unknownKey:v1
Error Log : unknown filter key: unknownKey
Test Method
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.inlong.manager.client.cli;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
class LogCommandTest {
/**
 * A pure AND query must only fill in the request handed to it and must not call
 * listGroups itself.
 */
@Test
void testAndQueryBuildsSinglePageRequest() throws Exception {
    Object commandRunner = newRunner();
    InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
    RecordingGroupClient recorder = new RecordingGroupClient();

    boolean built = invokeBuildQueryRequest(commandRunner, "groupId:g1,status:2", pageRequest, recorder, 100);
    System.out.println("AND pageRequest: " + toJson(pageRequest));

    Assertions.assertTrue(built);
    List<String> expectedGroupIds = Collections.singletonList("g1");
    Assertions.assertEquals(expectedGroupIds, pageRequest.getGroupIdList());
    List<Integer> expectedStatuses = Collections.singletonList(2);
    Assertions.assertEquals(expectedStatuses, pageRequest.getStatusList());
    Assertions.assertTrue(recorder.getRequests().isEmpty(), "AND path should not call listGroups here");
}
/**
 * An OR over the same list-style key must be sent as one request carrying both values.
 */
@Test
void testOrSameKeyMergedIntoSingleRequest() throws Exception {
    Object commandRunner = newRunner();
    InlongGroupPageRequest unusedRequest = new InlongGroupPageRequest();
    RecordingGroupClient recorder = new RecordingGroupClient();

    boolean continueOuterQuery =
            invokeBuildQueryRequest(commandRunner, "groupId:g1|groupId:g2", unusedRequest, recorder, 100);
    printCaptured("OR same-key", recorder.getRequests());

    Assertions.assertFalse(continueOuterQuery,
            "OR path handles query internally and returns false to stop outer query");
    Assertions.assertEquals(1, recorder.getRequests().size());
    List<String> sentGroupIds = recorder.getRequests().get(0).getGroupIdList();
    Assertions.assertEquals(Arrays.asList("g1", "g2"), sentGroupIds);
}
/**
 * An OR across different keys must produce one request per key; the order in which the
 * two requests are sent is not asserted.
 */
@Test
void testOrDifferentKeysSplitIntoMultipleRequests() throws Exception {
    Object commandRunner = newRunner();
    InlongGroupPageRequest unusedRequest = new InlongGroupPageRequest();
    RecordingGroupClient recorder = new RecordingGroupClient();

    boolean continueOuterQuery =
            invokeBuildQueryRequest(commandRunner, "status:1|mqType:pulsar", unusedRequest, recorder, 100);
    printCaptured("OR cross-key", recorder.getRequests());

    Assertions.assertFalse(continueOuterQuery,
            "OR path handles query internally and returns false to stop outer query");
    Assertions.assertEquals(2, recorder.getRequests().size());

    InlongGroupPageRequest first = recorder.getRequests().get(0);
    InlongGroupPageRequest second = recorder.getRequests().get(1);

    boolean firstHasStatus = first.getStatusList() != null && first.getStatusList().contains(1);
    boolean secondHasStatus = second.getStatusList() != null && second.getStatusList().contains(1);
    Assertions.assertTrue(firstHasStatus || secondHasStatus);

    boolean firstHasMqType = "pulsar".equals(first.getMqType());
    boolean secondHasMqType = "pulsar".equals(second.getMqType());
    Assertions.assertTrue(firstHasMqType || secondHasMqType);
}
/**
 * Repeating a key inside an AND query must be rejected before any query is sent.
 */
@Test
void testAndDuplicateKeyRejected() throws Exception {
    RecordingGroupClient recorder = new RecordingGroupClient();
    InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
    Object commandRunner = newRunner();

    boolean accepted = invokeBuildQueryRequest(commandRunner, "groupId:g1,groupId:g2", pageRequest, recorder, 100);

    Assertions.assertFalse(accepted);
    boolean nothingSent = recorder.getRequests().isEmpty();
    Assertions.assertTrue(nothingSent, "AND duplicate key should stop before querying");
}
/**
 * A query mixing '|' and ',' is unsupported and must be rejected without querying.
 */
@Test
void testMixedAndOrRejected() throws Exception {
    RecordingGroupClient recorder = new RecordingGroupClient();
    InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
    Object commandRunner = newRunner();

    boolean accepted =
            invokeBuildQueryRequest(commandRunner, "groupId:g1|status:1,keyword:k", pageRequest, recorder, 100);

    Assertions.assertFalse(accepted);
    boolean nothingSent = recorder.getRequests().isEmpty();
    Assertions.assertTrue(nothingSent, "Mixed AND/OR should be rejected");
}
/**
 * A filter key that is not recognised must be rejected without querying.
 */
@Test
void testUnknownKeyRejected() throws Exception {
    RecordingGroupClient recorder = new RecordingGroupClient();
    InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
    Object commandRunner = newRunner();

    boolean accepted = invokeBuildQueryRequest(commandRunner, "unknownKey:v1", pageRequest, recorder, 100);

    Assertions.assertFalse(accepted);
    boolean nothingSent = recorder.getRequests().isEmpty();
    Assertions.assertTrue(nothingSent, "Unknown key should be rejected");
}
/**
 * Instantiates the private nested runner {@code LogCommand$CreateGroup} through reflection.
 *
 * @return a fresh runner instance
 */
private Object newRunner() throws Exception {
    java.lang.reflect.Constructor<?> noArgCtor = Class
            .forName("org.apache.inlong.manager.client.cli.LogCommand$CreateGroup")
            .getDeclaredConstructor();
    noArgCtor.setAccessible(true);
    return noArgCtor.newInstance();
}
/**
 * Calls the runner's private {@code buildQueryRequest} method through reflection.
 *
 * @param runner instance returned by {@code newRunner()}
 * @param input query expression under test
 * @param pageRequest request handed to the method
 * @param client client handed to the method
 * @param maxLogSize size limit handed to the method
 * @return the boolean returned by {@code buildQueryRequest}
 */
private boolean invokeBuildQueryRequest(Object runner,
        String input,
        InlongGroupPageRequest pageRequest,
        InlongGroupClient client,
        int maxLogSize) throws Exception {
    Class<?>[] signature = {String.class, InlongGroupPageRequest.class, InlongGroupClient.class, int.class};
    Method target = runner.getClass().getDeclaredMethod("buildQueryRequest", signature);
    target.setAccessible(true);
    Object outcome = target.invoke(runner, input, pageRequest, client, maxLogSize);
    return (boolean) outcome;
}
/**
 * Prints a labelled header followed by the JSON form of every captured request.
 */
private void printCaptured(String label, List<InlongGroupPageRequest> requests) {
    System.out.println(label + " captured requests:");
    requests.forEach(captured -> System.out.println(" " + toJson(captured)));
}
/**
 * Serialises a page request to JSON via {@code JsonUtils} for test output.
 */
private String toJson(InlongGroupPageRequest r) {
    final String json = JsonUtils.toJsonString(r);
    return json;
}
/**
 * Test double for {@code InlongGroupClient}: records a copy of every request passed to
 * {@code listGroups} and answers with a fixed single-row page.
 */
private static class RecordingGroupClient extends InlongGroupClient {

    /** Snapshots of the requests received so far, in call order. */
    private final List<InlongGroupPageRequest> requests = new ArrayList<>();

    RecordingGroupClient() {
        super(new ClientConfiguration(new DefaultAuthentication("u", "p"),
                "localhost", 8080, "http://localhost",
                5, 5, 10, TimeUnit.SECONDS, false, true, "public"));
    }

    /**
     * Records the request and returns a page holding one group ("g1", status 100)
     * whose page size is set to 1.
     */
    @Override
    public PageResult<InlongGroupBriefInfo> listGroups(InlongGroupPageRequest pageRequest) {
        requests.add(cloneRequest(pageRequest));
        InlongGroupBriefInfo row = InlongGroupBriefInfo.builder()
                .inlongGroupId("g1")
                .status(100)
                .build();
        List<InlongGroupBriefInfo> rows = Collections.singletonList(row);
        PageResult<InlongGroupBriefInfo> page = new PageResult<>(rows);
        page.setPageSize(1);
        return page;
    }

    List<InlongGroupPageRequest> getRequests() {
        return requests;
    }

    /**
     * Copies the filter fields of {@code src} into a new request so that later changes
     * to the original do not affect what was recorded.
     */
    private InlongGroupPageRequest cloneRequest(InlongGroupPageRequest src) {
        InlongGroupPageRequest snapshot = new InlongGroupPageRequest();
        snapshot.setKeyword(src.getKeyword());
        snapshot.setMqType(src.getMqType());
        snapshot.setInlongGroupMode(src.getInlongGroupMode());
        snapshot.setGroupIdList(copyOrNull(src.getGroupIdList()));
        snapshot.setStatusList(copyOrNull(src.getStatusList()));
        snapshot.setClusterTagList(copyOrNull(src.getClusterTagList()));
        return snapshot;
    }

    /** Returns a new list with the same elements, or null when the source is null. */
    private static <T> List<T> copyOrNull(List<T> source) {
        if (source == null) {
            return null;
        }
        return new ArrayList<>(source);
    }
}
}
InLong Component
InLong Manager
Are you willing to submit PR?
- Yes, I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct