Skip to content

Commit

Permalink
Enhance message tracing to trace protobuf messages
Browse files Browse the repository at this point in the history
  • Loading branch information
daengi committed Sep 29, 2023
1 parent a58e55a commit fe51c34
Showing 1 changed file with 39 additions and 7 deletions.
46 changes: 39 additions & 7 deletions src/main/java/at/esque/kafka/TraceUtils.java
@@ -1,12 +1,14 @@
package at.esque.kafka;

import at.esque.kafka.topics.model.KafkaHeaderFilterOption;
import com.google.protobuf.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Spliterator;
import java.util.Spliterators;
Expand All @@ -27,29 +29,59 @@ public static Predicate<ConsumerRecord> valuePredicate(String regex, boolean sea
if (searchNull) {
return ((cr) -> cr.value() == null);
} else {
Pattern pattern = Pattern.compile(regex);
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
return (cr) -> {
if (cr.value() == null) {
return false;
}
Matcher matcher = pattern.matcher(cr.value().toString());
return matcher.find();
if (cr.value() instanceof Message messageValue) {
Matcher matcher = null;
try {
matcher = pattern.matcher(JsonUtils.toJson(messageValue));
} catch (IOException e) {
throw new RuntimeException(e);
}
return matcher.find();
} else {
Matcher matcher = pattern.matcher(cr.value().toString());
return matcher.find();
}
};

}
}

public static Predicate<ConsumerRecord> keyPredicate(String search, @NotNull String keyMode) {
if (keyMode.equals("exact match")) {
return ((cr) -> StringUtils.equals(cr.key().toString(), search));
return ((cr) -> {
if (cr.key() instanceof Message messageKey) {
try {
return StringUtils.equals(JsonUtils.toJson(messageKey), search);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
return StringUtils.equals(cr.key().toString(), search);
}
});
} else/*(keyMode.equals("regex (contains)"))*/ {
Pattern pattern = Pattern.compile(search);
Pattern pattern = Pattern.compile(search, Pattern.DOTALL);
return (cr) -> {
if (cr.key() == null) {
return false;
}
Matcher matcher = pattern.matcher(cr.key().toString());
return matcher.find();
if (cr.key() instanceof Message messageKey) {
Matcher matcher = null;
try {
matcher = pattern.matcher(JsonUtils.toJson(messageKey));
} catch (IOException e) {
throw new RuntimeException(e);
}
return matcher.find();
} else {
Matcher matcher = pattern.matcher(cr.key().toString());
return matcher.find();
}
};
}
}
Expand Down

0 comments on commit fe51c34

Please sign in to comment.