/
TraceUtils.java
112 lines (99 loc) · 4.33 KB
/
TraceUtils.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package at.esque.kafka;
import at.esque.kafka.topics.model.KafkaHeaderFilterOption;
import com.google.protobuf.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class TraceUtils {
private TraceUtils() {
}
public static Predicate<ConsumerRecord> valuePredicate(String regex, boolean searchNull) {
if (searchNull) {
return ((cr) -> cr.value() == null);
} else {
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
return (cr) -> {
if (cr.value() == null) {
return false;
}
if (cr.value() instanceof Message messageValue) {
Matcher matcher = null;
try {
matcher = pattern.matcher(JsonUtils.toJson(messageValue));
} catch (IOException e) {
throw new RuntimeException(e);
}
return matcher.find();
} else {
Matcher matcher = pattern.matcher(cr.value().toString());
return matcher.find();
}
};
}
}
public static Predicate<ConsumerRecord> keyPredicate(String search, @NotNull String keyMode) {
if (keyMode.equals("exact match")) {
return ((cr) -> {
if (cr.key() instanceof Message messageKey) {
try {
return StringUtils.equals(JsonUtils.toJson(messageKey), search);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
return StringUtils.equals(cr.key().toString(), search);
}
});
} else/*(keyMode.equals("regex (contains)"))*/ {
Pattern pattern = Pattern.compile(search, Pattern.DOTALL);
return (cr) -> {
if (cr.key() == null) {
return false;
}
if (cr.key() instanceof Message messageKey) {
Matcher matcher = null;
try {
matcher = pattern.matcher(JsonUtils.toJson(messageKey));
} catch (IOException e) {
throw new RuntimeException(e);
}
return matcher.find();
} else {
Matcher matcher = pattern.matcher(cr.key().toString());
return matcher.find();
}
};
}
}
public static Predicate<ConsumerRecord> consumerRecordHeaderPredicate(KafkaHeaderFilterOption kafkaHeaderFilterOption) {
return (consumerRecord) -> {
Headers headers = consumerRecord.headers();
Stream<Header> stream = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(headers.headers(kafkaHeaderFilterOption.getHeader()).iterator(), Spliterator.ORDERED),
false);
return stream.anyMatch(header -> kafkaHeaderFilterOption.isExactMatch() ? headerValueExactMatch(kafkaHeaderFilterOption, header) : headerValueRegexMatch(kafkaHeaderFilterOption, header));
};
}
private static boolean headerValueExactMatch(KafkaHeaderFilterOption kafkaHeaderFilterOption, Header header) {
return new String(header.value(), StandardCharsets.UTF_8).equals(kafkaHeaderFilterOption.getFilterString());
}
private static boolean headerValueRegexMatch(KafkaHeaderFilterOption kafkaHeaderFilterOption, Header header) {
Pattern pattern = Pattern.compile(kafkaHeaderFilterOption.getFilterString());
if (header.value() == null) {
return false;
}
Matcher matcher = pattern.matcher(new String(header.value(), StandardCharsets.UTF_8));
return matcher.find();
}
}