Skip to content

Commit

Permalink
#40 add header filters to trace dialog
Browse files Browse the repository at this point in the history
  • Loading branch information
patschuh committed Jul 7, 2023
1 parent a878119 commit 3052e1e
Show file tree
Hide file tree
Showing 6 changed files with 425 additions and 155 deletions.
12 changes: 12 additions & 0 deletions src/main/java/at/esque/kafka/Controller.java
Expand Up @@ -499,6 +499,18 @@ private ListCell<String> topicListCellFactory() {
actualPredicate = keyPredicate.or(valuePredicate);
}

if (traceInput.getKafkaHeaderFilterOptions() != null && !traceInput.getKafkaHeaderFilterOptions().isEmpty()) {
List<Predicate<ConsumerRecord>> predicates = traceInput.getKafkaHeaderFilterOptions()
.stream()
.map(TraceUtils::consumerRecordHeaderPredicate)
.toList();

for (Predicate<ConsumerRecord> predicate : predicates) {
actualPredicate = actualPredicate.and(predicate);
}

}

trace(topicMessageTypeConfig, consumerConfig, actualPredicate, partition, traceInput.getEpochStart(), traceInput.getEpochEnd() == null ? null : consumerRecord -> consumerRecord.timestamp() >= traceInput.getEpochEnd());
});
} catch (Exception e) {
Expand Down
38 changes: 36 additions & 2 deletions src/main/java/at/esque/kafka/TraceUtils.java
@@ -1,15 +1,26 @@
package at.esque.kafka;

import at.esque.kafka.topics.model.KafkaHeaderFilterOption;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import javax.validation.constraints.NotNull;
import java.nio.charset.StandardCharsets;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TraceUtils {

private TraceUtils() {
}


public static Predicate<ConsumerRecord> valuePredicate(String regex, boolean searchNull) {

Expand All @@ -29,9 +40,9 @@ public static Predicate<ConsumerRecord> valuePredicate(String regex, boolean sea
}

public static Predicate<ConsumerRecord> keyPredicate(String search, @NotNull String keyMode) {
if(keyMode.equals("exact match")) {
if (keyMode.equals("exact match")) {
return ((cr) -> StringUtils.equals(cr.key().toString(), search));
}else/*(keyMode.equals("regex (contains)"))*/{
} else/*(keyMode.equals("regex (contains)"))*/ {
Pattern pattern = Pattern.compile(search);
return (cr) -> {
if (cr.key() == null) {
Expand All @@ -43,4 +54,27 @@ public static Predicate<ConsumerRecord> keyPredicate(String search, @NotNull Str
}
}

public static Predicate<ConsumerRecord> consumerRecordHeaderPredicate(KafkaHeaderFilterOption kafkaHeaderFilterOption) {
return (consumerRecord) -> {
Headers headers = consumerRecord.headers();
Stream<Header> stream = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(headers.headers(kafkaHeaderFilterOption.getHeader()).iterator(), Spliterator.ORDERED),
false);
return stream.anyMatch(header -> kafkaHeaderFilterOption.isExactMatch() ? headerValueExactMatch(kafkaHeaderFilterOption, header) : headerValueRegexMatch(kafkaHeaderFilterOption, header));
};
}

private static boolean headerValueExactMatch(KafkaHeaderFilterOption kafkaHeaderFilterOption, Header header) {
return new String(header.value(), StandardCharsets.UTF_8).equals(kafkaHeaderFilterOption.getFilterString());
}

private static boolean headerValueRegexMatch(KafkaHeaderFilterOption kafkaHeaderFilterOption, Header header) {
Pattern pattern = Pattern.compile(kafkaHeaderFilterOption.getFilterString());
if (header.value() == null) {
return false;
}

Matcher matcher = pattern.matcher(new String(header.value(), StandardCharsets.UTF_8));
return matcher.find();
}
}
17 changes: 15 additions & 2 deletions src/main/java/at/esque/kafka/dialogs/TraceInputDialog.java
Expand Up @@ -2,6 +2,7 @@

import at.esque.kafka.Main;
import at.esque.kafka.topics.TraceDialogController;
import at.esque.kafka.topics.model.KafkaHeaderFilterOption;
import javafx.collections.ObservableList;
import javafx.fxml.FXMLLoader;
import javafx.scene.Node;
Expand All @@ -12,6 +13,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -56,7 +58,7 @@ public static Optional<TraceInput> show(boolean isAvroKeyType, boolean traceQuic
updateRecentTrace(controller.keyTextBox.getText(), recentTraceMaxEntries);
updateRecentTrace(controller.valueTextBox.getText(), recentTraceMaxEntries);
}
return new TraceInput(controller.keyTextBox.getText(), controller.valueTextBox.getText(), keyMode, conditionMode, controller.fastTraceToggle.isSelected(), controller.tombstoneToggle.isSelected(), controller.epochStartInstantPicker.getInstantValue() == null ? null : controller.epochStartInstantPicker.getInstantValue().toEpochMilli(), controller.epochEndInstantPicker.getInstantValue() == null ? null : controller.epochEndInstantPicker.getInstantValue().toEpochMilli(), controller.specificParitionComboBox.getValue());
return new TraceInput(controller.keyTextBox.getText(), controller.valueTextBox.getText(), keyMode, conditionMode, controller.fastTraceToggle.isSelected(), controller.tombstoneToggle.isSelected(), controller.epochStartInstantPicker.getInstantValue() == null ? null : controller.epochStartInstantPicker.getInstantValue().toEpochMilli(), controller.epochEndInstantPicker.getInstantValue() == null ? null : controller.epochEndInstantPicker.getInstantValue().toEpochMilli(), controller.specificParitionComboBox.getValue(), controller.headerTableView.getItems());
}
return null;
});
Expand Down Expand Up @@ -85,7 +87,9 @@ public static class TraceInput {
private Long epochEnd;
private Integer partition;

public TraceInput(String keySearch, String valueSearch, String keyMode, String conditionMode, boolean fastTrace, boolean searchNull, Long epochStart, Long epochEnd, Integer partition) {
private List<KafkaHeaderFilterOption> kafkaHeaderFilterOptions = new ArrayList<>();

public TraceInput(String keySearch, String valueSearch, String keyMode, String conditionMode, boolean fastTrace, boolean searchNull, Long epochStart, Long epochEnd, Integer partition, List<KafkaHeaderFilterOption> kafkaHeaderFilterOptions) {
this.keySearch = keySearch;
this.valueSearch = valueSearch;
this.keyMode = keyMode;
Expand All @@ -95,6 +99,7 @@ public TraceInput(String keySearch, String valueSearch, String keyMode, String c
this.epochStart = epochStart;
this.epochEnd = epochEnd;
this.partition = partition;
this.kafkaHeaderFilterOptions = kafkaHeaderFilterOptions;
}

public String getKeySearch() {
Expand Down Expand Up @@ -168,5 +173,13 @@ public Integer getPartition() {
public void setPartition(Integer partition) {
this.partition = partition;
}

public List<KafkaHeaderFilterOption> getKafkaHeaderFilterOptions() {
return kafkaHeaderFilterOptions;
}

public void setKafkaHeaderFilterOptions(List<KafkaHeaderFilterOption> kafkaHeaderFilterOptions) {
this.kafkaHeaderFilterOptions = kafkaHeaderFilterOptions;
}
}
}
55 changes: 53 additions & 2 deletions src/main/java/at/esque/kafka/topics/TraceDialogController.java
Expand Up @@ -2,22 +2,32 @@

import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.controls.InstantPicker;
import at.esque.kafka.topics.model.KafkaHeaderFilterOption;
import javafx.beans.binding.Bindings;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.value.ObservableValue;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.Button;
import javafx.scene.control.CheckBox;
import javafx.scene.control.ComboBox;
import javafx.scene.control.ListCell;
import javafx.scene.control.ListView;
import javafx.scene.control.RadioButton;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
import javafx.scene.control.TextField;
import javafx.scene.control.TitledPane;
import javafx.scene.control.ToggleButton;
import javafx.scene.control.ToggleGroup;
import javafx.scene.control.cell.CheckBoxTableCell;
import javafx.scene.control.cell.TextFieldTableCell;
import javafx.scene.layout.Background;
import javafx.scene.layout.HBox;
import javafx.util.Callback;
import org.controlsfx.control.PopOver;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -56,6 +66,10 @@ public class TraceDialogController {
public TitledPane keyOptionsPane;
public TitledPane valueOptionsPane;
public ComboBox<Integer> specificParitionComboBox;
public TableView<KafkaHeaderFilterOption> headerTableView;
public TableColumn<KafkaHeaderFilterOption, String> headerFilterHeaderColumn;
public TableColumn<KafkaHeaderFilterOption, String> headerFilterFilterStringColumn;
public TableColumn<KafkaHeaderFilterOption, Boolean> headerFilterExactMatchColumn;

@FXML
public void initialize() {
Expand All @@ -70,10 +84,10 @@ public void setup(boolean isAvroKeyType, boolean traceQuickSelectEnabled, List<D
fillButtonBar(durations, quickSelectStartEpochButtonBar, epochStartInstantPicker);
fillButtonBar(durations, quickSelectEndEpochButtonBar, epochEndInstantPicker);
}
if(!isAvroKeyType){
if (!isAvroKeyType) {
fastTraceToggle.setDisable(false);
fastTraceToggle.disableProperty().bind(Bindings.or(keyModeRegexRadio.selectedProperty(), traceModeOrRadio.selectedProperty()));
}else{
} else {
fastTraceToggle.setDisable(true);
}
specificParitionComboBox.setItems(partitions);
Expand All @@ -85,6 +99,30 @@ public void setup(boolean isAvroKeyType, boolean traceQuickSelectEnabled, List<D

keyHistoryButton.setOnAction(event -> popOverKey.show(keyHistoryButton));
valueHistoryButton.setOnAction(event -> popOverValue.show(valueHistoryButton));

headerFilterHeaderColumn.setCellFactory(TextFieldTableCell.forTableColumn());
headerFilterFilterStringColumn.setCellFactory(TextFieldTableCell.forTableColumn());
headerFilterExactMatchColumn.setCellFactory(CheckBoxTableCell.forTableColumn(new Callback<Integer, ObservableValue<Boolean>>() {
@Override
public ObservableValue<Boolean> call(Integer param) {
KafkaHeaderFilterOption kafkaHeaderFilterOption = headerTableView.getItems().get(param);
SimpleBooleanProperty simpleBooleanProperty = new SimpleBooleanProperty(kafkaHeaderFilterOption.isExactMatch());
simpleBooleanProperty.addListener((observable, oldValue, newValue) -> kafkaHeaderFilterOption.setExactMatch(newValue))
;
return simpleBooleanProperty;
}
}, false));
headerFilterHeaderColumn.setCellValueFactory(param -> new SimpleStringProperty(param.getValue().getHeader()));
headerFilterFilterStringColumn.setCellValueFactory(param -> new SimpleStringProperty(param.getValue().getFilterString()));
headerFilterExactMatchColumn.setCellValueFactory(param -> new SimpleBooleanProperty(param.getValue().isExactMatch()));
headerFilterHeaderColumn.setOnEditCommit(event -> {
KafkaHeaderFilterOption current = event.getTableView().getItems().get(event.getTablePosition().getRow());
current.setHeader(event.getNewValue());
});
headerFilterFilterStringColumn.setOnEditCommit(event -> {
KafkaHeaderFilterOption current = event.getTableView().getItems().get(event.getTablePosition().getRow());
current.setFilterString(event.getNewValue());
});
}

private void clearButtonBar() {
Expand Down Expand Up @@ -173,4 +211,17 @@ private static ListView<String> buildRecentTracesView(TextField key) {
recentTraces.setItems(FXCollections.observableArrayList(sortable));
return recentTraces;
}

@FXML
private void addHeaderClick(ActionEvent event) {
headerTableView.getItems().add(new KafkaHeaderFilterOption("header", "regex or exact string to match", false));
}

@FXML
private void removeHeaderClick(ActionEvent e) {
int selectedIndex = headerTableView.getSelectionModel().getSelectedIndex();
if (selectedIndex > -1) {
headerTableView.getItems().remove(selectedIndex);
}
}
}
@@ -0,0 +1,63 @@
package at.esque.kafka.topics.model;

import java.util.Objects;

public class KafkaHeaderFilterOption {
private String header;
private String filterString;
private boolean exactMatch;

public KafkaHeaderFilterOption(String header, String filterString, boolean exactMatch) {
this.header = header;
this.filterString = filterString;
this.exactMatch = exactMatch;
}

public String getHeader() {
return header;
}

public void setHeader(String header) {
this.header = header;
}

public String getFilterString() {
return filterString;
}

public void setFilterString(String filterString) {
this.filterString = filterString;
}

public boolean isExactMatch() {
return exactMatch;
}

public void setExactMatch(boolean exactMatch) {
this.exactMatch = exactMatch;
}

@Override
public boolean equals(Object obj) {
if (obj == this) return true;
if (obj == null || obj.getClass() != this.getClass()) return false;
var that = (KafkaHeaderFilterOption) obj;
return Objects.equals(this.header, that.header) &&
Objects.equals(this.filterString, that.filterString) &&
this.exactMatch == that.exactMatch;
}

@Override
public int hashCode() {
return Objects.hash(header, filterString, exactMatch);
}

@Override
public String toString() {
return "KafkaHeaderFilterOption[" +
"header=" + header + ", " +
"filterString=" + filterString + ", " +
"exactMatch=" + exactMatch + ']';
}

}

0 comments on commit 3052e1e

Please sign in to comment.