Skip to content

Commit

Permalink
[Feature] didi#1155 Consume just filter key or value, not both. 消费消息支…
Browse files Browse the repository at this point in the history
…持单独过滤key或者value
  • Loading branch information
weidong_chang committed Sep 21, 2023
1 parent b1aa12b commit 9eeddae
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ const ConsumeClientTest = () => {
...configInfo,
needFilterKeyValue: changeValue === 1 || changeValue === 2,
needFilterSize: changeValue === 3 || changeValue === 4 || changeValue === 5,
needFilterKey: changeValue === 6,
needFilterValue: changeValue === 7,
});
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ export const filterList = [
label: 'Under Size',
value: 5,
},
{
label: 'key_contains',
value: 6,
},
{
label: 'value_contains',
value: 7,
}
];

export const untilList = [
Expand Down Expand Up @@ -324,10 +332,10 @@ export const getFormConfig = (topicMetaData: any, info = {} as any, partitionLis
key: 'filterKey',
label: 'Key',
type: FormItemType.input,
invisible: !info?.needFilterKeyValue,
invisible: !info?.needFilterKeyValue && !info?.needFilterKey,
rules: [
{
required: info?.needFilterKeyValue,
required: info?.needFilterKeyValue || info?.needFilterKey,
message: '请输入Key',
},
],
Expand All @@ -336,10 +344,10 @@ export const getFormConfig = (topicMetaData: any, info = {} as any, partitionLis
key: 'filterValue',
label: 'Value',
type: FormItemType.input,
invisible: !info?.needFilterKeyValue,
invisible: !info?.needFilterKeyValue && !info?.needFilterValue,
rules: [
{
required: info?.needFilterKeyValue,
required: info?.needFilterKeyValue || info?.needFilterValue,
message: '请输入Value',
},
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,18 @@ private Result<Void> checkStartFromAndFilterLegal(KafkaConsumerStartFromDTO star
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "包含的方式过滤,必须有过滤的key或value");
}

// key包含过滤
if (KafkaConsumerFilterEnum.KEY_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareKey())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "key包含的方式过滤,必须有过滤的key");
}

// value包含过滤
if (KafkaConsumerFilterEnum.VALUE_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareValue())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "value包含的方式过滤,必须有过滤的value");
}

// 不包含过滤
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareKey()) && ValidateUtils.isBlank(filter.getFilterCompareValue())) {
Expand Down Expand Up @@ -550,6 +562,18 @@ private boolean checkMatchFilter(ConsumerRecord consumerRecord, KafkaConsumerFil
return true;
}

// key包含过滤
if (KafkaConsumerFilterEnum.KEY_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && consumerRecord.key() != null && consumerRecord.key().toString().contains(filter.getFilterCompareKey()))) {
return true;
}

// value包含过滤
if (KafkaConsumerFilterEnum.VALUE_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareValue()) && consumerRecord.value() != null && consumerRecord.value().toString().contains(filter.getFilterCompareValue()))) {
return true;
}

// 不包含过滤
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && (consumerRecord.key() == null || !consumerRecord.key().toString().contains(filter.getFilterCompareKey())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class KafkaConsumerFilterDTO extends BaseDTO {
/**
* @see KafkaConsumerFilterEnum
*/
@Range(min = 0, max = 5, message = "filterType最大和最小值必须在[0, 5]之间")
@Range(min = 0, max = 7, message = "filterType最大和最小值必须在[0, 7]之间")
@ApiModelProperty(value = "开始消费位置的类型", example = "2")
private Integer filterType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public enum KafkaConsumerFilterEnum {

UNDER_SIZE(5, "size小于"),

KEY_CONTAINS(6, "key包含"),

VALUE_CONTAINS(7, "value包含"),

;

private final Integer code;
Expand Down

0 comments on commit 9eeddae

Please sign in to comment.