From fe51c345c39dfc4b1a0feacc0e1fc8666e4dc959 Mon Sep 17 00:00:00 2001 From: daengi <34056087+daengi@users.noreply.github.com> Date: Fri, 29 Sep 2023 13:54:27 +0200 Subject: [PATCH] Enhance message tracing to trace protobuf messages --- src/main/java/at/esque/kafka/TraceUtils.java | 46 +++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/src/main/java/at/esque/kafka/TraceUtils.java b/src/main/java/at/esque/kafka/TraceUtils.java index 0abcdee..fde22d9 100644 --- a/src/main/java/at/esque/kafka/TraceUtils.java +++ b/src/main/java/at/esque/kafka/TraceUtils.java @@ -1,12 +1,14 @@ package at.esque.kafka; import at.esque.kafka.topics.model.KafkaHeaderFilterOption; +import com.google.protobuf.Message; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import javax.validation.constraints.NotNull; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Spliterator; import java.util.Spliterators; @@ -27,13 +29,23 @@ public static Predicate valuePredicate(String regex, boolean sea if (searchNull) { return ((cr) -> cr.value() == null); } else { - Pattern pattern = Pattern.compile(regex); + Pattern pattern = Pattern.compile(regex, Pattern.DOTALL); return (cr) -> { if (cr.value() == null) { return false; } - Matcher matcher = pattern.matcher(cr.value().toString()); - return matcher.find(); + if (cr.value() instanceof Message messageValue) { + Matcher matcher = null; + try { + matcher = pattern.matcher(JsonUtils.toJson(messageValue)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return matcher.find(); + } else { + Matcher matcher = pattern.matcher(cr.value().toString()); + return matcher.find(); + } }; } @@ -41,15 +53,35 @@ public static Predicate valuePredicate(String regex, boolean sea public static Predicate keyPredicate(String search, @NotNull String keyMode) { if (keyMode.equals("exact match")) { - return ((cr) -> StringUtils.equals(cr.key().toString(), search)); + return ((cr) -> { + if (cr.key() instanceof Message messageKey) { + try { + return StringUtils.equals(JsonUtils.toJson(messageKey), search); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + return StringUtils.equals(cr.key().toString(), search); + } + }); } else/*(keyMode.equals("regex (contains)"))*/ { - Pattern pattern = Pattern.compile(search); + Pattern pattern = Pattern.compile(search, Pattern.DOTALL); return (cr) -> { if (cr.key() == null) { return false; } - Matcher matcher = pattern.matcher(cr.key().toString()); - return matcher.find(); + if (cr.key() instanceof Message messageKey) { + Matcher matcher = null; + try { + matcher = pattern.matcher(JsonUtils.toJson(messageKey)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return matcher.find(); + } else { + Matcher matcher = pattern.matcher(cr.key().toString()); + return matcher.find(); + } }; } }