Merge pull request #43 from daengi/develop
Enhancing KafkaEsque with protobuf functionalities
patschuh committed Oct 3, 2023
2 parents 8e45971 + 07258ea, commit 626f801
Showing 17 changed files with 207 additions and 44 deletions.
3 changes: 2 additions & 1 deletion build.gradle
@@ -11,7 +11,7 @@ plugins {
}

group = 'at.esque.kafka'
version = '2.6.0'
version = '2.7.0'

repositories {
mavenCentral()
@@ -42,6 +42,7 @@ dependencies {
implementation 'com.opencsv:opencsv:5.5.2'
implementation 'io.confluent:kafka-schema-registry:7.3.0'
implementation 'io.confluent:kafka-avro-serializer:6.2.7'
implementation 'io.confluent:kafka-protobuf-serializer:6.2.7'
implementation 'org.apache.avro:avro:1.11.1'
implementation 'com.google.inject:guice:5.0.1'
implementation 'org.fxmisc.richtext:richtextfx:0.10.7'
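
The new kafka-protobuf-serializer dependency ships Confluent's KafkaProtobufSerializer and KafkaProtobufDeserializer. A minimal consumer sketch follows (broker, registry URL, and topic are assumed placeholders); with no concrete message class configured, the deserializer yields DynamicMessage instances, which implement com.google.protobuf.Message and therefore flow through the new instanceof checks in Controller below:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import com.google.protobuf.Message;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ProtobufConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");          // assumed
            props.put("group.id", "kafkaesque-sketch");                // assumed
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
            props.put("schema.registry.url", "http://localhost:8081"); // assumed
            try (KafkaConsumer<String, Message> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("protobuf-topic"));         // assumed topic
                consumer.poll(Duration.ofSeconds(1)).forEach(record -> System.out.println(record.value()));
            }
        }
    }
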
26 changes: 23 additions & 3 deletions src/main/java/at/esque/kafka/Controller.java
@@ -41,6 +41,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.protobuf.Message;
import com.opencsv.bean.CsvToBeanBuilder;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import javafx.application.HostServices;
@@ -136,6 +137,7 @@
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


public class Controller {
@@ -468,7 +470,7 @@ private ListCell<String> topicListCellFactory() {
try {
TopicMessageTypeConfig topicMessageTypeConfig = configHandler.getConfigForTopic(selectedCluster().getIdentifier(), selectedTopic());
Map<String, String> consumerConfig = configHandler.readConsumerConfigs(selectedCluster().getIdentifier());
TraceInputDialog.show(topicMessageTypeConfig.getKeyType() == MessageType.AVRO, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES)), partitionCombobox.getItems())
TraceInputDialog.show(topicMessageTypeConfig.getKeyType() == MessageType.AVRO || topicMessageTypeConfig.getKeyType() == MessageType.PROTOBUF_SR, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES)), partitionCombobox.getItems())
.ifPresent(traceInput -> {
backGroundTaskHolder.setBackGroundTaskDescription("tracing message");
Integer partition = null;
@@ -565,7 +567,8 @@ private void getTopicsForCluster() {
backGroundTaskHolder.setIsInProgress(true);
Set<String> topics = adminClient.getTopics();
Platform.runLater(() -> topicListView.setItems(topics));

} catch (Exception e) {
Platform.runLater(() -> ErrorAlert.show(e));
} finally {
stopWatch.stop();
LOGGER.info("Finished getting topics for cluster [{}]", stopWatch);
@@ -908,7 +911,10 @@ private void subscribeOrAssignToSelectedPartition(TopicMessageTypeConfig topic,
if (selectedPartition >= 0) {
consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(Collections.singletonList(new TopicPartition(selectedTopic(), selectedPartition))));
} else {
consumerHandler.subscribe(consumerId, topic.getName());
List<TopicPartition> partitions = adminClient.getPatitions(topic.getName()).stream()
.map(integer -> new TopicPartition(topic.getName(), integer))
.collect(Collectors.toList());
consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(partitions));
}
}

@@ -1035,9 +1041,23 @@ private <KT, VT> void convertAndAdd(ConsumerRecord<KT, VT> cr, ObservableList<Ka
if (cr.value() instanceof GenericData.Record recordValue) {
kafkaMessage.setValueType(extractTypeFromGenericRecord(recordValue));
kafkaMessage.getMetaData().add(new StringMetadata("Value Schema ID", extractSchemaIdFromGenericRecord(recordValue)));
} else if (cr.value() instanceof Message message) {
kafkaMessage.setValueType(message.getDescriptorForType().getFullName());
try {
kafkaMessage.setValue(JsonUtils.toJson(message));
} catch (IOException e) {
LOGGER.error("Failed printing protobuf message as json", e);
}
}
if (cr.key() instanceof GenericData.Record recordKey) {
kafkaMessage.setKeyType(extractTypeFromGenericRecord(recordKey));
} else if (cr.key() instanceof Message messageKey) {
kafkaMessage.setKeyType(messageKey.getDescriptorForType().getFullName());
try {
kafkaMessage.setKey(JsonUtils.toJson(messageKey));
} catch (IOException e) {
LOGGER.error("Failed printing protobuf message as json", e);
}
}

kafkaMessage.setTimestamp(Instant.ofEpochMilli(cr.timestamp()).toString());
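
convertAndAdd now recognizes deserialized protobuf records on both the key and the value side: the type column gets the message's full descriptor name and the cell content gets a JSON rendering. A minimal sketch of that rendering, using a Struct as a stand-in for whatever message the deserializer returned:

    import com.google.protobuf.Message;
    import com.google.protobuf.Struct;
    import com.google.protobuf.Value;
    import com.google.protobuf.util.JsonFormat;

    public class RenderSketch {
        public static void main(String[] args) throws Exception {
            // A Struct stands in for any deserialized protobuf record value.
            Object value = Struct.newBuilder()
                    .putFields("name", Value.newBuilder().setStringValue("esque").build())
                    .build();
            if (value instanceof Message message) {
                // The same two calls the controller uses for the type name and the cell text.
                System.out.println(message.getDescriptorForType().getFullName()); // google.protobuf.Struct
                System.out.println(JsonFormat.printer().print(message));
            }
        }
    }
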
16 changes: 12 additions & 4 deletions src/main/java/at/esque/kafka/CreateSchemaController.java
@@ -4,13 +4,14 @@
import at.esque.kafka.alerts.SuccessAlert;
import at.esque.kafka.controls.KafkaEsqueCodeArea;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import javafx.collections.FXCollections;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.ComboBox;
import javafx.scene.control.TextField;
import javafx.stage.FileChooser;
import javafx.stage.Stage;
import javafx.stage.Window;
import org.apache.avro.Schema;

import java.io.File;
import java.io.IOException;
@@ -24,17 +25,22 @@ public class CreateSchemaController {
private TextField subjectTextField;
@FXML
private KafkaEsqueCodeArea schemaTextArea;
@FXML
private ComboBox<String> schemaTypeComboBox;

private RestService restService;

private Stage stage;

public void addSchema(ActionEvent actionEvent) {

Schema.Parser parser = new Schema.Parser();
try {
parser.parse(schemaTextArea.getText());
restService.registerSchema(schemaTextArea.getText(), subjectTextField.getText());
restService.registerSchema(
schemaTextArea.getText(),
schemaTypeComboBox.getSelectionModel().getSelectedItem(),
null,
subjectTextField.getText());

SuccessAlert.show("Success", null, "Schema added successfully!", getWindow());
} catch (Exception e) {
ErrorAlert.show(e, getWindow());
@@ -61,6 +67,8 @@ private Window getWindow() {

public void setup(String selectedSubject, RestService restService, Stage stage) {
this.restService = restService;
this.schemaTypeComboBox.setItems(FXCollections.observableArrayList("AVRO", "PROTOBUF"));
this.schemaTypeComboBox.getSelectionModel().select(0);
this.stage = stage;
if (selectedSubject != null) {
subjectTextField.setText(selectedSubject);
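
The schema dialog can now register protobuf schemas: the Avro pre-parse is gone, the new combo box selects the schema type, and registerSchema passes that type through to the registry (the third argument is a list of schema references, unused here). A sketch of the same call against a local registry, with URL and subject assumed:

    import io.confluent.kafka.schemaregistry.client.rest.RestService;

    public class RegisterProtobufSchemaSketch {
        public static void main(String[] args) throws Exception {
            RestService restService = new RestService("http://localhost:8081"); // assumed
            String protoSchema = "syntax = \"proto3\"; message Order { string id = 1; }";
            // Schema text, schema type, references (none), subject.
            int id = restService.registerSchema(protoSchema, "PROTOBUF", null, "orders-value");
            System.out.println("registered with id " + id);
        }
    }
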
8 changes: 7 additions & 1 deletion src/main/java/at/esque/kafka/CrossClusterController.java
@@ -37,19 +37,22 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.kordamp.ikonli.fontawesome.FontAwesome;
import org.kordamp.ikonli.javafx.FontIcon;

import java.io.IOException;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class CrossClusterController {

@@ -168,7 +171,10 @@ private void startOperation(UUID operationId) {
try {
producerId = producerHandler.registerProducer(operation.getToCluster(), operation.getToTopic().getName());
consumerId = consumerHandler.registerConsumer(operation.getFromCluster(), operation.getFromTopic(), configHandler.readConsumerConfigs(operation.getToCluster().getIdentifier()));
consumerHandler.subscribe(consumerId, operation.getFromTopic().getName());
List<TopicPartition> partitions = fromAdmin.getPatitions(operation.getFromTopic().getName()).stream()
.map(integer -> new TopicPartition(operation.getFromTopic().getName(), integer))
.collect(Collectors.toList());
consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(partitions));
if (instantPicker.getInstantValue() != null) {
consumerHandler.seekToTime(consumerId, instantPicker.getInstantValue().toEpochMilli());
} else {
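
Here, as in Controller.subscribeOrAssignToSelectedPartition above, subscribe() is replaced by an explicit assign() over every partition of the topic. A manually assigned consumer never joins a consumer group, so no group coordination or rebalancing is involved. The same pattern expressed with only the plain consumer API (names assumed):

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public final class AssignAllPartitions {
        // Build the full partition list for a topic and assign it directly;
        // unlike subscribe(), this bypasses consumer-group management.
        public static void assignAll(KafkaConsumer<?, ?> consumer, String topic) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
        }
    }
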
22 changes: 22 additions & 0 deletions src/main/java/at/esque/kafka/JsonUtils.java
@@ -13,6 +13,11 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.Struct;
import com.google.protobuf.util.JsonFormat;
import javafx.scene.control.TreeItem;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -350,4 +355,21 @@ private static void applyCorrectAdder(JsonNode value, JsonTreeItem treeItem) {
recursivelyAddElements((String.valueOf(value)), treeItem);
}
}

public static Message fromJson(String json) {
Struct.Builder structBuilder = Struct.newBuilder();
try {
JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
return structBuilder.build();
}

public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException {
if (messageOrBuilder == null) {
return null;
}
return JsonFormat.printer().print(messageOrBuilder);
}
}
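
The two new helpers convert in both directions: toJson pretty-prints any protobuf Message via JsonFormat, while fromJson parses arbitrary JSON into a schema-less google.protobuf.Struct (the original message type is not recoverable from the Struct). A round-trip sketch, assuming at.esque.kafka.JsonUtils is on the classpath:

    import at.esque.kafka.JsonUtils;
    import com.google.protobuf.Message;

    public class JsonUtilsRoundTrip {
        public static void main(String[] args) throws Exception {
            // JSON -> Struct-backed Message -> JSON again.
            Message message = JsonUtils.fromJson("{\"id\": 42, \"tags\": [\"a\", \"b\"]}");
            System.out.println(JsonUtils.toJson(message));
        }
    }
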
3 changes: 2 additions & 1 deletion src/main/java/at/esque/kafka/MessageType.java
@@ -12,5 +12,6 @@ public enum MessageType {
BYTEARRAY,
BYTEBUFFER,
BYTES,
UUID
UUID,
PROTOBUF_SR
}
src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java
@@ -3,6 +3,7 @@
import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.controls.KafkaEsqueCodeArea;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.Label;
@@ -11,7 +12,6 @@
import javafx.stage.FileChooser;
import javafx.stage.Stage;
import javafx.stage.Window;
import org.apache.avro.Schema;

import java.io.File;
import java.io.IOException;
@@ -40,11 +40,9 @@ public class SchemaCompatibilityCheckController {

public void addSchema(ActionEvent actionEvent) {

Schema.Parser parser = new Schema.Parser();
try {
parser.parse(schemaTextArea.getText());

List<String> compatibility = restService.testCompatibility(schemaTextArea.getText(), subjectTextField.getText(), versionTextField.getText());
Schema schema = restService.getVersion(subjectTextField.getText(), Integer.parseInt(versionTextField.getText()));
List<String> compatibility = restService.testCompatibility(schemaTextArea.getText(), schema.getSchemaType(), null, subjectTextField.getText(), versionTextField.getText(), false);

if (compatibility.isEmpty()) {
resultLabel.setTextFill(Color.web("#3d7a3d"));
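
Instead of pre-parsing the input as Avro, the controller now fetches the registered version to learn its schema type and passes that type through to the compatibility check, so protobuf subjects work as well. A sketch of the same flow, with registry URL, subject, and version assumed:

    import java.util.List;
    import io.confluent.kafka.schemaregistry.client.rest.RestService;
    import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;

    public class CompatibilityCheckSketch {
        public static void main(String[] args) throws Exception {
            RestService restService = new RestService("http://localhost:8081"); // assumed
            Schema existing = restService.getVersion("orders-value", 1);
            String candidate = "syntax = \"proto3\"; message Order { string id = 1; string note = 2; }";
            // An empty result list means the candidate is compatible.
            List<String> errors = restService.testCompatibility(
                    candidate, existing.getSchemaType(), null, "orders-value", "1", false);
            System.out.println(errors.isEmpty() ? "compatible" : errors.toString());
        }
    }
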
src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java
@@ -55,7 +55,9 @@ public class SchemaRegistryBrowserController {
@FXML
private Label schemaIdLabel;
@FXML
public Label compatibilityLabel;
private Label compatibilityLabel;
@FXML
private Label typeLabel;


public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) {
@@ -78,7 +80,8 @@ public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) {
try {
Schema schema = schemaRegistryRestService.getVersion(subjectListView.getListView().getSelectionModel().getSelectedItem(), newValue1);
schemaIdLabel.setText(String.valueOf(schema.getId()));
schemaTextArea.setText(JsonUtils.formatJson(schema.getSchema()));
schemaTextArea.setText("AVRO".equals(schema.getSchemaType()) ? JsonUtils.formatJson(schema.getSchema()) : schema.getSchema());
typeLabel.setText(schema.getSchemaType());
} catch (Exception e) {
ErrorAlert.show(e, getWindow());
}
46 changes: 39 additions & 7 deletions src/main/java/at/esque/kafka/TraceUtils.java
@@ -1,12 +1,14 @@
package at.esque.kafka;

import at.esque.kafka.topics.model.KafkaHeaderFilterOption;
import com.google.protobuf.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Spliterator;
import java.util.Spliterators;
@@ -27,29 +29,59 @@ public static Predicate<ConsumerRecord> valuePredicate(String regex, boolean sear
if (searchNull) {
return ((cr) -> cr.value() == null);
} else {
Pattern pattern = Pattern.compile(regex);
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
return (cr) -> {
if (cr.value() == null) {
return false;
}
Matcher matcher = pattern.matcher(cr.value().toString());
return matcher.find();
if (cr.value() instanceof Message messageValue) {
Matcher matcher = null;
try {
matcher = pattern.matcher(JsonUtils.toJson(messageValue));
} catch (IOException e) {
throw new RuntimeException(e);
}
return matcher.find();
} else {
Matcher matcher = pattern.matcher(cr.value().toString());
return matcher.find();
}
};

}
}

public static Predicate<ConsumerRecord> keyPredicate(String search, @NotNull String keyMode) {
if (keyMode.equals("exact match")) {
return ((cr) -> StringUtils.equals(cr.key().toString(), search));
return ((cr) -> {
if (cr.key() instanceof Message messageKey) {
try {
return StringUtils.equals(JsonUtils.toJson(messageKey), search);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
return StringUtils.equals(cr.key().toString(), search);
}
});
} else/*(keyMode.equals("regex (contains)"))*/ {
Pattern pattern = Pattern.compile(search);
Pattern pattern = Pattern.compile(search, Pattern.DOTALL);
return (cr) -> {
if (cr.key() == null) {
return false;
}
Matcher matcher = pattern.matcher(cr.key().toString());
return matcher.find();
if (cr.key() instanceof Message messageKey) {
Matcher matcher = null;
try {
matcher = pattern.matcher(JsonUtils.toJson(messageKey));
} catch (IOException e) {
throw new RuntimeException(e);
}
return matcher.find();
} else {
Matcher matcher = pattern.matcher(cr.key().toString());
return matcher.find();
}
};
}
}
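
Both predicates now render protobuf keys and values as JSON before matching, and the patterns are compiled with Pattern.DOTALL so one regex can span the newlines in JsonFormat's pretty-printed output. A usage sketch with assumed field names:

    import java.util.function.Predicate;
    import at.esque.kafka.TraceUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class TracePredicateSketch {
        public static void main(String[] args) {
            Predicate<ConsumerRecord> failed =
                    TraceUtils.valuePredicate("\"id\": \"42\".*\"status\": \"FAILED\"", false);
            ConsumerRecord<String, String> record = new ConsumerRecord<>(
                    "orders", 0, 0L, "k", "{\"id\": \"42\",\n \"status\": \"FAILED\"}");
            System.out.println(failed.test(record)); // true: DOTALL lets .* cross the newline
        }
    }
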
10 changes: 8 additions & 2 deletions src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java
@@ -58,7 +58,13 @@ public void setValueType(MessageType valueType) {
this.valueType.set(valueType.name());
}

public boolean containsAvro() {
return getValueType() == MessageType.AVRO || getValueType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY || getKeyType() == MessageType.AVRO || getKeyType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY;
public boolean requiresSchemaRegistry() {
return getValueType() == MessageType.AVRO ||
getValueType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY ||
getValueType() == MessageType.PROTOBUF_SR ||
getKeyType() == MessageType.AVRO ||
getKeyType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY ||
getKeyType() == MessageType.PROTOBUF_SR
;
}
}
2 changes: 1 addition & 1 deletion src/main/java/at/esque/kafka/handlers/ConsumerHandler.java
@@ -65,7 +65,7 @@ public UUID registerConsumer(ClusterConfig config, TopicMessageTypeConfig topicM
consumerProps.put(KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, configHandler.getSettingsProperties().get(Settings.ENABLE_AVRO_LOGICAL_TYPE_CONVERSIONS));
consumerProps.setProperty("kafkaesque.cluster.id", config.getIdentifier());
consumerProps.put("kafkaesque.confighandler", configHandler);
if (topicMessageTypeConfig.containsAvro() && StringUtils.isEmpty(config.getSchemaRegistry())) {
if (topicMessageTypeConfig.requiresSchemaRegistry() && StringUtils.isEmpty(config.getSchemaRegistry())) {
Optional<String> input = SystemUtils.showInputDialog("http://localhost:8081", "Add schema-registry url", "this cluster config is missing a schema registry url please add it now", "schema-registry URL");
if (!input.isPresent()) {
throw new MissingSchemaRegistryException(config.getIdentifier());
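
containsAvro() is renamed to requiresSchemaRegistry(), and PROTOBUF_SR on either the key or the value side now also triggers the prompt for a missing schema-registry URL. A sketch, assuming TopicMessageTypeConfig has a no-arg constructor and a setKeyType setter matching the setValueType shown above:

    import at.esque.kafka.MessageType;
    import at.esque.kafka.cluster.TopicMessageTypeConfig;

    public class RegistryCheckSketch {
        public static void main(String[] args) {
            TopicMessageTypeConfig config = new TopicMessageTypeConfig(); // assumed constructor
            config.setKeyType(MessageType.UUID);                          // assumed setter
            config.setValueType(MessageType.PROTOBUF_SR);
            System.out.println(config.requiresSchemaRegistry());          // true
        }
    }
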
