diff --git a/.github/workflows/gradle.yaml b/.github/workflows/gradle.yaml index 6645deb..1c41eb7 100644 --- a/.github/workflows/gradle.yaml +++ b/.github/workflows/gradle.yaml @@ -7,86 +7,87 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ macos-11, ubuntu-20.04, windows-2019 ] + os: [ macos-12, ubuntu-22.04, windows-2022 ] java: [ '17' ] fail-fast: false name: ${{ matrix.os }} steps: - name: Git checkout - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Set up Java - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'zulu' java-version: ${{ matrix.java }} java-package: jdk+fx + cache: 'gradle' - name: Echo JAVA_HOME run: echo $JAVA_HOME - name: Verify Gradle Wrapper uses: gradle/wrapper-validation-action@v1 - - name: Execute build (macOs & Ubuntu) - if: matrix.os != 'windows-2019' + - name: Execute build (macOS & Ubuntu) + if: matrix.os != 'windows-2022' run: ./gradlew --info build - name: Execute build (Windows) - if: matrix.os == 'windows-2019' + if: matrix.os == 'windows-2022' run: .\gradlew.bat --info build - name: Upload TAR as an artifact - if: matrix.os != 'windows-2019' - uses: actions/upload-artifact@v2 + if: matrix.os != 'windows-2022' + uses: actions/upload-artifact@v3 with: name: kafkaesque-jdk${{ matrix.java }}-${{ matrix.os }}-tar path: build/distributions/*.tar - name: Upload ZIP as an artifact - if: matrix.os == 'windows-2019' - uses: actions/upload-artifact@v2 + if: matrix.os == 'windows-2022' + uses: actions/upload-artifact@v3 with: name: kafkaesque-jdk${{ matrix.java }}-${{ matrix.os }}-zip path: build/distributions/*.zip - - name: Execute runtime (macOs & Ubuntu) - if: matrix.os != 'windows-2019' + - name: Execute runtime (macOS & Ubuntu) + if: matrix.os != 'windows-2022' run: ./gradlew --info runtime - name: Execute runtime (Windows) - if: matrix.os == 'windows-2019' + if: matrix.os == 'windows-2022' run: .\gradlew.bat --info runtime - - name: Execute jpackage (macOs & Ubuntu) - if: matrix.os != 'windows-2019' + - name: Execute jpackage (macOS & Ubuntu) + if: matrix.os != 'windows-2022' run: ./gradlew --info jpackage - name: Execute jpackage (Windows) - if: matrix.os == 'windows-2019' + if: matrix.os == 'windows-2022' run: .\gradlew.bat --info jpackage - name: Upload DMG as an artifact - if: matrix.os == 'macos-11' - uses: actions/upload-artifact@v2 + if: matrix.os == 'macos-12' + uses: actions/upload-artifact@v3 with: name: kafkaesque-jdk${{ matrix.java }}-${{ matrix.os }}-dmg path: build/jpackage/*.dmg - name: Upload PKG as an artifact - if: matrix.os == 'macos-11' - uses: actions/upload-artifact@v2 + if: matrix.os == 'macos-12' + uses: actions/upload-artifact@v3 with: name: kafkaesque-jdk${{ matrix.java }}-${{ matrix.os }}-pkg path: build/jpackage/*.pkg - name: Upload DEB as an artifact - if: matrix.os == 'ubuntu-20.04' - uses: actions/upload-artifact@v2 + if: matrix.os == 'ubuntu-22.04' + uses: actions/upload-artifact@v3 with: name: kafkaesque-jdk${{ matrix.java }}-${{ matrix.os }}-deb path: build/jpackage/*.deb - name: Upload RPM as an artifact - if: matrix.os == 'ubuntu-20.04' - uses: actions/upload-artifact@v2 + if: matrix.os == 'ubuntu-22.04' + uses: actions/upload-artifact@v3 with: name: kafkaesque-jdk${{ matrix.java }}-${{ matrix.os }}-rpm path: build/jpackage/*.rpm - name: Upload EXE as an artifact - if: matrix.os == 'windows-2019' - uses: actions/upload-artifact@v2 + if: matrix.os == 'windows-2022' + uses: actions/upload-artifact@v3 with: name: kafkaesque-jdk${{ matrix.java }}-${{ matrix.os }}-exe 
path: build/jpackage/*.exe - name: Upload MSI as an artifact - if: matrix.os == 'windows-2019' - uses: actions/upload-artifact@v2 + if: matrix.os == 'windows-2022' + uses: actions/upload-artifact@v3 with: name: kafkaesque-jdk${{ matrix.java }}-${{ matrix.os }}-msi path: build/jpackage/*.msi @@ -96,19 +97,19 @@ jobs: codesign --sign - --deep --force --preserve-metadata=entitlements,requirements,flags,runtime build/jpackage/*.dmg if: | startsWith(github.ref, 'refs/tags/') && - matrix.os == 'macos-11' + matrix.os == 'macos-12' - name: Create release (macOS) uses: softprops/action-gh-release@v1 if: | startsWith(github.ref, 'refs/tags/') && - matrix.os == 'macos-11' + matrix.os == 'macos-12' with: files: build/jpackage/*.dmg - name: Create release (Ubuntu) uses: softprops/action-gh-release@v1 if: | startsWith(github.ref, 'refs/tags/') && - matrix.os == 'ubuntu-20.04' + matrix.os == 'ubuntu-22.04' with: files: | build/jpackage/*.deb @@ -117,6 +118,29 @@ jobs: uses: softprops/action-gh-release@v1 if: | startsWith(github.ref, 'refs/tags/') && - matrix.os == 'windows-2019' + matrix.os == 'windows-2022' with: files: build/jpackage/*.exe + - name: Extract version (macOS) + id: extract-version + if: | + startsWith(github.ref, 'refs/tags/') && + matrix.os == 'macos-12' + run: | + TAG_NAME=${GITHUB_REF#refs/tags/} + echo "tag-name=${TAG_NAME:1}" >> $GITHUB_OUTPUT + - name: Bump Homebrew cask version (macOS) + uses: MWin123/bump-homebrew-formula-action@v0.3.0 + if: | + startsWith(github.ref, 'refs/tags/') && + matrix.os == 'macos-12' + with: + formula-name: homebrew-esque + formula-path: Casks/kafkaesque.rb + homebrew-tap: patschuh/homebrew-esque + base-branch: main + create-pullrequest: true + tag-name: ${{ steps.extract-version.outputs.tag-name }} + download-url: https://github.com/patschuh/KafkaEsque/releases/download/v${{ steps.extract-version.outputs.tag-name }}/KafkaEsque-${{ steps.extract-version.outputs.tag-name }}.dmg + env: + COMMITTER_TOKEN: ${{ secrets.COMMITTER_TOKEN }} diff --git a/build.gradle b/build.gradle index 5ac48e9..9eaa8ff 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,6 @@ import org.gradle.internal.jvm.Jvm import org.gradle.internal.os.OperatingSystem + import java.text.SimpleDateFormat plugins { @@ -10,7 +11,7 @@ plugins { } group = 'at.esque.kafka' -version = '2.3.1' +version = '2.7.3' repositories { mavenCentral() @@ -39,8 +40,9 @@ dependencies { implementation 'org.kordamp.ikonli:ikonli-javafx:12.2.0' implementation 'org.kordamp.ikonli:ikonli-fontawesome-pack:12.2.0' implementation 'com.opencsv:opencsv:5.5.2' - implementation 'io.confluent:kafka-schema-registry:6.2.7' + implementation 'io.confluent:kafka-schema-registry:7.3.0' implementation 'io.confluent:kafka-avro-serializer:6.2.7' + implementation 'io.confluent:kafka-protobuf-serializer:6.2.7' implementation 'org.apache.avro:avro:1.11.1' implementation 'com.google.inject:guice:5.0.1' implementation 'org.fxmisc.richtext:richtextfx:0.10.7' @@ -87,6 +89,8 @@ runtime { def currentOs = OperatingSystem.current() def imgType = currentOs.windows ? 'ico' : currentOs.macOsX ? 
'icns' : 'png' + + jvmArgs += ['-Dprism.dirtyopts=false'] imageOptions += ['--icon', "src/main/resources/icons/package/KafkaEsque.$imgType"] installerOptions += ['--resource-dir', "src/main/resources"] installerOptions += ['--vendor', 'Patrick Schuh'] diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 2e6e589..e411586 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/settings.gradle b/settings.gradle index e16af1f..f60c013 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name = 'kafkaesque' +rootProject.name = 'KafkaEsque' diff --git a/src/main/java/at/esque/kafka/Controller.java b/src/main/java/at/esque/kafka/Controller.java index 20fe01f..51b5b87 100644 --- a/src/main/java/at/esque/kafka/Controller.java +++ b/src/main/java/at/esque/kafka/Controller.java @@ -41,6 +41,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.protobuf.Message; import com.opencsv.bean.CsvToBeanBuilder; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import javafx.application.HostServices; @@ -51,6 +52,7 @@ import javafx.collections.FXCollections; import javafx.collections.ObservableList; import javafx.event.ActionEvent; +import javafx.event.EventHandler; import javafx.fxml.FXML; import javafx.fxml.FXMLLoader; import javafx.scene.Parent; @@ -72,7 +74,12 @@ import javafx.scene.control.TableView; import javafx.scene.control.TextField; import javafx.scene.control.ToggleButton; +import javafx.scene.control.Tooltip; import javafx.scene.image.Image; +import javafx.scene.input.KeyCode; +import javafx.scene.input.KeyCodeCombination; +import javafx.scene.input.KeyCombination; +import javafx.scene.input.KeyEvent; import javafx.scene.input.MouseButton; import javafx.scene.layout.HBox; import javafx.stage.DirectoryChooser; @@ -108,7 +115,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.Serializable; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -126,6 +133,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -135,6 +143,7 @@ public class Controller { private static final Pattern REPLACER_PATTERN = Pattern.compile("\\$\\{(?.[^:{}]+):(?.[^:{}]+)}"); + public static final String ICONS_KAFKAESQUE_PNG_PATH = "/icons/kafkaesque.png"; private KafkaesqueAdminClient adminClient; @@ -162,6 +171,8 @@ public class Controller { //FXML @FXML + private Tooltip helpIconToolTip; + @FXML private KafkaEsqueCodeArea keyTextArea; @FXML private Tab valueTab; @@ -278,7 +289,7 @@ public void setup(Stage controlledStage) { clusterComboBox.getSelectionModel().selectedItemProperty().addListener((observable, oldValue, newValue) -> { adminClient = new KafkaesqueAdminClient(newValue.getBootStrapServers(), 
configHandler.getSslProperties(selectedCluster()), configHandler.getSaslProperties(selectedCluster())); - refreshTopicList(newValue); + refreshTopicList(); }); partitionCombobox.getItems().add(-1); @@ -290,7 +301,7 @@ public void setup(Stage controlledStage) { if (newValue != null) { try { final Integer selectedItem = partitionCombobox.getSelectionModel().getSelectedItem(); - final List topicPatitions = adminClient.getTopicPatitions(newValue); + final List topicPatitions = adminClient.getPatitions(newValue); partitionCombobox.getItems().clear(); partitionCombobox.getItems().add(-1); partitionCombobox.getItems().addAll(topicPatitions); @@ -305,6 +316,9 @@ public void setup(Stage controlledStage) { } }); + headerTableView.setOnKeyPressed(generateHeaderTableEventHandler()); + metdataTableView.setOnKeyPressed(generateMetadataTableEventHandler()); + configHandler.configureKafkaEsqueCodeArea(keyTextArea); configHandler.configureKafkaEsqueCodeArea(valueTextArea); @@ -319,17 +333,14 @@ public void setup(Stage controlledStage) { ClusterConfig dummycluster = new ClusterConfig(); dummycluster.setIdentifier("Empty"); messageTabPane.getTabs().add(createTab(dummycluster, "Tab")); + helpIconToolTip.setText(buildToolTip()); versionInfoHandler.showDialogIfUpdateIsAvailable(hostServices); } private void setupJsonFormatToggle() { - formatJsonToggle.selectedProperty().addListener((observable, oldValue, newValue) -> { - updateKeyValueTextArea(selectedMessage, newValue); - }); - jqQueryField.textProperty().addListener((observable, oldValue, newValue) -> { - updateKeyValueTextArea(selectedMessage, formatJsonToggle.isSelected()); - }); + formatJsonToggle.selectedProperty().addListener((observable, oldValue, newValue) -> updateKeyValueTextArea(selectedMessage, newValue)); + jqQueryField.textProperty().addListener((observable, oldValue, newValue) -> updateKeyValueTextArea(selectedMessage, formatJsonToggle.isSelected())); jqQueryField.visibleProperty().bind(Bindings.createBooleanBinding(() -> { if (formatJsonToggle.isSelected()) { jqQueryField.setMaxWidth(-1); @@ -459,7 +470,7 @@ private ListCell topicListCellFactory() { try { TopicMessageTypeConfig topicMessageTypeConfig = configHandler.getConfigForTopic(selectedCluster().getIdentifier(), selectedTopic()); Map consumerConfig = configHandler.readConsumerConfigs(selectedCluster().getIdentifier()); - TraceInputDialog.show(topicMessageTypeConfig.getKeyType() == MessageType.AVRO, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES)), partitionCombobox.getItems()) + TraceInputDialog.show(topicMessageTypeConfig.getKeyType() == MessageType.AVRO || topicMessageTypeConfig.getKeyType() == MessageType.PROTOBUF_SR, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES)), partitionCombobox.getItems()) .ifPresent(traceInput -> { backGroundTaskHolder.setBackGroundTaskDescription("tracing message"); Integer partition = null; @@ -490,42 +501,25 @@ private ListCell topicListCellFactory() { actualPredicate = keyPredicate.or(valuePredicate); } - trace(topicMessageTypeConfig, consumerConfig, actualPredicate, partition, traceInput.getEpoch()); + if (traceInput.getKafkaHeaderFilterOptions() != null && 
!traceInput.getKafkaHeaderFilterOptions().isEmpty()) { + List> predicates = traceInput.getKafkaHeaderFilterOptions() + .stream() + .map(TraceUtils::consumerRecordHeaderPredicate) + .toList(); + + for (Predicate predicate : predicates) { + actualPredicate = actualPredicate.and(predicate); + } + + } + + trace(topicMessageTypeConfig, consumerConfig, actualPredicate, partition, traceInput.getEpochStart(), traceInput.getEpochEnd() == null ? null : consumerRecord -> consumerRecord.timestamp() >= traceInput.getEpochEnd()); }); } catch (Exception e) { ErrorAlert.show(e, controlledStage); } }); -// MenuItem traceInValueItem = new MenuItem(); -// traceInValueItem.setGraphic(new FontIcon(FontAwesome.SEARCH)); -// traceInValueItem.textProperty().set("trace in value"); -// traceInValueItem.setOnAction(event -> { -// try { -// TopicMessageTypeConfig topicMessageTypeConfig = configHandler.getConfigForTopic(selectedCluster().getIdentifier(), selectedTopic()); -// Map consumerConfig = configHandler.readConsumerConfigs(selectedCluster().getIdentifier()); -// TraceInputDialog.show(false, false, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES))) -// .ifPresent(traceInput -> { -// backGroundTaskHolder.setBackGroundTaskDescription("tracing in Value: " + traceInput.getSearch()); -// Pattern pattern = Pattern.compile(traceInput.getSearch()); -// trace(topicMessageTypeConfig, consumerConfig, (ConsumerRecord cr) -> { -// if (traceInput.isSearchNull()) { -// return cr.value() == null; -// } else { -// -// if (cr.value() == null) { -// return false; -// } -// Matcher matcher = pattern.matcher(cr.value().toString()); -// return matcher.find(); -// } -// }, null, traceInput.getEpoch()); -// }); -// } catch (Exception e) { -// ErrorAlert.show(e, controlledStage); -// } -// }); - MenuItem deleteItem = new MenuItem(); deleteItem.setGraphic(new FontIcon(FontAwesome.TRASH)); deleteItem.textProperty().set("delete"); @@ -560,12 +554,12 @@ private ListCell topicListCellFactory() { return cell; } - private void refreshTopicList(ClusterConfig newValue) { + private void refreshTopicList() { backGroundTaskHolder.setBackGroundTaskDescription("getting Topics..."); - runInDaemonThread(() -> getTopicsForCluster(newValue)); + runInDaemonThread(() -> getTopicsForCluster()); } - private void getTopicsForCluster(ClusterConfig clusterConfig) { + private void getTopicsForCluster() { StopWatch stopWatch = new StopWatch(); try { stopWatch.start(); @@ -573,7 +567,8 @@ private void getTopicsForCluster(ClusterConfig clusterConfig) { backGroundTaskHolder.setIsInProgress(true); Set topics = adminClient.getTopics(); Platform.runLater(() -> topicListView.setItems(topics)); - + } catch (Exception e) { + Platform.runLater(() -> ErrorAlert.show(e)); } finally { stopWatch.stop(); LOGGER.info("Finished getting topics for cluster [{}]", stopWatch); @@ -583,10 +578,7 @@ private void getTopicsForCluster(ClusterConfig clusterConfig) { @FXML public void refreshButtonClick(ActionEvent e) { - ClusterConfig selectedCluster = selectedCluster(); - if (selectedCluster != null) { - refreshTopicList(selectedCluster); - } + refreshTopicList(); } @FXML @@ -643,7 +635,7 @@ public void schemaRegistryClick(ActionEvent event) { SchemaRegistryBrowserController controller = fxmlLoader.getController(); controller.setup(selectedConfig, configHandler); Stage stage = new Stage(); - 
stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.NONE); stage.setTitle("Browse Schema Registry - " + selectedConfig.getIdentifier()); @@ -675,7 +667,7 @@ public void kafkaConnectClick(ActionEvent actionEvent) { KafkaConnectBrowserController controller = fxmlLoader.getController(); controller.setup(selectedConfig, configHandler); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.NONE); stage.setTitle("Browse Kafka Connect - " + selectedConfig.getIdentifier()); @@ -707,7 +699,7 @@ public void kafkaConnectInstalledPluginClick(ActionEvent actionEvent) { InstalledConnectorPluginsController controller = fxmlLoader.getController(); controller.setup(selectedConfig, configHandler); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.NONE); stage.setTitle("Browse installed Kafka Connect plugins - " + selectedConfig.getIdentifier()); @@ -728,7 +720,7 @@ public void crossClusterClick(ActionEvent actionEvent) { CrossClusterController controller = fxmlLoader.getController(); controller.setup(); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.APPLICATION_MODAL); stage.setTitle("Cross Cluster Operations"); @@ -751,7 +743,7 @@ public void lagViewerClick(ActionEvent actionEvent) { KafkaConsumer consumer = consumerHandler.getConsumer(consumerId).orElseThrow(() -> new RuntimeException("Error getting consumer")); controller.setup(adminClient, consumer); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.NONE); stage.setTitle("Lag Viewer - " + selectedCluster().getIdentifier()); @@ -776,16 +768,14 @@ public void aclViewer(ActionEvent actionEvent) { AclViewerController controller = fxmlLoader.getController(); controller.setup(adminClient); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.NONE); stage.setTitle("ACL Viewer - " + selectedCluster().getIdentifier()); stage.setScene(Main.createStyledScene(root1, 1000, 500)); stage.show(); centerStageOnControlledStage(stage); - stage.setOnCloseRequest(windowEvent -> { - controller.stop(); - }); + stage.setOnCloseRequest(windowEvent -> controller.stop()); } catch (Exception e) { ErrorAlert.show(e, controlledStage); } @@ -809,7 +799,6 @@ private void getOldestMessages(TopicMessageTypeConfig topic, Map Map maxOffsets = consumerHandler.getMaxOffsets(consumerId); 
consumerHandler.seekToOffset(consumerId, -1); Map currentOffsets = consumerHandler.getCurrentOffsets(consumerId); - //baseList.clear(); PinTab tab = getActiveTabOrAddNew(topic, false); ObservableList baseList = getAndClearBaseList(tab); Platform.runLater(() -> backGroundTaskHolder.setBackGroundTaskDescription("getting messages...")); @@ -922,7 +911,10 @@ private void subscribeOrAssignToSelectedPartition(TopicMessageTypeConfig topic, if (selectedPartition >= 0) { consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(Collections.singletonList(new TopicPartition(selectedTopic(), selectedPartition)))); } else { - consumerHandler.subscribe(consumerId, topic.getName()); + List partitions = adminClient.getPatitions(topic.getName()).stream() + .map(integer -> new TopicPartition(topic.getName(), integer)) + .collect(Collectors.toList()); + consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(partitions)); } } @@ -960,7 +952,7 @@ private void getMessagesContinuously(TopicMessageTypeConfig topic, Map< }); } - private void trace(TopicMessageTypeConfig topic, Map consumerConfig, Predicate predicate, Integer fasttracePartition, Long epoch) { + private void trace(TopicMessageTypeConfig topic, Map consumerConfig, Predicate predicate, Integer fasttracePartition, Long epoch, Predicate stopTraceInPartitionCondition) { runInDaemonThread(() -> { UUID consumerId = null; try { @@ -971,11 +963,14 @@ private void trace(TopicMessageTypeConfig topic, Map co } try { backGroundTaskHolder.setIsInProgress(true); + List topicPatitions; if (fasttracePartition != null) { - consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(Collections.singletonList(new TopicPartition(selectedTopic(), fasttracePartition)))); + topicPatitions = new ArrayList<>(Collections.singletonList(new TopicPartition(selectedTopic(), fasttracePartition))); } else { - consumerHandler.subscribe(consumerId, selectedTopic()); + topicPatitions = new ArrayList<>(adminClient.getTopicPatitions(selectedTopic())); } + consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(topicPatitions)); + AtomicLong messagesConsumed = new AtomicLong(0); AtomicLong messagesFound = new AtomicLong(0); Map minOffsets = consumerHandler.getMinOffsets(consumerId); @@ -992,12 +987,28 @@ private void trace(TopicMessageTypeConfig topic, Map co while (!backGroundTaskHolder.getStopBackGroundTask() && !reachedMaxOffsetForAllPartitions(maxOffsets, minOffsets, currentOffsets)) { ConsumerRecords records = topicConsumer.poll(Duration.ofSeconds(1)); records.forEach(cr -> { + if (!topicPatitions.contains(new TopicPartition(cr.topic(), cr.partition()))) { + return; + } messagesConsumed.incrementAndGet(); currentOffsets.put(new TopicPartition(cr.topic(), cr.partition()), cr.offset()); if (predicate.test(cr)) { convertAndAdd(cr, baseList); messagesFound.incrementAndGet(); } + if (stopTraceInPartitionCondition != null && stopTraceInPartitionCondition.test(cr)) { + Optional first = topicPatitions.stream() + .filter(topicPartition -> topicPartition.partition() == cr.partition()) + .findFirst(); + + first.ifPresent(topicPartition -> { + topicPatitions.remove(topicPartition); + maxOffsets.remove(topicPartition); + minOffsets.remove(topicPartition); + currentOffsets.remove(topicPartition); + topicConsumer.assign(topicPatitions); + }); + } }); Platform.runLater(() -> backGroundTaskHolder.setProgressMessage(String.format("Found %s in %s consumed Messages", messagesFound, 
messagesConsumed))); } @@ -1015,11 +1026,9 @@ private int getPartitionForKey(String topic, String key) { throw new RuntimeException("Failed to determine number of partitions!", topicDescription.getException()); } int numberOfPartitions = topicDescription.getTopicDescription().partitions().size(); - try { - return Utils.toPositive(Utils.murmur2(key.getBytes("UTF-8"))) % numberOfPartitions; - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } + + return Utils.toPositive(Utils.murmur2(key.getBytes(StandardCharsets.UTF_8))) % numberOfPartitions; + } private void convertAndAdd(ConsumerRecord cr, ObservableList baseList) { @@ -1029,12 +1038,26 @@ private void convertAndAdd(ConsumerRecord cr, ObservableList partitions = adminClient.getTopicPatitions(selectedTopic()); + List partitions = adminClient.getPatitions(selectedTopic()); FXMLLoader fxmlLoader = injector.getInstance(FXMLLoader.class); fxmlLoader.setLocation(getClass().getResource("/fxml/publishMessage.fxml")); Parent root1 = fxmlLoader.load(); PublisherController controller = fxmlLoader.getController(); controller.setup(selectedCluster(), selectedTopic(), FXCollections.observableArrayList(partitions), kafkaMessage); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.APPLICATION_MODAL); stage.setTitle("Publish Message"); @@ -1226,7 +1249,7 @@ private void showCreateTopicDialog() { CreateTopicController controller = fxmlLoader.getController(); controller.setup(adminClient); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.APPLICATION_MODAL); stage.setTitle("Create Topic"); @@ -1247,7 +1270,7 @@ private void showDescribeTopicDialog(String topic) { if (!describeTopicWrapper.isFailed()) { controller.setup(describeTopicWrapper); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.APPLICATION_MODAL); stage.setTitle("Topic Description"); @@ -1288,7 +1311,7 @@ private void showJsonDiffDialog(KafkaMessage source, KafkaMessage target) { MessageDiffView controller = fxmlLoader.getController(); controller.setup(source, target); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.APPLICATION_MODAL); stage.setTitle("Json Diff"); @@ -1409,7 +1432,8 @@ public void playMessageBook(ActionEvent event) { }); producerHandler.sendMessage(producerId, message.getTargetTopic(), message.getPartition() == -1 ? 
null : message.getPartition(), message.getKey(), message.getValue(), message.getKeyType(), message.getValueType()); Platform.runLater(() -> backGroundTaskHolder.setProgressMessage("published " + counter.incrementAndGet() + " messages")); - } catch (InterruptedException | ExecutionException | TimeoutException | IOException | RestClientException e) { + } catch (InterruptedException | ExecutionException | TimeoutException | IOException | + RestClientException e) { throw new RuntimeException(e); } }); @@ -1438,7 +1462,7 @@ private void addMessagesToSend(List messagesToSend, File .withType(KafkaMessage.class) .build().parse(); messagesToSend.addAll(messages.stream().map(message -> new KafkaMessagBookWrapper(playFile.getName(), message)) - .collect(Collectors.toList())); + .toList()); } catch (FileNotFoundException e) { Platform.runLater(() -> ErrorAlert.show(e, controlledStage)); } @@ -1479,6 +1503,10 @@ private void centerStageOnControlledStage(Stage stage) { } private PinTab createTab(ClusterConfig clusterConfig, String name) { + return createTab(clusterConfig, name, null); + } + + private PinTab createTab(ClusterConfig clusterConfig, String name, List kafkaMessages) { MessagesTabContent messagesTabContent = new MessagesTabContent(); @@ -1527,6 +1555,12 @@ private PinTab createTab(ClusterConfig clusterConfig, String name) { updateKeyValueTextArea(selectedItem, formatJsonToggle.isSelected()); }); + if (kafkaMessages != null) { + messagesTabContent.getMessageTableView().getBaseList().clear(); + messagesTabContent.getMessageTableView().getBaseList().addAll(kafkaMessages); + + } + return new PinTab(clusterConfig.getIdentifier() + " - " + name, messagesTabContent); } @@ -1542,7 +1576,7 @@ public void aboutClick(ActionEvent event) { AboutController controller = fxmlLoader.getController(); controller.setup(versionInfoHandler, hostServices); Stage stage = new Stage(); - stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png"))); + stage.getIcons().add(new Image(getClass().getResourceAsStream(ICONS_KAFKAESQUE_PNG_PATH))); stage.initOwner(controlledStage); stage.initModality(Modality.APPLICATION_MODAL); stage.setTitle("About KafkaEsque"); @@ -1555,7 +1589,48 @@ public void aboutClick(ActionEvent event) { } } + private EventHandler generateHeaderTableEventHandler() { + Map> copyCombinations = Map.of( + new KeyCodeCombination(KeyCode.C, KeyCombination.SHORTCUT_DOWN), header -> new String(header.value(), StandardCharsets.UTF_8), + new KeyCodeCombination(KeyCode.K, KeyCombination.SHORTCUT_DOWN), Header::key + ); + + return SystemUtils.generateTableCopySelectedItemCopyEventHandler(headerTableView, copyCombinations); + } + + private EventHandler generateMetadataTableEventHandler() { + Map> copyCombinations = Map.of( + new KeyCodeCombination(KeyCode.C, KeyCombination.SHORTCUT_DOWN), metadata -> metadata.valueAsString().getValue(), + new KeyCodeCombination(KeyCode.K, KeyCombination.SHORTCUT_DOWN), metadata -> metadata.nameProperty().getName() + ); + + return SystemUtils.generateTableCopySelectedItemCopyEventHandler(metdataTableView, copyCombinations); + } + + public void setHostServices(HostServices hostServices) { this.hostServices = hostServices; } + + private String buildToolTip() { + return String.format("The following KeyCombinations let you copy data from the selected element in the metadata and header table%n" + + new KeyCodeCombination(KeyCode.K, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy Key/Name%n" + + new KeyCodeCombination(KeyCode.C, 
KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy Value%n"); + } + + @FXML + public void loadFileToTabClick(ActionEvent actionEvent) { + FileChooser fileChooser = new FileChooser(); + fileChooser.setTitle("Load Message File"); + fileChooser.getExtensionFilters().add(new FileChooser.ExtensionFilter("JSON File (*.json)", "*.json")); + File selectedFile = fileChooser.showOpenDialog(controlledStage); + try (FileReader fileReader = new FileReader(selectedFile)) { + List kafkaMessages = JsonUtils.readMessages(fileReader); + ClusterConfig dummyFileClusterConfig = new ClusterConfig(); + dummyFileClusterConfig.setIdentifier("Loaded File"); + messageTabPane.getTabs().add(createTab(dummyFileClusterConfig, selectedFile.getName(), kafkaMessages)); + } catch (Exception e) { + ErrorAlert.show(e, controlledStage); + } + } } diff --git a/src/main/java/at/esque/kafka/CreateSchemaController.java b/src/main/java/at/esque/kafka/CreateSchemaController.java index a4f4b1c..a30a777 100644 --- a/src/main/java/at/esque/kafka/CreateSchemaController.java +++ b/src/main/java/at/esque/kafka/CreateSchemaController.java @@ -4,13 +4,14 @@ import at.esque.kafka.alerts.SuccessAlert; import at.esque.kafka.controls.KafkaEsqueCodeArea; import io.confluent.kafka.schemaregistry.client.rest.RestService; +import javafx.collections.FXCollections; import javafx.event.ActionEvent; import javafx.fxml.FXML; +import javafx.scene.control.ComboBox; import javafx.scene.control.TextField; import javafx.stage.FileChooser; import javafx.stage.Stage; import javafx.stage.Window; -import org.apache.avro.Schema; import java.io.File; import java.io.IOException; @@ -24,6 +25,8 @@ public class CreateSchemaController { private TextField subjectTextField; @FXML private KafkaEsqueCodeArea schemaTextArea; + @FXML + private ComboBox schemaTypeComboBox; private RestService restService; @@ -31,10 +34,13 @@ public class CreateSchemaController { public void addSchema(ActionEvent actionEvent) { - Schema.Parser parser = new Schema.Parser(); try { - parser.parse(schemaTextArea.getText()); - restService.registerSchema(schemaTextArea.getText(), subjectTextField.getText()); + restService.registerSchema( + schemaTextArea.getText(), + schemaTypeComboBox.getSelectionModel().getSelectedItem(), + null, + subjectTextField.getText()); + SuccessAlert.show("Success", null, "Schema added successfully!", getWindow()); } catch (Exception e) { ErrorAlert.show(e, getWindow()); @@ -61,6 +67,8 @@ private Window getWindow() { public void setup(String selectedSubject, RestService restService, Stage stage) { this.restService = restService; + this.schemaTypeComboBox.setItems(FXCollections.observableArrayList("AVRO", "PROTOBUF")); + this.schemaTypeComboBox.getSelectionModel().select(0); this.stage = stage; if (selectedSubject != null) { subjectTextField.setText(selectedSubject); diff --git a/src/main/java/at/esque/kafka/CrossClusterController.java b/src/main/java/at/esque/kafka/CrossClusterController.java index bc81cd8..6b96084 100644 --- a/src/main/java/at/esque/kafka/CrossClusterController.java +++ b/src/main/java/at/esque/kafka/CrossClusterController.java @@ -13,6 +13,8 @@ import at.esque.kafka.handlers.ConsumerHandler; import at.esque.kafka.handlers.CrossClusterOperationHandler; import at.esque.kafka.handlers.ProducerHandler; +import at.esque.kafka.topics.KafkaMessage; +import at.esque.kafka.topics.metadata.NumericMetadata; import com.google.inject.Inject; import javafx.application.Platform; import javafx.beans.binding.Bindings; @@ -20,6 +22,7 @@ import 
javafx.collections.ObservableList; import javafx.event.ActionEvent; import javafx.fxml.FXML; +import javafx.scene.control.CheckBox; import javafx.scene.control.ComboBox; import javafx.scene.control.ListCell; import javafx.scene.control.ListView; @@ -27,23 +30,30 @@ import javafx.scene.control.ToggleButton; import javafx.scene.control.Tooltip; import javafx.stage.Window; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.kordamp.ikonli.fontawesome.FontAwesome; import org.kordamp.ikonli.javafx.FontIcon; import java.io.IOException; import java.text.MessageFormat; import java.time.Duration; +import java.time.Instant; +import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; public class CrossClusterController { @@ -57,6 +67,8 @@ public class CrossClusterController { private TextField valueRegexFilterField; @FXML private TextField amountLimit; + @FXML + public CheckBox reserializeMessagesToggle; @FXML private FilterableListView fromClusterTopicsList; @@ -94,11 +106,11 @@ public void setup() { toClusterComboBox.setItems(clusterConfigs.getClusterConfigs()); fromClusterComboBox.getSelectionModel().selectedItemProperty().addListener((observable, oldValue, newValue) -> { - setupClusterControls(newValue, fromAdmin, fromClusterTopicsList); + fromAdmin = setupClusterControls(newValue, fromAdmin, fromClusterTopicsList); }); toClusterComboBox.getSelectionModel().selectedItemProperty().addListener((observable, oldValue, newValue) -> { - setupClusterControls(newValue, toAdmin, toClusterTopicsList); + toAdmin = setupClusterControls(newValue, toAdmin, toClusterTopicsList); }); runningOperationsList.setCellFactory(ropl -> { @@ -123,7 +135,7 @@ public void setup() { case "Stopped": return FontIcon.of(FontAwesome.STOP_CIRCLE); default: - if(cell.getItem().finishedExceptionaly()) { + if (cell.getItem().finishedExceptionaly()) { cell.setOnMouseClicked(mouseEvent -> ErrorAlert.show(cell.getItem().getException(), getWindow())); } return FontIcon.of(FontAwesome.WARNING); @@ -140,16 +152,17 @@ public void setup() { refreshOperationList(null); } - private void setupClusterControls(ClusterConfig clusterConfig, KafkaesqueAdminClient adminClient, FilterableListView topicList) { + private KafkaesqueAdminClient setupClusterControls(ClusterConfig clusterConfig, KafkaesqueAdminClient adminClient, FilterableListView topicList) { if (adminClient != null) { adminClient.close(); } - adminClient = new KafkaesqueAdminClient(clusterConfig.getBootStrapServers(), configHandler.getSslProperties(clusterConfig),configHandler.getSaslProperties(clusterConfig)); + adminClient = new KafkaesqueAdminClient(clusterConfig.getBootStrapServers(), configHandler.getSslProperties(clusterConfig), configHandler.getSaslProperties(clusterConfig)); KafkaesqueAdminClient finalAdminClient = adminClient; runInDaemonThread(() -> { ObservableList topics = FXCollections.observableArrayList(finalAdminClient.getTopics()); Platform.runLater(() -> topicList.setItems(topics)); }); + return 
finalAdminClient; } private void startOperation(UUID operationId) { @@ -160,7 +173,10 @@ private void startOperation(UUID operationId) { try { producerId = producerHandler.registerProducer(operation.getToCluster(), operation.getToTopic().getName()); consumerId = consumerHandler.registerConsumer(operation.getFromCluster(), operation.getFromTopic(), configHandler.readConsumerConfigs(operation.getToCluster().getIdentifier())); - consumerHandler.subscribe(consumerId, operation.getFromTopic().getName()); + List partitions = fromAdmin.getPatitions(operation.getFromTopic().getName()).stream() + .map(integer -> new TopicPartition(operation.getFromTopic().getName(), integer)) + .collect(Collectors.toList()); + consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(partitions)); if (instantPicker.getInstantValue() != null) { consumerHandler.seekToTime(consumerId, instantPicker.getInstantValue().toEpochMilli()); } else { @@ -178,12 +194,19 @@ private void startOperation(UUID operationId) { while (!operation.getStop().get() && (limit == null || count.get() < limit) && !operation.getStatus().equals("Error")) { ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); Iterable records = consumerRecords.records(operation.getFromTopic().getName()); - records.forEach(consumerRecord -> { + Iterator iterator = records.iterator(); + while ((limit == null || count.get() < limit) && iterator.hasNext()) { + ConsumerRecord consumerRecord = iterator.next(); try { if (operation.getFilterFunction().test(consumerRecord)) { - ProducerRecord producerRecord = new ProducerRecord(operation.getToTopic().getName(), consumerRecord.key(), consumerRecord.value()); - consumerRecord.headers().forEach(header -> producerRecord.headers().add(header)); - producerHandler.sendRecord(producerId, producerRecord); + if (reserializeMessagesToggle.isSelected()) { + KafkaMessage convert = convert(consumerRecord); + producerHandler.sendMessage(producerId, operation.getToTopic().getName(), -1, convert.getKey(), convert.getValue(), convert.getKeyType(), convert.getValueType(), convert.getHeaders()); + } else { + ProducerRecord producerRecord = new ProducerRecord(operation.getToTopic().getName(), consumerRecord.key(), consumerRecord.value()); + consumerRecord.headers().forEach(header -> producerRecord.headers().add(header)); + producerHandler.sendRecord(producerId, producerRecord); + } count.incrementAndGet(); } } catch (Exception e) { @@ -192,7 +215,7 @@ private void startOperation(UUID operationId) { operation.setStatus("Error"); }); } - }); + } } if (operation.getStop().get()) { Platform.runLater(() -> operation.setStatus("Stopped")); @@ -267,4 +290,34 @@ public void stopSelectedOperation(ActionEvent actionEvent) { crossClusterOperationHandler.markOperationForStop(selectedItem.getOperationId()); } } + + private KafkaMessage convert(ConsumerRecord cr) { + KafkaMessage kafkaMessage = new KafkaMessage(); + kafkaMessage.setOffset(cr.offset()); + kafkaMessage.setPartition(cr.partition()); + kafkaMessage.setKey(cr.key() == null ? null : cr.key().toString()); + kafkaMessage.setValue(cr.value() == null ? 
null : cr.value().toString()); + + if (cr.value() instanceof GenericData.Record) { + kafkaMessage.setValueType(extractTypeFromGenericRecord((GenericData.Record) cr.value())); + } + if (cr.key() instanceof GenericData.Record) { + kafkaMessage.setKeyType(extractTypeFromGenericRecord((GenericData.Record) cr.key())); + } + + kafkaMessage.setTimestamp(Instant.ofEpochMilli(cr.timestamp()).toString()); + kafkaMessage.setHeaders(FXCollections.observableArrayList(cr.headers().toArray())); + + kafkaMessage.getMetaData().add(new NumericMetadata("Serialized Key Size", (long) cr.serializedKeySize())); + kafkaMessage.getMetaData().add(new NumericMetadata("Serialized Value Size", (long) cr.serializedValueSize())); + return kafkaMessage; + } + + private String extractTypeFromGenericRecord(GenericData.Record genericRecord) { + if (genericRecord == null || genericRecord.getSchema() == null) { + return null; + } + Schema schema = genericRecord.getSchema(); + return schema.getNamespace() + "." + schema.getName(); + } } diff --git a/src/main/java/at/esque/kafka/JsonUtils.java b/src/main/java/at/esque/kafka/JsonUtils.java index 312d6fa..12b804a 100644 --- a/src/main/java/at/esque/kafka/JsonUtils.java +++ b/src/main/java/at/esque/kafka/JsonUtils.java @@ -1,16 +1,33 @@ package at.esque.kafka; import at.esque.kafka.controls.JsonTreeItem; +import at.esque.kafka.serialization.jackson.KafkaHeaderDeserializer; +import at.esque.kafka.serialization.jackson.KafkaHeaderSerializer; +import at.esque.kafka.serialization.jackson.MessageMetaDataDeserializer; +import at.esque.kafka.topics.KafkaMessage; +import at.esque.kafka.topics.metadata.MessageMetaData; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.Struct; +import com.google.protobuf.util.JsonFormat; import javafx.scene.control.TreeItem; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.Reader; +import java.io.Writer; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -23,7 +40,88 @@ public final class JsonUtils { private JsonUtils() { } - private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final ObjectMapper objectMapper = initializeObjectMapper(); + + private static ObjectMapper initializeObjectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Header.class, new KafkaHeaderDeserializer()); + simpleModule.addDeserializer(MessageMetaData.class, new MessageMetaDataDeserializer()); + simpleModule.addSerializer(RecordHeader.class, new KafkaHeaderSerializer()); + objectMapper.registerModule(simpleModule); + return objectMapper; + } + + public static void writeMessageToJsonFile(List messages, Writer writer) { + try { + List value1 = messages.stream() + .map(kafkaMessage -> { + JsonNode jsonNode = 
objectMapper.valueToTree(kafkaMessage); + String value = kafkaMessage.getValue(); + String key = kafkaMessage.getKey(); + try { + JsonNode jsonNode1 = objectMapper.readTree(value); + if (jsonNode1.isObject()) { + ((ObjectNode) jsonNode).set("value", jsonNode1); + } + } catch (Exception e) { + logger.warn("Failed to convert value to jsonNode [{}]", value); + } + try { + JsonNode jsonNode1 = objectMapper.readTree(key); + if (jsonNode1.isObject()) { + ((ObjectNode) jsonNode).set("key", jsonNode1); + } + } catch (Exception e) { + logger.warn("Failed to convert key to jsonNode [{}]", key); + } + return jsonNode; + } + ).toList(); + objectMapper.writerWithDefaultPrettyPrinter().writeValue(writer, value1); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static List readMessages(Reader reader) { + ArrayList kafkaMessages = new ArrayList<>(); + try { + JsonNode inputMessages = objectMapper.readTree(reader); + if (inputMessages.isArray()) { + inputMessages.iterator().forEachRemaining(singleMessage -> { + JsonNode value = singleMessage.get("value"); + JsonNode key = singleMessage.get("key"); + + if (value.isObject()) { + try { + ((ObjectNode) singleMessage).set("value", TextNode.valueOf(objectMapper.writeValueAsString(value))); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + if (key.isObject()) { + try { + ((ObjectNode) singleMessage).set("key", TextNode.valueOf(objectMapper.writeValueAsString(key))); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + try { + KafkaMessage kafkaMessage = objectMapper.readValue(singleMessage.toString(), KafkaMessage.class); + kafkaMessages.add(kafkaMessage); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return kafkaMessages; + } public static String formatJson(String string) { try { @@ -166,10 +264,10 @@ private static void removeNode(JsonTreeItem path, JsonNode jsonNode) { private static void replaceValue(JsonTreeItem path, JsonNode jsonNode) { if (jsonNode.get("value").isNull()) { - path.setPropertyValue((path.getPropertyValue() == null ? "" : path.getPropertyValue()) + " -> " + ((jsonNode.get("value") instanceof TextNode)?jsonNode.get("value").textValue():jsonNode.get("value"))); + path.setPropertyValue((path.getPropertyValue() == null ? "" : path.getPropertyValue()) + " -> " + ((jsonNode.get("value") instanceof TextNode) ? jsonNode.get("value").textValue() : jsonNode.get("value"))); applyChangeTypeAndPropagateToChilds(path, "remove"); } else { - path.setPropertyValue(path.getPropertyValue() + " -> " + ((jsonNode.get("value") instanceof TextNode)?jsonNode.get("value").textValue():jsonNode.get("value"))); + path.setPropertyValue(path.getPropertyValue() + " -> " + ((jsonNode.get("value") instanceof TextNode) ? 
jsonNode.get("value").textValue() : jsonNode.get("value"))); path.setPropertyChangedType(jsonNode.get("op").textValue()); } } @@ -257,4 +355,21 @@ private static void applyCorrectAdder(JsonNode value, JsonTreeItem treeItem) { recursivelyAddElements((String.valueOf(value)), treeItem); } } + + public static Message fromJson(String json) { + Struct.Builder structBuilder = Struct.newBuilder(); + try { + JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + return structBuilder.build(); + } + + public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException { + if (messageOrBuilder == null) { + return null; + } + return JsonFormat.printer().print(messageOrBuilder); + } } diff --git a/src/main/java/at/esque/kafka/KafkaConnectBrowserController.java b/src/main/java/at/esque/kafka/KafkaConnectBrowserController.java index cc88ed8..7589e44 100644 --- a/src/main/java/at/esque/kafka/KafkaConnectBrowserController.java +++ b/src/main/java/at/esque/kafka/KafkaConnectBrowserController.java @@ -25,8 +25,13 @@ import javafx.scene.control.TableColumn; import javafx.scene.control.TableRow; import javafx.scene.control.TableView; +import javafx.scene.control.Tooltip; import javafx.scene.control.cell.PropertyValueFactory; import javafx.scene.image.Image; +import javafx.scene.input.KeyCode; +import javafx.scene.input.KeyCodeCombination; +import javafx.scene.input.KeyCombination; +import javafx.scene.input.KeyEvent; import javafx.stage.Modality; import javafx.stage.Stage; import javafx.stage.Window; @@ -35,14 +40,18 @@ import org.kordamp.ikonli.javafx.FontIcon; import java.util.Map; +import java.util.function.Function; public class KafkaConnectBrowserController { - private static final String RIGHT_PANE_CONNECTOR_ACTION_PAUSE = "pause connector"; - private static final String RIGHT_PANE_CONNECTOR_ACTION_RESUME = "resume connector"; + private static final String RIGHT_PANE_CONNECTOR_ACTION_PAUSE = "pause connector"; + private static final String RIGHT_PANE_CONNECTOR_ACTION_RESUME = "resume connector"; private static final String RIGHT_PANE_CONNECTOR_ACTION_RESTART = "restart connector"; + @FXML + private Tooltip helpIconToolTip; + @FXML private FilterableListView connectorListView; @FXML @@ -114,8 +123,7 @@ public void handle(ActionEvent event) { } updateRightPane(selectedConnectorInRightPane); - } catch (Exception e) - { + } catch (Exception e) { ErrorAlert.show(e, getWindow()); } @@ -127,20 +135,19 @@ public void handle(ActionEvent event) { } }); - - + taskTableView.setOnKeyPressed(generateMessageTableCopyEventHandler()); + helpIconToolTip.setText(buildToolTip()); } private Window getWindow() { return connectorConfigTextArea.getScene().getWindow(); } - public void updateRightPane(String selectedConnector) - { + public void updateRightPane(String selectedConnector) { try { - if(selectedConnector != null) { + if (selectedConnector != null) { Map connectorConfig = kafkaesqueConnectClient.getConnectorConfig(selectedConnector); - connectorConfigTextArea.setText(ConnectUtil.buildConfigString(connectorConfig,ConnectUtil.PARAM_BLACK_LIST_VIEW)); + connectorConfigTextArea.setText(ConnectUtil.buildConfigString(connectorConfig, ConnectUtil.PARAM_BLACK_LIST_VIEW)); Status status = kafkaesqueConnectClient.getConnectorStatus(selectedConnector); connectorStatus.setText(status.getStatus()); @@ -149,9 +156,7 @@ public void updateRightPane(String selectedConnector) selectedConnectorInRightPane = 
selectedConnector; taskTableView.setItems(FXCollections.observableArrayList(status.getTaskStatusList())); - } - else - { + } else { connectorConfigTextArea.setText(""); connectorStatus.setText(""); updateDisableFlagOfConnectorActionButtonStatus(null); @@ -180,8 +185,7 @@ public void refreshConnectors(ActionEvent actionEvent) { } } - private void showConnectConfigDialog(String selectedConnector) - { + private void showConnectConfigDialog(String selectedConnector) { try { FXMLLoader fxmlLoader = new FXMLLoader(getClass().getResource("/fxml/createConnector.fxml")); Parent root1 = fxmlLoader.load(); @@ -208,21 +212,20 @@ private ListCell connectorListCellFactory() { deleteItem.setGraphic(new FontIcon(FontAwesome.TRASH)); deleteItem.textProperty().set("delete"); deleteItem.setOnAction(event -> { - if (ConfirmationAlert.show("Delete Connector", "Connector [" + cell.itemProperty().get() + "] will be deleted.", "Are you sure you want to delete this connector", getWindow())) { - try { - boolean result = kafkaesqueConnectClient.deleteConnector(cell.itemProperty().get()); - - if(result == true) { - SuccessAlert.show("Delete Connector", null, "Connector [" + cell.itemProperty().get() + "] deleted.", getWindow()); - } else - { - WarningAlert.show( "Delete Connector", null, "It wasn't possible to delete the connector", getWindow()); - } - } catch (Exception e) { - ErrorAlert.show(e, getWindow()); + if (ConfirmationAlert.show("Delete Connector", "Connector [" + cell.itemProperty().get() + "] will be deleted.", "Are you sure you want to delete this connector", getWindow())) { + try { + boolean result = kafkaesqueConnectClient.deleteConnector(cell.itemProperty().get()); + + if (result == true) { + SuccessAlert.show("Delete Connector", null, "Connector [" + cell.itemProperty().get() + "] deleted.", getWindow()); + } else { + WarningAlert.show("Delete Connector", null, "It wasn't possible to delete the connector", getWindow()); } + } catch (Exception e) { + ErrorAlert.show(e, getWindow()); } - }); + } + }); MenuItem configItem = new MenuItem(); configItem.setGraphic(new FontIcon(FontAwesome.COG)); @@ -247,12 +250,11 @@ private ListCell connectorListCellFactory() { } - private void updateDisableFlagOfConnectorActionButtonStatus(String connectorStatus) - { + private void updateDisableFlagOfConnectorActionButtonStatus(String connectorStatus) { if (connectorStatus == null) connectorStatus = ""; - switch(connectorStatus){ + switch (connectorStatus) { case "RUNNING": pauseButton.setDisable(false); resumeButton.setDisable(true); @@ -268,25 +270,21 @@ private void updateDisableFlagOfConnectorActionButtonStatus(String connectorStat } @FXML - public void pauseConnectorClick(ActionEvent actionEvent) - { + public void pauseConnectorClick(ActionEvent actionEvent) { rightPaneConnectorAction(RIGHT_PANE_CONNECTOR_ACTION_PAUSE); } @FXML - public void resumeConnectorClick(ActionEvent actionEvent) - { + public void resumeConnectorClick(ActionEvent actionEvent) { rightPaneConnectorAction(RIGHT_PANE_CONNECTOR_ACTION_RESUME); } @FXML - public void restartConnectorClick(ActionEvent actionEvent) - { + public void restartConnectorClick(ActionEvent actionEvent) { rightPaneConnectorAction(RIGHT_PANE_CONNECTOR_ACTION_RESTART); } - private void rightPaneConnectorAction(String action) - { + private void rightPaneConnectorAction(String action) { try { if (selectedConnectorInRightPane == null) @@ -294,7 +292,7 @@ private void rightPaneConnectorAction(String action) boolean result = false; - switch(action){ + switch (action) { case 
RIGHT_PANE_CONNECTOR_ACTION_PAUSE: result = kafkaesqueConnectClient.pauseConnector(selectedConnectorInRightPane); break; @@ -306,9 +304,8 @@ private void rightPaneConnectorAction(String action) break; } - if (result != true) - { - WarningAlert.show("Connector Action", null, String.format("Connector action '%s' returned fales from API!",action), getWindow()); + if (result != true) { + WarningAlert.show("Connector Action", null, String.format("Connector action '%s' returned fales from API!", action), getWindow()); } //Refresh view @@ -317,4 +314,23 @@ private void rightPaneConnectorAction(String action) ErrorAlert.show(e, getWindow()); } } + + private EventHandler generateMessageTableCopyEventHandler() { + Map> copyCombinations = Map.of( + new KeyCodeCombination(KeyCode.I, KeyCombination.SHORTCUT_DOWN), taskStatus -> Integer.toString(taskStatus.getId()), + new KeyCodeCombination(KeyCode.S, KeyCombination.SHORTCUT_DOWN), Status.TaskStatus::getStatus, + new KeyCodeCombination(KeyCode.W, KeyCombination.SHORTCUT_DOWN), Status.TaskStatus::getWorkerId, + new KeyCodeCombination(KeyCode.T, KeyCombination.SHORTCUT_DOWN), Status.TaskStatus::getTrace + ); + + return SystemUtils.generateTableCopySelectedItemCopyEventHandler(taskTableView, copyCombinations); + } + + private String buildToolTip() { + return String.format("The following KeyCombinations let you copy data from the selected element in the tasks table%n" + + new KeyCodeCombination(KeyCode.I, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy task id%n" + + new KeyCodeCombination(KeyCode.S, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy task status%n" + + new KeyCodeCombination(KeyCode.W, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy worker id%n" + + new KeyCodeCombination(KeyCode.T, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy trace"); + } } diff --git a/src/main/java/at/esque/kafka/Launcher.java b/src/main/java/at/esque/kafka/Launcher.java index a4a518e..66d14c9 100644 --- a/src/main/java/at/esque/kafka/Launcher.java +++ b/src/main/java/at/esque/kafka/Launcher.java @@ -1,7 +1,5 @@ package at.esque.kafka; -//import at.esque.kafka.serialization.FakeUUIDConversion; - import at.esque.kafka.serialization.logicaltypes.KafkaEsqueConversions; import ch.qos.logback.classic.util.ContextInitializer; diff --git a/src/main/java/at/esque/kafka/MessageType.java b/src/main/java/at/esque/kafka/MessageType.java index 4437adb..97075dc 100644 --- a/src/main/java/at/esque/kafka/MessageType.java +++ b/src/main/java/at/esque/kafka/MessageType.java @@ -4,6 +4,9 @@ public enum MessageType { STRING, AVRO, AVRO_TOPIC_RECORD_NAME_STRATEGY, + PROTOBUF_SR, + BASE64, + UUID, SHORT, INTEGER, LONG, @@ -11,6 +14,5 @@ public enum MessageType { DOUBLE, BYTEARRAY, BYTEBUFFER, - BYTES, - UUID + BYTES } diff --git a/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java b/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java index cc09f1a..e6e0e44 100644 --- a/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java +++ b/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java @@ -3,6 +3,7 @@ import at.esque.kafka.alerts.ErrorAlert; import at.esque.kafka.controls.KafkaEsqueCodeArea; import io.confluent.kafka.schemaregistry.client.rest.RestService; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.scene.control.Label; @@ -11,7 +12,6 @@ import javafx.stage.FileChooser; import javafx.stage.Stage; 
diff --git a/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java b/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java index cc09f1a..e6e0e44 100644 --- a/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java +++ b/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java @@ -3,6 +3,7 @@ import at.esque.kafka.alerts.ErrorAlert; import at.esque.kafka.controls.KafkaEsqueCodeArea; import io.confluent.kafka.schemaregistry.client.rest.RestService; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.scene.control.Label; @@ -11,7 +12,6 @@ import javafx.stage.FileChooser; import javafx.stage.Stage; import javafx.stage.Window; -import org.apache.avro.Schema; import java.io.File; import java.io.IOException; @@ -40,11 +40,9 @@ public class SchemaCompatibilityCheckController { public void addSchema(ActionEvent actionEvent) { - Schema.Parser parser = new Schema.Parser(); try { - parser.parse(schemaTextArea.getText()); - - List<String> compatibility = restService.testCompatibility(schemaTextArea.getText(), subjectTextField.getText(), versionTextField.getText()); + Schema schema = restService.getVersion(subjectTextField.getText(), Integer.parseInt(versionTextField.getText())); + List<String> compatibility = restService.testCompatibility(schemaTextArea.getText(), schema.getSchemaType(), null, subjectTextField.getText(), versionTextField.getText(), false); if (compatibility.isEmpty()) { resultLabel.setTextFill(Color.web("#3d7a3d")); diff --git a/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java b/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java index d81d191..bcd1ab9 100644 --- a/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java +++ b/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java @@ -10,26 +10,38 @@ import at.esque.kafka.dialogs.SubjectConfigDialog; import at.esque.kafka.handlers.ConfigHandler; import io.confluent.kafka.schemaregistry.client.rest.RestService; +import io.confluent.kafka.schemaregistry.client.rest.entities.Config; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import javafx.collections.FXCollections; import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.fxml.FXMLLoader; import javafx.scene.Parent; -import javafx.scene.control.*; +import javafx.scene.control.ComboBox; +import javafx.scene.control.ContextMenu; +import javafx.scene.control.Label; +import javafx.scene.control.ListCell; +import javafx.scene.control.MenuItem; import javafx.scene.image.Image; import javafx.stage.Modality; import javafx.stage.Stage; import javafx.stage.Window; import org.kordamp.ikonli.fontawesome.FontAwesome; import org.kordamp.ikonli.javafx.FontIcon; -import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; import java.util.Collections; +import java.util.Map; public class SchemaRegistryBrowserController { + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryBrowserController.class); + private RestService schemaRegistryRestService; @FXML private FilterableListView subjectListView; @@ -42,6 +54,10 @@ public class SchemaRegistryBrowserController { @FXML private Label schemaIdLabel; + @FXML + private Label compatibilityLabel; + @FXML + private Label typeLabel; public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) { @@ -64,10 +80,25 @@ public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) { try { Schema schema = schemaRegistryRestService.getVersion(subjectListView.getListView().getSelectionModel().getSelectedItem(), newValue1); schemaIdLabel.setText(String.valueOf(schema.getId())); - schemaTextArea.setText(JsonUtils.formatJson(schema.getSchema())); + schemaTextArea.setText("AVRO".equals(schema.getSchemaType()) ? JsonUtils.formatJson(schema.getSchema()) : schema.getSchema()); + typeLabel.setText(schema.getSchemaType()); } catch (Exception e) { ErrorAlert.show(e, getWindow()); } + try { + // get and set global config manually before requesting subject config, for compatibility with older schema-registry versions (pre 7.0) + Config globalConfig = schemaRegistryRestService.getConfig(null); + compatibilityLabel.setText(globalConfig.getCompatibilityLevel()); + Config config = schemaRegistryRestService.getConfig(Map.of(), subjectListView.getListView().getSelectionModel().getSelectedItem(), true); + compatibilityLabel.setText(config.getCompatibilityLevel()); + } catch (RestClientException e) { + if (e.getErrorCode() == 40401 || e.getErrorCode() == 40408) { + //for compatibility with older schema-registry versions, on current versions (7.0+) the defaultToGlobal flag should prevent 404 error codes and return the global config + LOGGER.warn("Error while trying to retrieve subject config, might be because of schema-registry version before 7.0", e); + } + } catch (IOException e) { + ErrorAlert.show(e, getWindow()); + } })); subjectListView.getListView().setCellFactory(param -> subjectListCellFactory()); diff --git a/src/main/java/at/esque/kafka/SystemUtils.java b/src/main/java/at/esque/kafka/SystemUtils.java index 0d2af42..7e6aabf 100644 --- a/src/main/java/at/esque/kafka/SystemUtils.java +++ b/src/main/java/at/esque/kafka/SystemUtils.java @@ -2,17 +2,23 @@ import at.esque.kafka.alerts.ErrorAlert; import javafx.application.Platform; +import javafx.event.EventHandler; +import javafx.scene.control.TableView; import javafx.scene.control.TextInputDialog; import javafx.scene.image.Image; import javafx.scene.input.Clipboard; import javafx.scene.input.ClipboardContent; +import javafx.scene.input.KeyCodeCombination; +import javafx.scene.input.KeyEvent; import javafx.stage.Modality; import javafx.stage.Stage; +import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; +import java.util.function.Function; public class SystemUtils { @@ -52,4 +58,16 @@ public static Optional<String> showInputDialog(String defaultValue, String title return Optional.empty(); } } + + public static <T> EventHandler<KeyEvent> generateTableCopySelectedItemCopyEventHandler(TableView<T> targetTableView, Map<KeyCodeCombination, Function<T, String>> copyCombinationMap) { + return keyEvent -> { + if (targetTableView.equals(keyEvent.getSource()) && targetTableView.getSelectionModel().getSelectedItem() != null) { + copyCombinationMap.entrySet() + .stream() + .filter(keyCombinationFunctionEntry -> keyCombinationFunctionEntry.getKey().match(keyEvent)) + .findFirst() + .ifPresent(keyCombinationFunctionEntry -> SystemUtils.copyStringSelectionToClipboard(() -> keyCombinationFunctionEntry.getValue().apply(targetTableView.getSelectionModel().getSelectedItem()))); + } + }; + } }
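
The browser controller above resolves the effective compatibility level in two steps: it first shows the global level, then overwrites it with the subject override if one exists, falling back gracefully on pre-7.0 registries that return 404 for unconfigured subjects. A trimmed-down sketch of that fallback sequence, using the same Confluent RestService calls shown in the diff (the helper method itself is illustrative):

```java
import java.io.IOException;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class CompatibilityLookup {
    // Resolves the effective compatibility level for a subject: global first,
    // then the subject override (defaultToGlobal=true is honored by registries >= 7.0).
    static String effectiveCompatibility(RestService restService, String subject) throws IOException, RestClientException {
        // Start with the global level so old registries that 404 on the subject call still yield a value.
        String level = restService.getConfig(null).getCompatibilityLevel();
        try {
            Config subjectConfig = restService.getConfig(Map.of(), subject, true);
            level = subjectConfig.getCompatibilityLevel();
        } catch (RestClientException e) {
            // 40401/40408: no subject-level config on a pre-7.0 registry - keep the global level.
            if (e.getErrorCode() != 40401 && e.getErrorCode() != 40408) {
                throw e;
            }
        }
        return level;
    }
}
```
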
diff --git a/src/main/java/at/esque/kafka/TraceUtils.java b/src/main/java/at/esque/kafka/TraceUtils.java index b671a82..fde22d9 100644 --- a/src/main/java/at/esque/kafka/TraceUtils.java +++ b/src/main/java/at/esque/kafka/TraceUtils.java @@ -1,46 +1,112 @@ package at.esque.kafka; +import at.esque.kafka.topics.model.KafkaHeaderFilterOption; +import com.google.protobuf.Message; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import javax.validation.constraints.NotNull; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; public class TraceUtils { + private TraceUtils() { + } + public static Predicate<ConsumerRecord> valuePredicate(String regex, boolean searchNull) { if (searchNull) { return ((cr) -> cr.value() == null); } else { - Pattern pattern = Pattern.compile(regex); + Pattern pattern = Pattern.compile(regex, Pattern.DOTALL); return (cr) -> { if (cr.value() == null) { return false; } - Matcher matcher = pattern.matcher(cr.value().toString()); - return matcher.find(); + if (cr.value() instanceof Message messageValue) { + Matcher matcher = null; + try { + matcher = pattern.matcher(JsonUtils.toJson(messageValue)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return matcher.find(); + } else { + Matcher matcher = pattern.matcher(cr.value().toString()); + return matcher.find(); + } }; } } public static Predicate<ConsumerRecord> keyPredicate(String search, @NotNull String keyMode) { - if(keyMode.equals("exact match")) { - return ((cr) -> StringUtils.equals(cr.key().toString(), search)); - }else/*(keyMode.equals("regex (contains)"))*/{ - Pattern pattern = Pattern.compile(search); + if (keyMode.equals("exact match")) { + return ((cr) -> { + if (cr.key() instanceof Message messageKey) { + try { + return StringUtils.equals(JsonUtils.toJson(messageKey), search); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + return StringUtils.equals(cr.key().toString(), search); + } + }); + } else/*(keyMode.equals("regex (contains)"))*/ { + Pattern pattern = Pattern.compile(search, Pattern.DOTALL); return (cr) -> { if (cr.key() == null) { return false; } - Matcher matcher = pattern.matcher(cr.key().toString()); - return matcher.find(); + if (cr.key() instanceof Message messageKey) { + Matcher matcher = null; + try { + matcher = pattern.matcher(JsonUtils.toJson(messageKey)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return matcher.find(); + } else { + Matcher matcher = pattern.matcher(cr.key().toString()); + return matcher.find(); + } }; } } + public static Predicate<ConsumerRecord> consumerRecordHeaderPredicate(KafkaHeaderFilterOption kafkaHeaderFilterOption) { + return (consumerRecord) -> { + Headers headers = consumerRecord.headers(); + Stream<Header> stream = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(headers.headers(kafkaHeaderFilterOption.getHeader()).iterator(), Spliterator.ORDERED), + false); + return stream.anyMatch(header -> kafkaHeaderFilterOption.isExactMatch() ? headerValueExactMatch(kafkaHeaderFilterOption, header) : headerValueRegexMatch(kafkaHeaderFilterOption, header)); + }; + } + + private static boolean headerValueExactMatch(KafkaHeaderFilterOption kafkaHeaderFilterOption, Header header) { + return new String(header.value(), StandardCharsets.UTF_8).equals(kafkaHeaderFilterOption.getFilterString()); + } + + private static boolean headerValueRegexMatch(KafkaHeaderFilterOption kafkaHeaderFilterOption, Header header) { + Pattern pattern = Pattern.compile(kafkaHeaderFilterOption.getFilterString()); + if (header.value() == null) { + return false; + } + + Matcher matcher = pattern.matcher(new String(header.value(), StandardCharsets.UTF_8)); + return matcher.find(); + } } diff --git a/src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java b/src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java index e62e866..0478d01 100644 --- a/src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java +++ b/src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBinding; @@ -31,7 +32,6 @@ import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourceType; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -42,7 +42,6 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; public class KafkaesqueAdminClient { private AdminClient adminClient; @@ -70,20 +69,34 @@ public Set<String> getTopics() { return new HashSet<>(); } - public List<Integer> getTopicPatitions(String topic) { + public List<Integer> getPatitions(String topic) { DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic)); try { - return result.values().get(topic).get().partitions().stream() - .map(TopicPartitionInfo::partition).collect(Collectors.toList()); + return result.topicNameValues().get(topic).get().partitions().stream() + .map(TopicPartitionInfo::partition) + .toList(); } catch (Exception e) { ErrorAlert.show(e); } - return null; + return Collections.emptyList(); } + public List<TopicPartition> getTopicPatitions(String topic) { + DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic)); + try { + return result.topicNameValues().get(topic).get().partitions().stream() + .map(topicPartitionInfo -> new TopicPartition(topic, topicPartitionInfo.partition())) + .toList(); + } catch (Exception e) { + ErrorAlert.show(e); + } + return Collections.emptyList(); + } + + public void deleteTopic(String name) throws ExecutionException, InterruptedException { DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(name)); - result.values().get(name).get(); + result.topicNameValues().get(name).get(); } public void createTopic(String name, int partitions, short replicationFactor, Map<String, String> configs) throws ExecutionException, InterruptedException { @@ -102,7 +115,7 @@ public DescribeTopicWrapper describeTopic(String topic) { DescribeConfigsResult configsResult = adminClient.describeConfigs(Collections.singletonList(configResource)); try { - TopicDescription topicDescription = describeResult.values().get(topic).get(10, TimeUnit.SECONDS); + TopicDescription topicDescription = describeResult.topicNameValues().get(topic).get(10, TimeUnit.SECONDS); Config config = configsResult.values().get(configResource).get(10, TimeUnit.SECONDS); return new DescribeTopicWrapper(topicDescription, config); @@ -119,11 +132,11 @@ public List<Lag> getConsumerGroups() { Lag lag = new Lag(); lag.setTitle(consumerGroupListing.groupId()); return lag; - }).collect(Collectors.toList()); + }).toList(); } catch (Exception e) { Platform.runLater(() -> ErrorAlert.show(e)); } - return Collections.EMPTY_LIST; + return Collections.emptyList(); } public List<AclBinding> getACLs(ResourceType resourceType, PatternType resourcePattern, String resourceName, String principalName) { @@ -138,12 +151,12 @@ DescribeAclsResult describeAclsResult = adminClient.describeAcls(aclBindingFilter); - return describeAclsResult.values().get().stream().collect(Collectors.toList()); + return describeAclsResult.values().get().stream().toList(); } catch (Exception e) { Platform.runLater(() -> ErrorAlert.show(e)); } - return Collections.EMPTY_LIST; + return Collections.emptyList(); } public List<AclBinding> getACLsBySubstring(ResourceType resourceType, PatternType resourcePattern, String resourceName, String principalName) { @@ -154,14 +167,14 @@ public List<AclBinding> getACLsBySubstring(ResourceType resourceType, PatternTyp DescribeAclsResult describeAclsResult = adminClient.describeAcls(aclBindingFilter); return describeAclsResult.values().get().stream() - .filter(acl -> "".equals(resourceName) ? true : acl.pattern().name().contains(resourceName)) - .filter(acl -> "".equals(principalName) ? true : acl.entry().principal().contains(principalName)) - .collect(Collectors.toList()); + .filter(acl -> "".equals(resourceName) || acl.pattern().name().contains(resourceName)) + .filter(acl -> "".equals(principalName) || acl.entry().principal().contains(principalName)) + .toList(); } catch (Exception e) { Platform.runLater(() -> ErrorAlert.show(e)); } - return Collections.EMPTY_LIST; + return Collections.emptyList(); } @@ -181,7 +194,7 @@ public void addAcl(AclBinding aclBinding) { try { - CreateAclsResult result = adminClient.createAcls(Arrays.asList(aclBinding)); + CreateAclsResult result = adminClient.createAcls(Collections.singletonList(aclBinding)); result.all().get(); diff --git a/src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java b/src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java index d6919ff..4d6db8d 100644 --- a/src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java +++ b/src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java @@ -58,7 +58,13 @@ public void setValueType(MessageType valueType) { this.valueType.set(valueType.name()); } - public boolean containsAvro() { - return getValueType() == MessageType.AVRO || getValueType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY || getKeyType() == MessageType.AVRO || getKeyType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY; + public boolean requiresSchemaRegistry() { + return getValueType() == MessageType.AVRO || + getValueType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY || + getValueType() == MessageType.PROTOBUF_SR || + getKeyType() == MessageType.AVRO || + getKeyType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY || + getKeyType() == MessageType.PROTOBUF_SR + ; } }
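
The rename from containsAvro to requiresSchemaRegistry reflects that PROTOBUF_SR, like the two Avro types, cannot be deserialized without a registry connection. An equivalent formulation keeps the registry-backed types in one EnumSet instead of a chained boolean expression; this is a sketch of the idea, not code from the PR (RegistryTypes is an illustrative name, MessageType is the enum extended above):

```java
import java.util.EnumSet;
import java.util.Set;

import at.esque.kafka.MessageType;

public class RegistryTypes {
    // All message types that need a schema-registry connection to (de)serialize.
    private static final Set<MessageType> REGISTRY_BACKED = EnumSet.of(
            MessageType.AVRO,
            MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY,
            MessageType.PROTOBUF_SR);

    static boolean requiresSchemaRegistry(MessageType keyType, MessageType valueType) {
        return REGISTRY_BACKED.contains(keyType) || REGISTRY_BACKED.contains(valueType);
    }
}
```

With a set, adding a future registry-backed type (say, JSON Schema) is a single new constant rather than two more `||` clauses.
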
diff --git a/src/main/java/at/esque/kafka/controls/MessagesTabContent.java b/src/main/java/at/esque/kafka/controls/MessagesTabContent.java index c041891..9aed372 100644 --- a/src/main/java/at/esque/kafka/controls/MessagesTabContent.java +++ b/src/main/java/at/esque/kafka/controls/MessagesTabContent.java @@ -1,13 +1,21 @@ package at.esque.kafka.controls; +import at.esque.kafka.JsonUtils; +import at.esque.kafka.SystemUtils; import at.esque.kafka.alerts.ErrorAlert; import at.esque.kafka.topics.KafkaMessage; import com.opencsv.bean.StatefulBeanToCsv; import com.opencsv.bean.StatefulBeanToCsvBuilder; import javafx.event.ActionEvent; +import javafx.event.EventHandler; import javafx.fxml.FXML; import javafx.fxml.FXMLLoader; import javafx.scene.control.TextField; +import javafx.scene.control.Tooltip; +import javafx.scene.input.KeyCode; +import javafx.scene.input.KeyCodeCombination; +import javafx.scene.input.KeyCombination; +import javafx.scene.input.KeyEvent; import javafx.scene.layout.VBox; import javafx.stage.FileChooser; import org.apache.commons.lang3.StringUtils; @@ -16,6 +24,8 @@ import java.io.FileWriter; import java.io.IOException; import java.io.Writer; +import java.util.Map; +import java.util.function.Function; public class MessagesTabContent extends VBox { @@ -23,6 +33,8 @@ public class MessagesTabContent extends VBox { private TextField messageSearchTextField; @FXML private KafkaMessageTableView messageTableView; + @FXML + private Tooltip helpIconToolTip; public MessagesTabContent() { FXMLLoader fxmlLoader = new FXMLLoader(getClass().getResource( @@ -46,31 +58,65 @@ private void setup() { || (km.getValue() != null && StringUtils.containsIgnoreCase(km.getValue(), newValue))) )); }); + messageTableView.setOnKeyPressed(generateMessageTableCopyEventHandler()); + helpIconToolTip.setText(buildToolTip()); } @FXML - public void exportCsvClick(ActionEvent event) { + public void exportFileClick(ActionEvent event) { FileChooser fileChooser = new FileChooser(); - fileChooser.setTitle("Save messages as csv"); + fileChooser.setTitle("Save messages to file"); + fileChooser.getExtensionFilters().add(new FileChooser.ExtensionFilter("JSON File (*.json)", "*.json")); fileChooser.getExtensionFilters().add(new FileChooser.ExtensionFilter("CSV File (*.csv)", "*.csv")); File selectedFile = fileChooser.showSaveDialog(this.getScene().getWindow()); if (selectedFile != null) { + String fileName = selectedFile.getName(); + String fileExtension = fileName.substring(fileName.lastIndexOf(".") + 1); try (Writer writer = new FileWriter(selectedFile.getAbsolutePath())) { - StatefulBeanToCsv<KafkaMessage> beanToCsv = new StatefulBeanToCsvBuilder<KafkaMessage>(writer).build(); - messageTableView.getItems().forEach(message -> { + if ("json".equals(fileExtension)) { try { - beanToCsv.write(message); + JsonUtils.writeMessageToJsonFile(messageTableView.getItems(), writer); } catch (Exception e) { ErrorAlert.show(e); } - }); + } else { + StatefulBeanToCsv<KafkaMessage> beanToCsv = new StatefulBeanToCsvBuilder<KafkaMessage>(writer).build(); + messageTableView.getItems().forEach(message -> { + try { + beanToCsv.write(message); + } catch (Exception e) { + ErrorAlert.show(e); + } + }); + } } catch (Exception e) { ErrorAlert.show(e); } } } + private EventHandler<KeyEvent> generateMessageTableCopyEventHandler() { + Map<KeyCodeCombination, Function<KafkaMessage, String>> copyCombinations = Map.of( + new KeyCodeCombination(KeyCode.C, KeyCombination.SHORTCUT_DOWN), KafkaMessage::getValue, + new KeyCodeCombination(KeyCode.K, KeyCombination.SHORTCUT_DOWN), KafkaMessage::getKey, + new KeyCodeCombination(KeyCode.O, KeyCombination.SHORTCUT_DOWN), message -> Long.toString(message.getOffset()), + new KeyCodeCombination(KeyCode.P, KeyCombination.SHORTCUT_DOWN), message -> Integer.toString(message.getPartition()), + new KeyCodeCombination(KeyCode.T, KeyCombination.SHORTCUT_DOWN), KafkaMessage::getTimestamp + ); + + return SystemUtils.generateTableCopySelectedItemCopyEventHandler(messageTableView, copyCombinations); + } + public KafkaMessageTableView getMessageTableView() { return messageTableView; } + + private String buildToolTip() { + return String.format("The following KeyCombinations let you copy data from the selected element in the messages table%n" + + new KeyCodeCombination(KeyCode.C, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy value%n" + + new KeyCodeCombination(KeyCode.K, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy key%n" + + new KeyCodeCombination(KeyCode.O, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy offset%n" + + new KeyCodeCombination(KeyCode.P, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy partition%n" + + new KeyCodeCombination(KeyCode.T, KeyCombination.SHORTCUT_DOWN).getDisplayText() + " - copy timestamp"); + } }
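
The exportFileClick method above picks the writer by file extension. A condensed, hypothetical sketch of that dispatch follows; ExportSketch and the inline writers are stand-ins (the PR uses JsonUtils.writeMessageToJsonFile and OpenCSV's StatefulBeanToCsv), and equalsIgnoreCase is used here so that a file named "messages.JSON" also takes the JSON path, which the case-sensitive check in the PR would not.

```java
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.List;

public class ExportSketch {
    // Picks the output format from the chosen file's extension, defaulting to CSV.
    static void export(File selectedFile, List<String> rows) throws IOException {
        String name = selectedFile.getName();
        // Note: lastIndexOf returns -1 for extension-less names, so the whole name is compared.
        String extension = name.substring(name.lastIndexOf('.') + 1);
        try (Writer writer = new FileWriter(selectedFile)) {
            if ("json".equalsIgnoreCase(extension)) {
                writer.write("[\"" + String.join("\",\"", rows) + "\"]"); // stand-in for the JSON writer
            } else {
                for (String row : rows) {
                    writer.write(row + System.lineSeparator()); // stand-in for the CSV writer
                }
            }
        }
    }
}
```
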
diff --git a/src/main/java/at/esque/kafka/dialogs/SubjectConfigDialog.java b/src/main/java/at/esque/kafka/dialogs/SubjectConfigDialog.java index 0a60b02..6cbda40 100644 --- a/src/main/java/at/esque/kafka/dialogs/SubjectConfigDialog.java +++ b/src/main/java/at/esque/kafka/dialogs/SubjectConfigDialog.java @@ -27,8 +27,8 @@ public class SubjectConfigDialog { private SubjectConfigDialog() { } - private enum schemaCompatibilityLevel { - BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE; + private enum SchemaCompatibilityLevel { + BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE, UNDEFINED; } public static void show(RestService schemaRegistryRestService, String selectedSubject) { @@ -41,11 +41,11 @@ public static void show(RestService schemaRegistryRestSu globalCompatibilityLevel.set(globalConfig.getCompatibilityLevel()); // if configured also get the configured level on subject - SimpleObjectProperty<schemaCompatibilityLevel> subjectCompatibilityLevel = new SimpleObjectProperty<>(); + SimpleObjectProperty<SchemaCompatibilityLevel> subjectCompatibilityLevel = new SimpleObjectProperty<>(); try { Config subjectConfig = schemaRegistryRestService.getConfig(selectedSubject); - schemaCompatibilityLevel configuredLevel = schemaCompatibilityLevel.valueOf(subjectConfig.getCompatibilityLevel()); + SchemaCompatibilityLevel configuredLevel = SchemaCompatibilityLevel.valueOf(subjectConfig.getCompatibilityLevel()); if (configuredLevel == null) { throw new IllegalArgumentException(String.format("Schema Registry returned an unknown compatibility level (%s)", subjectConfig.getCompatibilityLevel())); @@ -54,32 +54,33 @@ public static void show(RestService schemaRegistryRestSu subjectCompatibilityLevel.set(configuredLevel); } catch (RestClientException e) { //Nothing configured on subject level - if (e.getErrorCode() == 40401) { - subjectCompatibilityLevel.set(null); + // 40401 for compatibility with older versions + if (e.getErrorCode() == 40401 || e.getErrorCode() == 40408) { + subjectCompatibilityLevel.set(SchemaCompatibilityLevel.UNDEFINED); } else { throw e; } } - ListProperty<schemaCompatibilityLevel> schemaCompatibilityLevels = new SimpleListProperty<>(FXCollections.observableArrayList(schemaCompatibilityLevel.values())); + ListProperty<SchemaCompatibilityLevel> schemaCompatibilityLevels = new SimpleListProperty<>(FXCollections.observableArrayList(SchemaCompatibilityLevel.values())); - SimpleObjectProperty<schemaCompatibilityLevel> existingSubjectCompatibilityLevel = new SimpleObjectProperty(subjectCompatibilityLevel.get()); + SimpleObjectProperty<SchemaCompatibilityLevel> existingSubjectCompatibilityLevel = new SimpleObjectProperty(subjectCompatibilityLevel.get()); // Show dialog Form form = Form.of( - Group.of( - Field.ofStringType("Configured schema compatibility level:") - .editable(false), - Field.ofStringType(globalCompatibilityLevel.getValueSafe()) - .label("Global:") - .tooltip("Global Schema Compatibility Level") - .editable(false), - Field.ofSingleSelectionType(schemaCompatibilityLevels) - .label("Subject:") - .tooltip("Subject Schema Compatibility Leve") - .bind(schemaCompatibilityLevels, subjectCompatibilityLevel) - ) - ).title("Schema Compatibility") + Group.of( + Field.ofStringType("Configured schema compatibility level:") + .editable(false), + Field.ofStringType(globalCompatibilityLevel.getValueSafe()) + .label("Global:") + .tooltip("Global Schema Compatibility Level") + .editable(false), + Field.ofSingleSelectionType(schemaCompatibilityLevels) + .label("Subject:") + .tooltip("Subject Schema Compatibility Level") + .bind(schemaCompatibilityLevels, subjectCompatibilityLevel) + ) + ).title("Schema Compatibility") .binding(BindingMode.CONTINUOUS); Dialog dialog = new Dialog<>(); @@ -106,8 +107,12 @@ public static void show(RestService schemaRegistryRestSu Optional newSubjectLevel = dialog.showAndWait(); if (newSubjectLevel.isPresent() && !newSubjectLevel.get().get().equals(existingSubjectCompatibilityLevel.getValue())) { - schemaCompatibilityLevel newLevel = (schemaCompatibilityLevel) newSubjectLevel.get().get(); - schemaRegistryRestService.updateCompatibility(newLevel.name(), selectedSubject); + SchemaCompatibilityLevel newLevel = (SchemaCompatibilityLevel)
newSubjectLevel.get().get(); + if (newLevel.equals(SchemaCompatibilityLevel.UNDEFINED)) { + schemaRegistryRestService.deleteConfig(selectedSubject); + } else { + schemaRegistryRestService.updateCompatibility(newLevel.name(), selectedSubject); + } } } catch (Exception e) { diff --git a/src/main/java/at/esque/kafka/dialogs/TraceInputDialog.java b/src/main/java/at/esque/kafka/dialogs/TraceInputDialog.java index e80dc72..fcf979e 100644 --- a/src/main/java/at/esque/kafka/dialogs/TraceInputDialog.java +++ b/src/main/java/at/esque/kafka/dialogs/TraceInputDialog.java @@ -2,6 +2,7 @@ import at.esque.kafka.Main; import at.esque.kafka.topics.TraceDialogController; +import at.esque.kafka.topics.model.KafkaHeaderFilterOption; import javafx.collections.ObservableList; import javafx.fxml.FXMLLoader; import javafx.scene.Node; @@ -12,6 +13,7 @@ import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Optional; @@ -20,6 +22,9 @@ public class TraceInputDialog { public static final LinkedList recentTrace = new LinkedList<>(); + private TraceInputDialog() { + } + public static Optional show(boolean isAvroKeyType, boolean traceQuickSelectEnabled, List durations, int recentTraceMaxEntries, ObservableList partitions) throws IOException { Dialog dialog = new Dialog<>(); dialog.setResizable(true); @@ -32,7 +37,7 @@ public static Optional show(boolean isAvroKeyType, boolean traceQuic Parent root1 = fxmlLoader.load(); TraceDialogController controller = fxmlLoader.getController(); - controller.setup(isAvroKeyType, traceQuickSelectEnabled, durations, recentTraceMaxEntries, partitions); + controller.setup(isAvroKeyType, traceQuickSelectEnabled, durations, partitions); dialog.getDialogPane().getButtonTypes().addAll(ButtonType.OK, ButtonType.CANCEL); @@ -53,7 +58,7 @@ public static Optional show(boolean isAvroKeyType, boolean traceQuic updateRecentTrace(controller.keyTextBox.getText(), recentTraceMaxEntries); updateRecentTrace(controller.valueTextBox.getText(), recentTraceMaxEntries); } - return new TraceInput(controller.keyTextBox.getText(), controller.valueTextBox.getText(), keyMode, conditionMode, controller.fastTraceToggle.isSelected(), controller.tombstoneToggle.isSelected(), controller.epochInstantPicker.getInstantValue() == null ? null : controller.epochInstantPicker.getInstantValue().toEpochMilli(), controller.specificParitionComboBox.getValue()); + return new TraceInput(controller.keyTextBox.getText(), controller.valueTextBox.getText(), keyMode, conditionMode, controller.fastTraceToggle.isSelected(), controller.tombstoneToggle.isSelected(), controller.epochStartInstantPicker.getInstantValue() == null ? null : controller.epochStartInstantPicker.getInstantValue().toEpochMilli(), controller.epochEndInstantPicker.getInstantValue() == null ? 
null : controller.epochEndInstantPicker.getInstantValue().toEpochMilli(), controller.specificParitionComboBox.getValue(), controller.headerTableView.getItems()); } return null; }); @@ -78,18 +83,23 @@ public static class TraceInput { private String conditionMode; private boolean fastTrace; private boolean searchNull; - private Long epoch; + private Long epochStart; + private Long epochEnd; private Integer partition; - public TraceInput(String keySearch, String valueSearch, String keyMode, String conditionMode, boolean fastTrace, boolean searchNull, Long epoch, Integer partition) { + private List kafkaHeaderFilterOptions = new ArrayList<>(); + + public TraceInput(String keySearch, String valueSearch, String keyMode, String conditionMode, boolean fastTrace, boolean searchNull, Long epochStart, Long epochEnd, Integer partition, List kafkaHeaderFilterOptions) { this.keySearch = keySearch; this.valueSearch = valueSearch; this.keyMode = keyMode; this.conditionMode = conditionMode; this.fastTrace = fastTrace; this.searchNull = searchNull; - this.epoch = epoch; + this.epochStart = epochStart; + this.epochEnd = epochEnd; this.partition = partition; + this.kafkaHeaderFilterOptions = kafkaHeaderFilterOptions; } public String getKeySearch() { @@ -140,12 +150,20 @@ public void setSearchNull(boolean searchNull) { this.searchNull = searchNull; } - public Long getEpoch() { - return epoch; + public Long getEpochStart() { + return epochStart; } - public void setEpoch(Long epoch) { - this.epoch = epoch; + public void setEpochStart(Long epochStart) { + this.epochStart = epochStart; + } + + public Long getEpochEnd() { + return epochEnd; + } + + public void setEpochEnd(Long epochEnd) { + this.epochEnd = epochEnd; } public Integer getPartition() { @@ -155,5 +173,13 @@ public Integer getPartition() { public void setPartition(Integer partition) { this.partition = partition; } + + public List getKafkaHeaderFilterOptions() { + return kafkaHeaderFilterOptions; + } + + public void setKafkaHeaderFilterOptions(List kafkaHeaderFilterOptions) { + this.kafkaHeaderFilterOptions = kafkaHeaderFilterOptions; + } } } diff --git a/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java b/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java index d1b13a5..4f29c52 100644 --- a/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java +++ b/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java @@ -55,15 +55,17 @@ public void setRegisteredConsumers(Map registeredConsumers) public UUID registerConsumer(ClusterConfig config, TopicMessageTypeConfig topicMessageTypeConfig, Map consumerConfigs) throws MissingSchemaRegistryException { Properties consumerProps = new Properties(); consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootStrapServers()); + String groupIdPrefix = Optional.ofNullable(consumerConfigs.get("group.id.prefix")) + .orElse("kafkaesque-"); UUID consumerId = UUID.randomUUID(); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafkaesque-" + consumerId); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupIdPrefix + consumerId); consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEsqueDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEsqueDeserializer.class); consumerProps.put(KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, 
configHandler.getSettingsProperties().get(Settings.ENABLE_AVRO_LOGICAL_TYPE_CONVERSIONS)); consumerProps.setProperty("kafkaesque.cluster.id", config.getIdentifier()); consumerProps.put("kafkaesque.confighandler", configHandler); - if (topicMessageTypeConfig.containsAvro() && StringUtils.isEmpty(config.getSchemaRegistry())) { + if (topicMessageTypeConfig.requiresSchemaRegistry() && StringUtils.isEmpty(config.getSchemaRegistry())) { Optional input = SystemUtils.showInputDialog("http://localhost:8081", "Add schema-registry url", "this cluster config is missing a schema registry url please add it now", "schema-registry URL"); if (!input.isPresent()) { throw new MissingSchemaRegistryException(config.getIdentifier()); diff --git a/src/main/java/at/esque/kafka/handlers/ProducerHandler.java b/src/main/java/at/esque/kafka/handlers/ProducerHandler.java index 5432ae6..3186e7a 100644 --- a/src/main/java/at/esque/kafka/handlers/ProducerHandler.java +++ b/src/main/java/at/esque/kafka/handlers/ProducerHandler.java @@ -8,9 +8,12 @@ import at.esque.kafka.serialization.KafkaEsqueSerializer; import com.google.inject.Inject; import com.google.inject.Singleton; +import com.google.protobuf.Message; import io.confluent.kafka.schemaregistry.client.rest.RestService; import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; import org.apache.avro.generic.GenericDatumReader; @@ -150,6 +153,8 @@ private Object getMessageValue(String topic, String key, ProducerWrapper produce return createRecord(producerWrapper, key, topic, b); } else if (MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY.equals(type)) { return createRecord(producerWrapper, key, topic, recordType); + } else if (MessageType.PROTOBUF_SR.equals(type)) { + return createProtobufMessage(producerWrapper, key, topic, b); } return key; } @@ -181,6 +186,15 @@ private GenericRecord createRecord(ProducerWrapper producerWrapper, String json, return createGenericRecord(json, schema); } + private Message createProtobufMessage(ProducerWrapper producerWrapper, String json, String topic, boolean isKey) throws IOException, RestClientException { + if (json == null) { + return null; + } + Schema schema = getSchemaFromRegistry(producerWrapper.getSchemaRegistryRestService(), topic + (isKey ? 
"-key" : "-value")); + + return (Message) ProtobufSchemaUtils.toObject(json, new ProtobufSchema(schema.getSchema())); + } + private GenericRecord createGenericRecord(String json, Schema schema) throws IOException { org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema.getSchema()); diff --git a/src/main/java/at/esque/kafka/serialization/Base64Deserializer.java b/src/main/java/at/esque/kafka/serialization/Base64Deserializer.java new file mode 100644 index 0000000..40fab82 --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/Base64Deserializer.java @@ -0,0 +1,18 @@ +package at.esque.kafka.serialization; + +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.util.Base64; + +public class Base64Deserializer extends StringDeserializer { + + @Override + public String deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } else { + return new String(Base64.getEncoder().encode(data)); + } + } +} + diff --git a/src/main/java/at/esque/kafka/serialization/Base64Serializer.java b/src/main/java/at/esque/kafka/serialization/Base64Serializer.java new file mode 100644 index 0000000..3e2def2 --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/Base64Serializer.java @@ -0,0 +1,17 @@ +package at.esque.kafka.serialization; + +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Base64; + +public class Base64Serializer extends StringSerializer { + + @Override + public byte[] serialize(String topic, String data) { + if (data == null) { + return new byte[0]; + } else { + return Base64.getDecoder().decode(data); + } + } +} diff --git a/src/main/java/at/esque/kafka/serialization/ForgivingProtobufAvroDeserializer.java b/src/main/java/at/esque/kafka/serialization/ForgivingProtobufAvroDeserializer.java new file mode 100644 index 0000000..9015b4d --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/ForgivingProtobufAvroDeserializer.java @@ -0,0 +1,33 @@ +package at.esque.kafka.serialization; + +import com.google.protobuf.Message; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class ForgivingProtobufAvroDeserializer implements Deserializer { + + private KafkaProtobufDeserializer baseDeserializer = new KafkaProtobufDeserializer<>(); + private static final Logger logger = LoggerFactory.getLogger(ForgivingProtobufAvroDeserializer.class); + + public Object deserialize(byte[] bytes) { + throw new UnsupportedOperationException(); + } + + public Object deserialize(String s, byte[] bytes) { + try { + return baseDeserializer.deserialize(s, bytes); + } catch (Exception e) { + logger.warn("Failed Protobuf deserialization in topic {}", s); + } + return "Failed Protobuf deserialization! 
Content as String: " + new String(bytes); + } + + @Override + public void configure(Map configs, boolean isKey) { + baseDeserializer.configure(configs, isKey); + } +} diff --git a/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java b/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java index db3446d..4e63e79 100644 --- a/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java +++ b/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java @@ -3,7 +3,6 @@ import at.esque.kafka.MessageType; import at.esque.kafka.cluster.TopicMessageTypeConfig; import at.esque.kafka.handlers.ConfigHandler; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; @@ -37,7 +36,7 @@ public Object deserialize(String s, byte[] bytes) { Integer schemaId = null; - if (MessageType.AVRO.equals(messageType) || MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY.equals(messageType)) { + if (isSchemaRegistryMessageType(messageType)) { schemaId = getSchemaId(bytes); } if ((deserializedObj instanceof GenericData.Record)) { @@ -49,13 +48,19 @@ public Object deserialize(String s, byte[] bytes) { return deserializedObj; } + private static boolean isSchemaRegistryMessageType(MessageType messageType) { + return MessageType.AVRO.equals(messageType) + || MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY.equals(messageType) + || MessageType.PROTOBUF_SR.equals(messageType); + } + public void configure(Map configs, boolean isKey) { this.isKey = isKey; deserializerMap.values().forEach(deserializer -> { - if (deserializer instanceof ForgivingKafkaAvroDeserializer && configs.get("schema.registry.url") == null) { + if ((deserializer instanceof ForgivingKafkaAvroDeserializer || deserializer instanceof ForgivingProtobufAvroDeserializer) && configs.get("schema.registry.url") == null) { //Don't call configure for the AvroDeserializer if there is no schema registry url to prevent exception, in cases where avro is not even used - }else { + } else { deserializer.configure(configs, isKey); } }); @@ -63,10 +68,6 @@ public void configure(Map configs, boolean isKey) { this.configHandler = (ConfigHandler) configs.get("kafkaesque.confighandler"); } - public Object deserialize(String s, byte[] bytes, Schema readerSchema) { - return this.deserialize(s, bytes); - } - public void close() { } @@ -90,8 +91,12 @@ private Deserializer deserializerByType(MessageType type) { return Serdes.ByteBuffer().deserializer(); case BYTES: return Serdes.Bytes().deserializer(); + case BASE64: + return new Base64Deserializer(); case UUID: return Serdes.UUID().deserializer(); + case PROTOBUF_SR: + return new ForgivingProtobufAvroDeserializer(); case AVRO: case AVRO_TOPIC_RECORD_NAME_STRATEGY: return new ForgivingKafkaAvroDeserializer(); diff --git a/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java b/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java index de926d2..ae0a80e 100644 --- a/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java +++ b/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java @@ -1,9 +1,12 @@ package at.esque.kafka.serialization; +import at.esque.kafka.JsonUtils; import at.esque.kafka.MessageType; import at.esque.kafka.cluster.TopicMessageTypeConfig; import at.esque.kafka.handlers.ConfigHandler; +import com.google.protobuf.Message; import io.confluent.kafka.serializers.KafkaAvroSerializer; +import 
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; @@ -18,7 +21,8 @@ public class KafkaEsqueSerializer implements Serializer { - private KafkaAvroSerializer avroSerialier = new KafkaAvroSerializer(); + private KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(); + private KafkaProtobufSerializer protobufSerializer = new KafkaProtobufSerializer(); private static final List AVRO_TYPES = Arrays.asList(MessageType.AVRO, MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY); @@ -37,7 +41,9 @@ public Object serialize(byte[] bytes) { public byte[] serialize(String s, Object object) { TopicMessageTypeConfig topicConfig = configHandler.getConfigForTopic(clusterId, s); if (isKey ? AVRO_TYPES.contains(topicConfig.getKeyType()) : AVRO_TYPES.contains(topicConfig.getValueType())) { - return avroSerialier.serialize(s, object); + return avroSerializer.serialize(s, object); + } else if ((isKey ? MessageType.PROTOBUF_SR.equals(topicConfig.getKeyType()) : MessageType.PROTOBUF_SR.equals(topicConfig.getValueType())) && object instanceof Message message) { + return protobufSerializer.serialize(s, message); } else { SerializerWrapper serializerWrapper = serializerMap.get(isKey ? topicConfig.getKeyType() : topicConfig.getValueType()); return serializerWrapper.serializer.serialize(s, serializerWrapper.function.apply(object)); @@ -49,7 +55,8 @@ public void configure(Map configs, boolean isKey) { this.isKey = isKey; serializerMap.values().forEach(serializer -> serializer.serializer.configure(configs, isKey)); if (configs.get("schema.registry.url") != null) { - avroSerialier.configure(configs, isKey); + avroSerializer.configure(configs, isKey); + protobufSerializer.configure(configs, isKey); } this.clusterId = (String) configs.get("kafkaesque.cluster.id"); this.configHandler = (ConfigHandler) configs.get("kafkaesque.confighandler"); @@ -61,7 +68,7 @@ public void close() { private SerializerWrapper serializerByType(MessageType type) { switch (type) { case STRING: - return new SerializerWrapper(s->s, Serdes.String().serializer()); + return new SerializerWrapper(s -> s, Serdes.String().serializer()); case SHORT: return new SerializerWrapper(Short::parseShort, Serdes.Short().serializer()); case INTEGER: @@ -75,16 +82,21 @@ private SerializerWrapper serializerByType(MessageType type) { case BYTEARRAY: return new SerializerWrapper(String::getBytes, Serdes.ByteArray().serializer()); case BYTEBUFFER: - return new SerializerWrapper( s -> ByteBuffer.wrap(s.getBytes()), Serdes.ByteBuffer().serializer()); + return new SerializerWrapper(s -> ByteBuffer.wrap(s.getBytes()), Serdes.ByteBuffer().serializer()); case BYTES: return new SerializerWrapper(s -> Bytes.wrap(s.getBytes()), Serdes.Bytes().serializer()); + case BASE64: + return new SerializerWrapper(s -> s, new Base64Serializer()); case UUID: return new SerializerWrapper(UUID::fromString, Serdes.UUID().serializer()); + case PROTOBUF_SR: + return new SerializerWrapper(JsonUtils::fromJson, protobufSerializer); default: throw new UnsupportedOperationException("no serializer for Message type: " + type); } } - public class SerializerWrapper{ + + public class SerializerWrapper { private Function function; private Serializer serializer; diff --git a/src/main/java/at/esque/kafka/serialization/jackson/HeaderObservableListConverter.java 
b/src/main/java/at/esque/kafka/serialization/jackson/HeaderObservableListConverter.java new file mode 100644 index 0000000..49f4732 --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/jackson/HeaderObservableListConverter.java @@ -0,0 +1,29 @@ +package at.esque.kafka.serialization.jackson; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.fasterxml.jackson.databind.util.Converter; +import javafx.collections.FXCollections; +import javafx.collections.ObservableList; +import org.apache.kafka.common.header.Header; + +import java.util.List; + +public class HeaderObservableListConverter implements Converter<List<Header>, ObservableList<Header>> { + + + @Override + public ObservableList<Header> convert(List<Header> headers) { + return FXCollections.observableArrayList(headers); + } + + @Override + public JavaType getInputType(TypeFactory typeFactory) { + return typeFactory.constructCollectionType(List.class, Header.class); + } + + @Override + public JavaType getOutputType(TypeFactory typeFactory) { + return typeFactory.constructCollectionType(ObservableList.class, Header.class); + } +} diff --git a/src/main/java/at/esque/kafka/serialization/jackson/KafkaHeaderDeserializer.java b/src/main/java/at/esque/kafka/serialization/jackson/KafkaHeaderDeserializer.java new file mode 100644 index 0000000..cebf1eb --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/jackson/KafkaHeaderDeserializer.java @@ -0,0 +1,35 @@ +package at.esque.kafka.serialization.jackson; + +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class KafkaHeaderDeserializer extends StdDeserializer<Header> { + + + public KafkaHeaderDeserializer() { + this(null); + } + + public KafkaHeaderDeserializer(Class<Header> clazz) { + super(clazz); + } + + @Override + public Header deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JacksonException { + JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser); + String key = jsonNode.get("key").asText(); + String value = jsonNode.get("value").asText(); + + return new RecordHeader(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8)); + } + + +} diff --git a/src/main/java/at/esque/kafka/serialization/jackson/KafkaHeaderSerializer.java b/src/main/java/at/esque/kafka/serialization/jackson/KafkaHeaderSerializer.java new file mode 100644 index 0000000..74b8086 --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/jackson/KafkaHeaderSerializer.java @@ -0,0 +1,28 @@ +package at.esque.kafka.serialization.jackson; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.kafka.common.header.Header; + +import java.io.IOException; + +public class KafkaHeaderSerializer extends StdSerializer<Header> { + + + public KafkaHeaderSerializer() { + this(null); + } + + public KafkaHeaderSerializer(Class<Header> clazz) { + super(clazz); + } + + @Override + public void serialize(Header header, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField("key", header.key()); + jsonGenerator.writeStringField("value", header.value() == null ? null : new String(header.value())); + jsonGenerator.writeEndObject(); + } +} diff --git a/src/main/java/at/esque/kafka/serialization/jackson/MessageMetaDataDeserializer.java b/src/main/java/at/esque/kafka/serialization/jackson/MessageMetaDataDeserializer.java new file mode 100644 index 0000000..1cc4218 --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/jackson/MessageMetaDataDeserializer.java @@ -0,0 +1,35 @@ +package at.esque.kafka.serialization.jackson; + +import at.esque.kafka.topics.metadata.MessageMetaData; +import at.esque.kafka.topics.metadata.NumericMetadata; +import at.esque.kafka.topics.metadata.StringMetadata; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; + +public class MessageMetaDataDeserializer extends StdDeserializer<MessageMetaData> { + + public MessageMetaDataDeserializer() { + this(null); + } + + public MessageMetaDataDeserializer(Class<MessageMetaData> clazz) { + super(clazz); + } + + @Override + public MessageMetaData deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { + JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser); + String name = jsonNode.get("name").asText(); + JsonNode value = jsonNode.get("value"); + + if (value.isNumber()) { + return new NumericMetadata(name, value.asLong()); + } else { + return new StringMetadata(name, value.asText()); + } + } +} diff --git a/src/main/java/at/esque/kafka/serialization/jackson/MessageMetaDataObservableListConverter.java b/src/main/java/at/esque/kafka/serialization/jackson/MessageMetaDataObservableListConverter.java new file mode 100644 index 0000000..d0f08da --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/jackson/MessageMetaDataObservableListConverter.java @@ -0,0 +1,29 @@ +package at.esque.kafka.serialization.jackson; + +import at.esque.kafka.topics.metadata.MessageMetaData; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.fasterxml.jackson.databind.util.Converter; +import javafx.collections.FXCollections; +import javafx.collections.ObservableList; + +import java.util.List; + +public class MessageMetaDataObservableListConverter implements Converter<List<MessageMetaData>, ObservableList<MessageMetaData>> { + + + @Override + public ObservableList<MessageMetaData> convert(List<MessageMetaData> headers) { + return FXCollections.observableArrayList(headers); + } + + @Override + public JavaType getInputType(TypeFactory typeFactory) { + return typeFactory.constructCollectionType(List.class, MessageMetaData.class); + } + + @Override + public JavaType getOutputType(TypeFactory typeFactory) { + return typeFactory.constructCollectionType(ObservableList.class, MessageMetaData.class); + } +}
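
These Jackson pieces together let a Kafka Header round-trip through the JSON message export/import added in this PR. The diff does not show where the ObjectMapper is configured, so the registration below is illustrative: a SimpleModule wiring the custom (de)serializers onto the Header interface, then a round trip through a RecordHeader.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.charset.StandardCharsets;

public class HeaderJsonDemo {
    public static void main(String[] args) throws Exception {
        // Wire the custom Header (de)serializers into an ObjectMapper.
        SimpleModule module = new SimpleModule()
                .addSerializer(Header.class, new KafkaHeaderSerializer())
                .addDeserializer(Header.class, new KafkaHeaderDeserializer());
        ObjectMapper mapper = new ObjectMapper().registerModule(module);

        Header original = new RecordHeader("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));
        String json = mapper.writeValueAsString(original); // {"key":"trace-id","value":"abc-123"}
        Header roundTripped = mapper.readValue(json, Header.class);
        System.out.println(json + " -> " + roundTripped.key());
    }
}
```

Registering against the Header interface (rather than RecordHeader) means any Header implementation returned by the Kafka client serializes the same way.
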
diff --git a/src/main/java/at/esque/kafka/topics/KafkaMessage.java b/src/main/java/at/esque/kafka/topics/KafkaMessage.java index 1f08091..0e6eae4 100644 --- a/src/main/java/at/esque/kafka/topics/KafkaMessage.java +++ b/src/main/java/at/esque/kafka/topics/KafkaMessage.java @@ -1,6 +1,9 @@ package at.esque.kafka.topics; +import at.esque.kafka.serialization.jackson.HeaderObservableListConverter; +import at.esque.kafka.serialization.jackson.MessageMetaDataObservableListConverter; import at.esque.kafka.topics.metadata.MessageMetaData; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import javafx.beans.property.IntegerProperty; import javafx.beans.property.ListProperty; import javafx.beans.property.LongProperty; @@ -20,9 +23,11 @@ public class KafkaMessage { private StringProperty key = new SimpleStringProperty(); private StringProperty value = new SimpleStringProperty(); private StringProperty timestamp = new SimpleStringProperty(); + @JsonDeserialize(converter = HeaderObservableListConverter.class) private ListProperty<Header> headers = new SimpleListProperty<>(); private StringProperty keyType = new SimpleStringProperty(); private StringProperty valueType = new SimpleStringProperty(); + @JsonDeserialize(converter = MessageMetaDataObservableListConverter.class) private ObservableList<MessageMetaData> metaData = FXCollections.observableArrayList(); public long getOffset() { diff --git a/src/main/java/at/esque/kafka/topics/TraceDialogController.java b/src/main/java/at/esque/kafka/topics/TraceDialogController.java index d4ff4fa..c732ce8 100644 --- a/src/main/java/at/esque/kafka/topics/TraceDialogController.java +++ b/src/main/java/at/esque/kafka/topics/TraceDialogController.java @@ -2,9 +2,14 @@ import at.esque.kafka.alerts.ErrorAlert; import at.esque.kafka.controls.InstantPicker; +import at.esque.kafka.topics.model.KafkaHeaderFilterOption; import javafx.beans.binding.Bindings; +import javafx.beans.property.SimpleBooleanProperty; +import javafx.beans.property.SimpleStringProperty; +import javafx.beans.value.ObservableValue; import javafx.collections.FXCollections; import javafx.collections.ObservableList; +import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.scene.control.Button; import javafx.scene.control.CheckBox; @@ -12,12 +17,17 @@ import javafx.scene.control.ListCell; import javafx.scene.control.ListView; import javafx.scene.control.RadioButton; +import javafx.scene.control.TableColumn; +import javafx.scene.control.TableView; import javafx.scene.control.TextField; import javafx.scene.control.TitledPane; import javafx.scene.control.ToggleButton; import javafx.scene.control.ToggleGroup; +import javafx.scene.control.cell.CheckBoxTableCell; +import javafx.scene.control.cell.TextFieldTableCell; import javafx.scene.layout.Background; import javafx.scene.layout.HBox; +import javafx.util.Callback; import org.controlsfx.control.PopOver; import org.jetbrains.annotations.NotNull; @@ -34,8 +44,10 @@ public class TraceDialogController { - public HBox quickSelectButtonBar; - public InstantPicker epochInstantPicker; + public HBox quickSelectStartEpochButtonBar; + public InstantPicker epochStartInstantPicker; + public HBox quickSelectEndEpochButtonBar; + public InstantPicker epochEndInstantPicker; public ToggleButton epochToggleButton; public RadioButton traceModeKeyOnlyRadio; public ToggleGroup conditionMode; @@ -54,26 +66,28 @@ public class TraceDialogController { public TitledPane keyOptionsPane; public TitledPane valueOptionsPane; public ComboBox<Integer> specificParitionComboBox; - - public TraceDialogController() { - } + public TableView<KafkaHeaderFilterOption> headerTableView; + public TableColumn<KafkaHeaderFilterOption, String> headerFilterHeaderColumn; + public TableColumn<KafkaHeaderFilterOption, String> headerFilterFilterStringColumn; + public TableColumn<KafkaHeaderFilterOption, Boolean> headerFilterExactMatchColumn; @FXML public void initialize() { keyOptionsPane.disableProperty().bind(traceModeValueRadio.selectedProperty()); valueOptionsPane.disableProperty().bind(traceModeKeyOnlyRadio.selectedProperty()); - epochInstantPicker.displayAsEpochProperty().bind(epochToggleButton.selectedProperty()); + epochStartInstantPicker.displayAsEpochProperty().bind(epochToggleButton.selectedProperty()); } - public void setup(boolean isAvroKeyType, boolean traceQuickSelectEnabled, List<Duration> durations, int recentTraceMaxEntries, ObservableList<Integer> partitions) { + public void setup(boolean isAvroKeyType, boolean traceQuickSelectEnabled, List<Duration> durations, ObservableList<Integer> partitions) { clearButtonBar(); if (traceQuickSelectEnabled) { - fillButtonBar(durations); + fillButtonBar(durations, quickSelectStartEpochButtonBar, epochStartInstantPicker); + fillButtonBar(durations, quickSelectEndEpochButtonBar, epochEndInstantPicker); } - if(!isAvroKeyType){ + if (!isAvroKeyType) { fastTraceToggle.setDisable(false); fastTraceToggle.disableProperty().bind(Bindings.or(keyModeRegexRadio.selectedProperty(), traceModeOrRadio.selectedProperty())); - }else{ + } else { fastTraceToggle.setDisable(true); } specificParitionComboBox.setItems(partitions); @@ -85,30 +99,54 @@ public void setup(boolean isAvroKeyType, boolean traceQuickSelectEnabled, List<Duration> durations, int recentTraceMaxEntries, ObservableList<Integer> partitions) { keyHistoryButton.setOnAction(event -> popOverKey.show(keyHistoryButton)); valueHistoryButton.setOnAction(event -> popOverValue.show(valueHistoryButton)); + + headerFilterHeaderColumn.setCellFactory(TextFieldTableCell.forTableColumn()); + headerFilterFilterStringColumn.setCellFactory(TextFieldTableCell.forTableColumn()); + headerFilterExactMatchColumn.setCellFactory(CheckBoxTableCell.forTableColumn(new Callback<Integer, ObservableValue<Boolean>>() { + @Override + public ObservableValue<Boolean> call(Integer param) { + KafkaHeaderFilterOption kafkaHeaderFilterOption = headerTableView.getItems().get(param); + SimpleBooleanProperty simpleBooleanProperty = new SimpleBooleanProperty(kafkaHeaderFilterOption.isExactMatch()); + simpleBooleanProperty.addListener((observable, oldValue, newValue) -> kafkaHeaderFilterOption.setExactMatch(newValue)); + return simpleBooleanProperty; + } + }, false)); + headerFilterHeaderColumn.setCellValueFactory(param -> new SimpleStringProperty(param.getValue().getHeader())); + headerFilterFilterStringColumn.setCellValueFactory(param -> new SimpleStringProperty(param.getValue().getFilterString())); + headerFilterExactMatchColumn.setCellValueFactory(param -> new SimpleBooleanProperty(param.getValue().isExactMatch())); + headerFilterHeaderColumn.setOnEditCommit(event -> { + KafkaHeaderFilterOption current = event.getTableView().getItems().get(event.getTablePosition().getRow()); + current.setHeader(event.getNewValue()); + }); + headerFilterFilterStringColumn.setOnEditCommit(event -> { + KafkaHeaderFilterOption current = event.getTableView().getItems().get(event.getTablePosition().getRow()); + current.setFilterString(event.getNewValue()); + }); } private void clearButtonBar() { - quickSelectButtonBar.getChildren().clear(); + quickSelectStartEpochButtonBar.getChildren().clear(); } - private void fillButtonBar(List<Duration> durations) { + private void fillButtonBar(List<Duration> durations, HBox buttonBar, InstantPicker targetInstantPicker) { Button todayButton = new Button("Today"); todayButton.setOnAction(event -> { OffsetDateTime offsetDateTime = Instant.now().atOffset(ZoneOffset.UTC); Instant today = OffsetDateTime.of(offsetDateTime.toLocalDate(), LocalTime.of(0, 0, 0, 0), ZoneOffset.UTC).toInstant(); - epochInstantPicker.setInstantValue(today); + targetInstantPicker.setInstantValue(today); }); - quickSelectButtonBar.getChildren().add(todayButton); + buttonBar.getChildren().add(todayButton); durations.forEach(duration -> { Button button = new Button("Now - " + stringifyDuration(duration)); button.setOnAction(event -> { try { - epochInstantPicker.setInstantValue(Instant.now().minus(duration)); + targetInstantPicker.setInstantValue(Instant.now().minus(duration)); } catch (Exception e) { ErrorAlert.show(e); } }); - quickSelectButtonBar.getChildren().add(button); + buttonBar.getChildren().add(button); }); } @@ -173,4 +211,17 @@ private static ListView<String> buildRecentTracesView(TextField key) { recentTraces.setItems(FXCollections.observableArrayList(sortable)); return recentTraces; } + + @FXML + private void addHeaderClick(ActionEvent event) { + headerTableView.getItems().add(new KafkaHeaderFilterOption("header", "regex or exact string to match", false)); + } + + @FXML + private void removeHeaderClick(ActionEvent e) { + int selectedIndex = headerTableView.getSelectionModel().getSelectedIndex(); + if (selectedIndex > -1) { + headerTableView.getItems().remove(selectedIndex); + } + } } diff --git a/src/main/java/at/esque/kafka/topics/model/KafkaHeaderFilterOption.java b/src/main/java/at/esque/kafka/topics/model/KafkaHeaderFilterOption.java new file mode 100644 index 0000000..cff7028 --- /dev/null +++ b/src/main/java/at/esque/kafka/topics/model/KafkaHeaderFilterOption.java @@ -0,0 +1,63 @@ +package at.esque.kafka.topics.model; + +import java.util.Objects; + +public class KafkaHeaderFilterOption { + private String header; + private String filterString; + private boolean exactMatch; + + public KafkaHeaderFilterOption(String header, String filterString, boolean exactMatch) { + this.header = header; + this.filterString = filterString; + this.exactMatch = exactMatch; + } + + public String getHeader() { + return header; + } + + public void setHeader(String header) { + this.header = header; + } + + public String getFilterString() { + return filterString; + } + + public void setFilterString(String filterString) { + this.filterString = filterString; + } + + public boolean isExactMatch() { + return exactMatch; + } + + public void setExactMatch(boolean exactMatch) { + this.exactMatch = exactMatch; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (KafkaHeaderFilterOption) obj; + return Objects.equals(this.header, that.header) && + Objects.equals(this.filterString, that.filterString) && + this.exactMatch == that.exactMatch; + } + + @Override + public int hashCode() { + return Objects.hash(header, filterString, exactMatch); + } + + @Override + public String toString() { + return "KafkaHeaderFilterOption[" + + "header=" + header + ", " + + "filterString=" + filterString + ", " + + "exactMatch=" + exactMatch + ']'; + } + +} diff --git a/src/main/resources/fxml/controls/messagesTab.fxml b/src/main/resources/fxml/controls/messagesTab.fxml index 4cf3e5a..672350f 100644 --- a/src/main/resources/fxml/controls/messagesTab.fxml +++ b/src/main/resources/fxml/controls/messagesTab.fxml @@ -2,37 +2,47 @@ [FXML markup lost in extraction] diff --git a/src/main/resources/fxml/createSchema.fxml b/src/main/resources/fxml/createSchema.fxml index 7ba853b..10c9b58 100644 --- a/src/main/resources/fxml/createSchema.fxml +++ b/src/main/resources/fxml/createSchema.fxml @@ -1,9 +1,10 @@ @@ -14,7 +15,6 @@ @@ -30,6 +30,7 @@ @@ -63,7 +64,7 @@ [FXML markup lost in extraction] + maxWidth="1.7976931348623157E308" HBox.hgrow="ALWAYS"/>