From 565a7c589569ed343013d85c53d9d142c3a2135d Mon Sep 17 00:00:00 2001
From: daengi <34056087+daengi@users.noreply.github.com>
Date: Fri, 29 Sep 2023 13:48:37 +0200
Subject: [PATCH] Enable protobuf support in KafkaEsque: protobuf schemas are
 handled via the schema registry, and protobuf messages can be displayed

---
 build.gradle                                  |  3 +-
 src/main/java/at/esque/kafka/Controller.java  | 26 +++++++++++++--
 .../esque/kafka/CreateSchemaController.java   |  3 --
 .../esque/kafka/CrossClusterController.java   |  8 ++++-
 src/main/java/at/esque/kafka/JsonUtils.java   | 22 +++++++++++++
 src/main/java/at/esque/kafka/MessageType.java |  3 +-
 .../SchemaCompatibilityCheckController.java   |  4 ---
 .../SchemaRegistryBrowserController.java      |  7 ++--
 .../kafka/cluster/TopicMessageTypeConfig.java | 10 ++++--
 .../esque/kafka/handlers/ConsumerHandler.java |  2 +-
 .../ForgivingProtobufAvroDeserializer.java    | 33 +++++++++++++++++++
 .../serialization/KafkaEsqueDeserializer.java | 19 ++++++-----
 .../serialization/KafkaEsqueSerializer.java   | 22 +++++++++----
 .../resources/fxml/schemaRegistryBrowser.fxml |  5 +++
 14 files changed, 135 insertions(+), 32 deletions(-)
 create mode 100644 src/main/java/at/esque/kafka/serialization/ForgivingProtobufAvroDeserializer.java

diff --git a/build.gradle b/build.gradle
index 66e494f..b5e098a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -11,7 +11,7 @@ plugins {
 }
 
 group = 'at.esque.kafka'
-version = '2.6.0'
+version = '2.7.0'
 
 repositories {
     mavenCentral()
@@ -42,6 +42,7 @@ dependencies {
     implementation 'com.opencsv:opencsv:5.5.2'
     implementation 'io.confluent:kafka-schema-registry:7.3.0'
     implementation 'io.confluent:kafka-avro-serializer:6.2.7'
+    implementation 'io.confluent:kafka-protobuf-serializer:6.2.7'
     implementation 'org.apache.avro:avro:1.11.1'
     implementation 'com.google.inject:guice:5.0.1'
     implementation 'org.fxmisc.richtext:richtextfx:0.10.7'
diff --git a/src/main/java/at/esque/kafka/Controller.java b/src/main/java/at/esque/kafka/Controller.java
index 83fb2cc..51b5b87 100644
--- a/src/main/java/at/esque/kafka/Controller.java
+++ b/src/main/java/at/esque/kafka/Controller.java
@@ -41,6 +41,7 @@
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.protobuf.Message;
 import com.opencsv.bean.CsvToBeanBuilder;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import javafx.application.HostServices;
@@ -136,6 +137,7 @@
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public class Controller {
 
@@ -468,7 +470,7 @@ private ListCell topicListCellFactory() {
         try {
             TopicMessageTypeConfig topicMessageTypeConfig = configHandler.getConfigForTopic(selectedCluster().getIdentifier(), selectedTopic());
             Map consumerConfig = configHandler.readConsumerConfigs(selectedCluster().getIdentifier());
-            TraceInputDialog.show(topicMessageTypeConfig.getKeyType() == MessageType.AVRO, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES)), partitionCombobox.getItems())
+            TraceInputDialog.show(topicMessageTypeConfig.getKeyType() == MessageType.AVRO || topicMessageTypeConfig.getKeyType() == MessageType.PROTOBUF_SR, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES)), partitionCombobox.getItems())
                     .ifPresent(traceInput -> {
                         backGroundTaskHolder.setBackGroundTaskDescription("tracing message");
                         Integer partition = null;
@@ -565,7 +567,8 @@ private void getTopicsForCluster() {
             backGroundTaskHolder.setIsInProgress(true);
             Set topics = adminClient.getTopics();
             Platform.runLater(() -> topicListView.setItems(topics));
-
+        } catch (Exception e) {
+            Platform.runLater(() -> ErrorAlert.show(e));
         } finally {
             stopWatch.stop();
             LOGGER.info("Finished getting topics for cluster [{}]", stopWatch);
         }
@@ -908,7 +911,10 @@ private void subscribeOrAssignToSelectedPartition(TopicMessageTypeConfig topic,
         if (selectedPartition >= 0) {
             consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(Collections.singletonList(new TopicPartition(selectedTopic(), selectedPartition))));
         } else {
-            consumerHandler.subscribe(consumerId, topic.getName());
+            List<TopicPartition> partitions = adminClient.getPatitions(topic.getName()).stream()
+                    .map(integer -> new TopicPartition(topic.getName(), integer))
+                    .collect(Collectors.toList());
+            consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(partitions));
         }
     }
@@ -1035,9 +1041,23 @@ private void convertAndAdd(ConsumerRecord cr, ObservableList
+            List<TopicPartition> partitions = fromAdmin.getPatitions(operation.getFromTopic().getName()).stream()
+                    .map(integer -> new TopicPartition(operation.getFromTopic().getName(), integer))
+                    .collect(Collectors.toList());
+            consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(partitions));
             if (instantPicker.getInstantValue() != null) {
                 consumerHandler.seekToTime(consumerId, instantPicker.getInstantValue().toEpochMilli());
             } else {
diff --git a/src/main/java/at/esque/kafka/JsonUtils.java b/src/main/java/at/esque/kafka/JsonUtils.java
index f8be536..12b804a 100644
--- a/src/main/java/at/esque/kafka/JsonUtils.java
+++ b/src/main/java/at/esque/kafka/JsonUtils.java
@@ -13,6 +13,11 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.Struct;
+import com.google.protobuf.util.JsonFormat;
 import javafx.scene.control.TreeItem;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -350,4 +355,21 @@ private static void applyCorrectAdder(JsonNode value, JsonTreeItem treeItem) {
             recursivelyAddElements((String.valueOf(value)), treeItem);
         }
     }
+
+    public static Message fromJson(String json) {
+        Struct.Builder structBuilder = Struct.newBuilder();
+        try {
+            JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        return structBuilder.build();
+    }
+
+    public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException {
+        if (messageOrBuilder == null) {
+            return null;
+        }
+        return JsonFormat.printer().print(messageOrBuilder);
+    }
 }
diff --git a/src/main/java/at/esque/kafka/MessageType.java b/src/main/java/at/esque/kafka/MessageType.java
index 4437adb..1171dd5 100644
--- a/src/main/java/at/esque/kafka/MessageType.java
+++ b/src/main/java/at/esque/kafka/MessageType.java
@@ -12,5 +12,6 @@ public enum MessageType {
     BYTEARRAY,
     BYTEBUFFER,
     BYTES,
-    UUID
+    UUID,
+    PROTOBUF_SR
 }
diff --git a/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java b/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java
index cc09f1a..a3f611f 100644
--- a/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java
+++ b/src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java
@@ -11,7 +11,6 @@
 import javafx.stage.FileChooser;
 import javafx.stage.Stage;
 import javafx.stage.Window;
-import org.apache.avro.Schema;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,10 +39,7 @@ public class SchemaCompatibilityCheckController {
 
     public void addSchema(ActionEvent actionEvent) {
-        Schema.Parser parser = new Schema.Parser();
         try {
-            parser.parse(schemaTextArea.getText());
-
             List<String> compatibility = restService.testCompatibility(schemaTextArea.getText(), subjectTextField.getText(), versionTextField.getText());
 
             if (compatibility.isEmpty()) {
diff --git a/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java b/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java
index 6e43560..bcd1ab9 100644
--- a/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java
+++ b/src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java
@@ -55,7 +55,9 @@ public class SchemaRegistryBrowserController {
     @FXML
     private Label schemaIdLabel;
     @FXML
-    public Label compatibilityLabel;
+    private Label compatibilityLabel;
+    @FXML
+    private Label typeLabel;
 
     public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) {
 
@@ -78,7 +80,8 @@ public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) {
             try {
                 Schema schema = schemaRegistryRestService.getVersion(subjectListView.getListView().getSelectionModel().getSelectedItem(), newValue1);
                 schemaIdLabel.setText(String.valueOf(schema.getId()));
-                schemaTextArea.setText(JsonUtils.formatJson(schema.getSchema()));
+                schemaTextArea.setText("AVRO".equals(schema.getSchemaType()) ? JsonUtils.formatJson(schema.getSchema()) : schema.getSchema());
+                typeLabel.setText(schema.getSchemaType());
             } catch (Exception e) {
                 ErrorAlert.show(e, getWindow());
             }
diff --git a/src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java b/src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java
index d6919ff..4d6db8d 100644
--- a/src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java
+++ b/src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java
@@ -58,7 +58,13 @@ public void setValueType(MessageType valueType) {
         this.valueType.set(valueType.name());
     }
 
-    public boolean containsAvro() {
-        return getValueType() == MessageType.AVRO || getValueType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY || getKeyType() == MessageType.AVRO || getKeyType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY;
+    public boolean requiresSchemaRegistry() {
+        return getValueType() == MessageType.AVRO ||
+                getValueType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY ||
+                getValueType() == MessageType.PROTOBUF_SR ||
+                getKeyType() == MessageType.AVRO ||
+                getKeyType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY ||
+                getKeyType() == MessageType.PROTOBUF_SR
+                ;
     }
 }
diff --git a/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java b/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java
index 8dfb7f9..4f29c52 100644
--- a/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java
+++ b/src/main/java/at/esque/kafka/handlers/ConsumerHandler.java
@@ -65,7 +65,7 @@ public UUID registerConsumer(ClusterConfig config, TopicMessageTypeConfig topicM
         consumerProps.put(KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, configHandler.getSettingsProperties().get(Settings.ENABLE_AVRO_LOGICAL_TYPE_CONVERSIONS));
         consumerProps.setProperty("kafkaesque.cluster.id", config.getIdentifier());
         consumerProps.put("kafkaesque.confighandler", configHandler);
-        if (topicMessageTypeConfig.containsAvro() && StringUtils.isEmpty(config.getSchemaRegistry())) {
+        if (topicMessageTypeConfig.requiresSchemaRegistry() && StringUtils.isEmpty(config.getSchemaRegistry())) {
             Optional<String> input = SystemUtils.showInputDialog("http://localhost:8081", "Add schema-registry url", "this cluster config is missing a schema registry url please add it now", "schema-registry URL");
             if (!input.isPresent()) {
                 throw new MissingSchemaRegistryException(config.getIdentifier());
diff --git a/src/main/java/at/esque/kafka/serialization/ForgivingProtobufAvroDeserializer.java b/src/main/java/at/esque/kafka/serialization/ForgivingProtobufAvroDeserializer.java
new file mode 100644
index 0000000..9015b4d
--- /dev/null
+++ b/src/main/java/at/esque/kafka/serialization/ForgivingProtobufAvroDeserializer.java
@@ -0,0 +1,33 @@
+package at.esque.kafka.serialization;
+
+import com.google.protobuf.Message;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class ForgivingProtobufAvroDeserializer implements Deserializer<Object> {
+
+    private KafkaProtobufDeserializer<Message> baseDeserializer = new KafkaProtobufDeserializer<>();
+    private static final Logger logger = LoggerFactory.getLogger(ForgivingProtobufAvroDeserializer.class);
+
+    public Object deserialize(byte[] bytes) {
+        throw new UnsupportedOperationException();
+    }
+
+    public Object deserialize(String s, byte[] bytes) {
+        try {
+            return baseDeserializer.deserialize(s, bytes);
+        } catch (Exception e) {
logger.warn("Failed Protobuf deserialization in topic {}", s); + } + return "Failed Protobuf deserialization! Content as String: " + new String(bytes); + } + + @Override + public void configure(Map configs, boolean isKey) { + baseDeserializer.configure(configs, isKey); + } +} diff --git a/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java b/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java index db3446d..47ea7ba 100644 --- a/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java +++ b/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java @@ -3,7 +3,6 @@ import at.esque.kafka.MessageType; import at.esque.kafka.cluster.TopicMessageTypeConfig; import at.esque.kafka.handlers.ConfigHandler; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; @@ -37,7 +36,7 @@ public Object deserialize(String s, byte[] bytes) { Integer schemaId = null; - if (MessageType.AVRO.equals(messageType) || MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY.equals(messageType)) { + if (isSchemaRegistryMessageType(messageType)) { schemaId = getSchemaId(bytes); } if ((deserializedObj instanceof GenericData.Record)) { @@ -49,13 +48,19 @@ public Object deserialize(String s, byte[] bytes) { return deserializedObj; } + private static boolean isSchemaRegistryMessageType(MessageType messageType) { + return MessageType.AVRO.equals(messageType) + || MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY.equals(messageType) + || MessageType.PROTOBUF_SR.equals(messageType); + } + public void configure(Map configs, boolean isKey) { this.isKey = isKey; deserializerMap.values().forEach(deserializer -> { - if (deserializer instanceof ForgivingKafkaAvroDeserializer && configs.get("schema.registry.url") == null) { + if ((deserializer instanceof ForgivingKafkaAvroDeserializer || deserializer instanceof ForgivingProtobufAvroDeserializer) && configs.get("schema.registry.url") == null) { //Don't call configure for the AvroDeserializer if there is no schema registry url to prevent exception, in cases where avro is not even used - }else { + } else { deserializer.configure(configs, isKey); } }); @@ -63,10 +68,6 @@ public void configure(Map configs, boolean isKey) { this.configHandler = (ConfigHandler) configs.get("kafkaesque.confighandler"); } - public Object deserialize(String s, byte[] bytes, Schema readerSchema) { - return this.deserialize(s, bytes); - } - public void close() { } @@ -92,6 +93,8 @@ private Deserializer deserializerByType(MessageType type) { return Serdes.Bytes().deserializer(); case UUID: return Serdes.UUID().deserializer(); + case PROTOBUF_SR: + return new ForgivingProtobufAvroDeserializer(); case AVRO: case AVRO_TOPIC_RECORD_NAME_STRATEGY: return new ForgivingKafkaAvroDeserializer(); diff --git a/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java b/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java index de926d2..82f92c9 100644 --- a/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java +++ b/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java @@ -1,9 +1,12 @@ package at.esque.kafka.serialization; +import at.esque.kafka.JsonUtils; import at.esque.kafka.MessageType; import at.esque.kafka.cluster.TopicMessageTypeConfig; import at.esque.kafka.handlers.ConfigHandler; +import com.google.protobuf.Message; import io.confluent.kafka.serializers.KafkaAvroSerializer; +import 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -18,7 +21,8 @@ public class KafkaEsqueSerializer implements Serializer<Object> {
 
-    private KafkaAvroSerializer avroSerialier = new KafkaAvroSerializer();
+    private KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer();
+    private KafkaProtobufSerializer<Message> protobufSerializer = new KafkaProtobufSerializer<>();
 
     private static final List<MessageType> AVRO_TYPES = Arrays.asList(MessageType.AVRO, MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY);
 
@@ -37,7 +41,9 @@ public Object serialize(byte[] bytes) {
     public byte[] serialize(String s, Object object) {
         TopicMessageTypeConfig topicConfig = configHandler.getConfigForTopic(clusterId, s);
         if (isKey ? AVRO_TYPES.contains(topicConfig.getKeyType()) : AVRO_TYPES.contains(topicConfig.getValueType())) {
-            return avroSerialier.serialize(s, object);
+            return avroSerializer.serialize(s, object);
+        } else if ((isKey ? MessageType.PROTOBUF_SR.equals(topicConfig.getKeyType()) : MessageType.PROTOBUF_SR.equals(topicConfig.getValueType())) && object instanceof Message message) {
+            return protobufSerializer.serialize(s, message);
         } else {
             SerializerWrapper serializerWrapper = serializerMap.get(isKey ? topicConfig.getKeyType() : topicConfig.getValueType());
             return serializerWrapper.serializer.serialize(s, serializerWrapper.function.apply(object));
@@ -49,7 +55,8 @@ public void configure(Map<String, ?> configs, boolean isKey) {
         this.isKey = isKey;
         serializerMap.values().forEach(serializer -> serializer.serializer.configure(configs, isKey));
         if (configs.get("schema.registry.url") != null) {
-            avroSerialier.configure(configs, isKey);
+            avroSerializer.configure(configs, isKey);
+            protobufSerializer.configure(configs, isKey);
         }
         this.clusterId = (String) configs.get("kafkaesque.cluster.id");
         this.configHandler = (ConfigHandler) configs.get("kafkaesque.confighandler");
@@ -61,7 +68,7 @@ public void close() {
 
     private SerializerWrapper serializerByType(MessageType type) {
         switch (type) {
             case STRING:
-                return new SerializerWrapper(s->s, Serdes.String().serializer());
+                return new SerializerWrapper(s -> s, Serdes.String().serializer());
             case SHORT:
                 return new SerializerWrapper(Short::parseShort, Serdes.Short().serializer());
             case INTEGER:
@@ -75,16 +82,19 @@ private SerializerWrapper serializerByType(MessageType type) {
             case BYTEARRAY:
                 return new SerializerWrapper(String::getBytes, Serdes.ByteArray().serializer());
             case BYTEBUFFER:
-                return new SerializerWrapper( s -> ByteBuffer.wrap(s.getBytes()), Serdes.ByteBuffer().serializer());
+                return new SerializerWrapper(s -> ByteBuffer.wrap(s.getBytes()), Serdes.ByteBuffer().serializer());
             case BYTES:
                 return new SerializerWrapper(s -> Bytes.wrap(s.getBytes()), Serdes.Bytes().serializer());
             case UUID:
                 return new SerializerWrapper(UUID::fromString, Serdes.UUID().serializer());
+            case PROTOBUF_SR:
+                return new SerializerWrapper(JsonUtils::fromJson, protobufSerializer);
             default:
                 throw new UnsupportedOperationException("no serializer for Message type: " + type);
         }
     }
-    public class SerializerWrapper{
+
+    public class SerializerWrapper {
         private Function function;
         private Serializer serializer;
 
diff --git a/src/main/resources/fxml/schemaRegistryBrowser.fxml b/src/main/resources/fxml/schemaRegistryBrowser.fxml
index 10bb9d9..a36f0c5 100644
--- a/src/main/resources/fxml/schemaRegistryBrowser.fxml
+++ b/src/main/resources/fxml/schemaRegistryBrowser.fxml
@@ -86,6 +86,11 @@
                             fx:id="compatibilityLabel"
                             minWidth="5" />
+                    <Label fx:id="typeLabel"
+                           minWidth="5" />
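
Note on the JSON bridge: the new JsonUtils helpers are what lets KafkaEsque
produce and display protobuf without compiled message classes. fromJson parses
arbitrary JSON into a google.protobuf.Struct, and toJson prints any
MessageOrBuilder back to JSON via JsonFormat. A minimal usage sketch (the class
name and sample JSON are illustrative, not part of the patch):

    import at.esque.kafka.JsonUtils;
    import com.google.protobuf.Message;

    public class ProtobufJsonRoundTrip {
        public static void main(String[] args) throws Exception {
            // JSON entered in the publish dialog becomes a protobuf Struct,
            // which KafkaEsqueSerializer hands to the KafkaProtobufSerializer.
            Message message = JsonUtils.fromJson("{\"id\": 42, \"name\": \"esque\"}");
            // Consumed protobuf messages can be rendered back as JSON text.
            System.out.println(JsonUtils.toJson(message));
        }
    }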
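
Note on the forgiving behavior: ForgivingProtobufAvroDeserializer mirrors the
existing ForgivingKafkaAvroDeserializer; a record that cannot be decoded is
logged and rendered as a fallback string instead of aborting the consumer loop.
A sketch of the expected behavior (topic name, registry URL, and payload are
placeholders):

    import at.esque.kafka.serialization.ForgivingProtobufAvroDeserializer;
    import org.apache.kafka.common.serialization.Deserializer;

    import java.util.Map;

    public class ForgivingDeserializerDemo {
        public static void main(String[] args) {
            Deserializer<Object> deserializer = new ForgivingProtobufAvroDeserializer();
            // The delegate KafkaProtobufDeserializer requires a schema registry URL.
            deserializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);

            byte[] notProtobuf = "plain text, not schema-registry framed".getBytes();
            // Instead of throwing, the wrapper logs the failure and returns
            // "Failed Protobuf deserialization! Content as String: ..."
            System.out.println(deserializer.deserialize("some-topic", notProtobuf));
        }
    }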
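
Note on schema id extraction: KafkaEsqueDeserializer now calls getSchemaId(bytes)
for PROTOBUF_SR as well as for the Avro types. That helper is not part of this
diff; for schema-registry framed payloads the id can be read straight off the
Confluent wire format, which prefixes the payload with a magic byte (0) followed
by a four-byte big-endian schema id. A sketch under that assumption (not
necessarily the project's actual implementation):

    import java.nio.ByteBuffer;

    public final class SchemaIdSniffer {
        // Returns the schema registry id, or null if the payload is not
        // schema-registry framed (magic byte 0 marks the Confluent format).
        public static Integer getSchemaId(byte[] bytes) {
            if (bytes == null || bytes.length < 5 || bytes[0] != 0) {
                return null;
            }
            // The four bytes after the magic byte hold the id, big-endian.
            return ByteBuffer.wrap(bytes, 1, 4).getInt();
        }
    }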