Commit

Enable protobuf in KafkaEsque: the schema registry client can now use protobuf, and protobuf messages can be displayed.
daengi committed Sep 29, 2023
1 parent 8e45971 commit 565a7c5
Showing 14 changed files with 135 additions and 32 deletions.
3 changes: 2 additions & 1 deletion build.gradle
@@ -11,7 +11,7 @@ plugins {
}

group = 'at.esque.kafka'
version = '2.6.0'
version = '2.7.0'

repositories {
mavenCentral()
@@ -42,6 +42,7 @@ dependencies {
implementation 'com.opencsv:opencsv:5.5.2'
implementation 'io.confluent:kafka-schema-registry:7.3.0'
implementation 'io.confluent:kafka-avro-serializer:6.2.7'
implementation 'io.confluent:kafka-protobuf-serializer:6.2.7'
implementation 'org.apache.avro:avro:1.11.1'
implementation 'com.google.inject:guice:5.0.1'
implementation 'org.fxmisc.richtext:richtextfx:0.10.7'
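The new kafka-protobuf-serializer dependency brings in Confluent's KafkaProtobufDeserializer, which the PROTOBUF_SR message type added below builds on. A minimal sketch of using it directly — the registry URL and topic name are placeholder assumptions, not KafkaEsque configuration:

    import com.google.protobuf.Message;
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

    import java.util.Map;

    public class ProtobufDeserializeSketch {
        public static Message deserialize(byte[] payload) {
            try (KafkaProtobufDeserializer<Message> deserializer = new KafkaProtobufDeserializer<>()) {
                // With no specific message class configured, the deserializer looks up
                // the schema in the registry and returns a DynamicMessage.
                deserializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);
                return deserializer.deserialize("some-topic", payload);
            }
        }
    }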
26 changes: 23 additions & 3 deletions src/main/java/at/esque/kafka/Controller.java
@@ -41,6 +41,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.protobuf.Message;
import com.opencsv.bean.CsvToBeanBuilder;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import javafx.application.HostServices;
@@ -136,6 +137,7 @@
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


public class Controller {
@@ -468,7 +470,7 @@ private ListCell<String> topicListCellFactory() {
try {
TopicMessageTypeConfig topicMessageTypeConfig = configHandler.getConfigForTopic(selectedCluster().getIdentifier(), selectedTopic());
Map<String, String> consumerConfig = configHandler.readConsumerConfigs(selectedCluster().getIdentifier());
TraceInputDialog.show(topicMessageTypeConfig.getKeyType() == MessageType.AVRO, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES)), partitionCombobox.getItems())
TraceInputDialog.show(topicMessageTypeConfig.getKeyType() == MessageType.AVRO || topicMessageTypeConfig.getKeyType() == MessageType.PROTOBUF_SR, Settings.isTraceQuickSelectEnabled(configHandler.getSettingsProperties()), Settings.readDurationSetting(configHandler.getSettingsProperties()), Integer.parseInt(configHandler.getSettingsProperties().get(Settings.RECENT_TRACE_MAX_ENTRIES)), partitionCombobox.getItems())
.ifPresent(traceInput -> {
backGroundTaskHolder.setBackGroundTaskDescription("tracing message");
Integer partition = null;
@@ -565,7 +567,8 @@ private void getTopicsForCluster() {
backGroundTaskHolder.setIsInProgress(true);
Set<String> topics = adminClient.getTopics();
Platform.runLater(() -> topicListView.setItems(topics));

} catch (Exception e) {
Platform.runLater(() -> ErrorAlert.show(e));
} finally {
stopWatch.stop();
LOGGER.info("Finished getting topics for cluster [{}]", stopWatch);
@@ -908,7 +911,10 @@ private void subscribeOrAssignToSelectedPartition(TopicMessageTypeConfig topic,
if (selectedPartition >= 0) {
consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(Collections.singletonList(new TopicPartition(selectedTopic(), selectedPartition))));
} else {
consumerHandler.subscribe(consumerId, topic.getName());
List<TopicPartition> partitions = adminClient.getPatitions(topic.getName()).stream()
.map(integer -> new TopicPartition(topic.getName(), integer))
.collect(Collectors.toList());
consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(partitions));
}
}

@@ -1035,9 +1041,23 @@ private <KT, VT> void convertAndAdd(ConsumerRecord<KT, VT> cr, ObservableList<Ka
if (cr.value() instanceof GenericData.Record recordValue) {
kafkaMessage.setValueType(extractTypeFromGenericRecord(recordValue));
kafkaMessage.getMetaData().add(new StringMetadata("Value Schema ID", extractSchemaIdFromGenericRecord(recordValue)));
} else if (cr.value() instanceof Message message) {
kafkaMessage.setValueType(message.getDescriptorForType().getFullName());
try {
kafkaMessage.setValue(JsonUtils.toJson(message));
} catch (IOException e) {
LOGGER.error("Failed printing protobuf message as json", e);
}
}
if (cr.key() instanceof GenericData.Record recordKey) {
kafkaMessage.setKeyType(extractTypeFromGenericRecord(recordKey));
} else if (cr.key() instanceof Message messageKey) {
kafkaMessage.setKeyType(messageKey.getDescriptorForType().getFullName());
try {
kafkaMessage.setKey(JsonUtils.toJson(messageKey));
} catch (IOException e) {
LOGGER.error("Failed printing protobuf message as json", e);
}
}

kafkaMessage.setTimestamp(Instant.ofEpochMilli(cr.timestamp()).toString());
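Besides the protobuf display logic, Controller switches from consumerHandler.subscribe(...) to an explicit assign() over all partitions (CrossClusterController below gets the same treatment). assign() reads the topic without joining a consumer group, so browsing never triggers rebalances or offset-commit coordination. A sketch of the pattern outside KafkaEsque, where the consumer instance and topic name are assumptions:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.List;
    import java.util.stream.Collectors;

    static void assignAllPartitions(KafkaConsumer<?, ?> consumer, String topic) {
        List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());
        consumer.assign(partitions);          // no group membership, no rebalancing
        consumer.seekToBeginning(partitions); // choose a starting position explicitly
    }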
3 changes: 0 additions & 3 deletions src/main/java/at/esque/kafka/CreateSchemaController.java
@@ -10,7 +10,6 @@
import javafx.stage.FileChooser;
import javafx.stage.Stage;
import javafx.stage.Window;
import org.apache.avro.Schema;

import java.io.File;
import java.io.IOException;
@@ -31,9 +30,7 @@ public class CreateSchemaController {

public void addSchema(ActionEvent actionEvent) {

Schema.Parser parser = new Schema.Parser();
try {
parser.parse(schemaTextArea.getText());
restService.registerSchema(schemaTextArea.getText(), subjectTextField.getText());
SuccessAlert.show("Success", null, "Schema added successfully!", getWindow());
} catch (Exception e) {
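CreateSchemaController used to run the schema text through Avro's Schema.Parser before registering it, which would have rejected any protobuf schema; validation is now left to the registry itself. For non-Avro schemas the type has to reach the registry explicitly. A hedged sketch against Confluent's RestService — the four-argument registerSchema overload, the registry URL, and the subject name are assumptions about the client version and setup:

    import io.confluent.kafka.schemaregistry.client.rest.RestService;

    import java.util.Collections;

    static int registerProtobufSchema() throws Exception {
        RestService restService = new RestService("http://localhost:8081"); // assumed URL
        String protoSchema = "syntax = \"proto3\"; message Value { string name = 1; }";
        // Non-Avro schemas need an explicit schema type; the subject name here
        // follows the TopicNameStrategy convention <topic>-value.
        return restService.registerSchema(
                protoSchema, "PROTOBUF", Collections.emptyList(), "some-topic-value");
    }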
8 changes: 7 additions & 1 deletion src/main/java/at/esque/kafka/CrossClusterController.java
@@ -37,19 +37,22 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.kordamp.ikonli.fontawesome.FontAwesome;
import org.kordamp.ikonli.javafx.FontIcon;

import java.io.IOException;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class CrossClusterController {

@@ -168,7 +171,10 @@ private void startOperation(UUID operationId) {
try {
producerId = producerHandler.registerProducer(operation.getToCluster(), operation.getToTopic().getName());
consumerId = consumerHandler.registerConsumer(operation.getFromCluster(), operation.getFromTopic(), configHandler.readConsumerConfigs(operation.getToCluster().getIdentifier()));
consumerHandler.subscribe(consumerId, operation.getFromTopic().getName());
List<TopicPartition> partitions = fromAdmin.getPatitions(operation.getFromTopic().getName()).stream()
.map(integer -> new TopicPartition(operation.getFromTopic().getName(), integer))
.collect(Collectors.toList());
consumerHandler.getConsumer(consumerId).ifPresent(topicConsumer -> topicConsumer.assign(partitions));
if (instantPicker.getInstantValue() != null) {
consumerHandler.seekToTime(consumerId, instantPicker.getInstantValue().toEpochMilli());
} else {
22 changes: 22 additions & 0 deletions src/main/java/at/esque/kafka/JsonUtils.java
@@ -13,6 +13,11 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.Struct;
import com.google.protobuf.util.JsonFormat;
import javafx.scene.control.TreeItem;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -350,4 +355,21 @@ private static void applyCorrectAdder(JsonNode value, JsonTreeItem treeItem) {
recursivelyAddElements((String.valueOf(value)), treeItem);
}
}

public static Message fromJson(String json) {
Struct.Builder structBuilder = Struct.newBuilder();
try {
JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
return structBuilder.build();
}

public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException {
if (messageOrBuilder == null) {
return null;
}
return JsonFormat.printer().print(messageOrBuilder);
}
}
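fromJson parses arbitrary JSON into a protobuf Struct, a schemaless map-like message, so the resulting Message carries Struct's descriptor rather than any registered topic schema (and Struct stores every number as a double). toJson is a thin wrapper around JsonFormat's printer. A round trip, for illustration:

    import com.google.protobuf.Message;

    import java.io.IOException;

    static String roundTrip() throws IOException { // toJson declares IOException
        Message message = JsonUtils.fromJson("{\"name\": \"kafkaesque\", \"count\": 2}");
        return JsonUtils.toJson(message); // prints the Struct back out as JSON
    }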
3 changes: 2 additions & 1 deletion src/main/java/at/esque/kafka/MessageType.java
@@ -12,5 +12,6 @@ public enum MessageType {
BYTEARRAY,
BYTEBUFFER,
BYTES,
UUID
UUID,
PROTOBUF_SR
}
4 changes: 0 additions & 4 deletions src/main/java/at/esque/kafka/SchemaCompatibilityCheckController.java
@@ -11,7 +11,6 @@
import javafx.stage.FileChooser;
import javafx.stage.Stage;
import javafx.stage.Window;
import org.apache.avro.Schema;

import java.io.File;
import java.io.IOException;
@@ -40,10 +39,7 @@ public class SchemaCompatibilityCheckController {

public void addSchema(ActionEvent actionEvent) {

Schema.Parser parser = new Schema.Parser();
try {
parser.parse(schemaTextArea.getText());

List<String> compatibility = restService.testCompatibility(schemaTextArea.getText(), subjectTextField.getText(), versionTextField.getText());

if (compatibility.isEmpty()) {
7 changes: 5 additions & 2 deletions src/main/java/at/esque/kafka/SchemaRegistryBrowserController.java
@@ -55,7 +55,9 @@ public class SchemaRegistryBrowserController {
@FXML
private Label schemaIdLabel;
@FXML
public Label compatibilityLabel;
private Label compatibilityLabel;
@FXML
private Label typeLabel;


public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) {
@@ -78,7 +80,8 @@ public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) {
try {
Schema schema = schemaRegistryRestService.getVersion(subjectListView.getListView().getSelectionModel().getSelectedItem(), newValue1);
schemaIdLabel.setText(String.valueOf(schema.getId()));
schemaTextArea.setText(JsonUtils.formatJson(schema.getSchema()));
schemaTextArea.setText("AVRO".equals(schema.getSchemaType()) ? JsonUtils.formatJson(schema.getSchema()) : schema.getSchema());
typeLabel.setText(schema.getSchemaType());
} catch (Exception e) {
ErrorAlert.show(e, getWindow());
}
10 changes: 8 additions & 2 deletions src/main/java/at/esque/kafka/cluster/TopicMessageTypeConfig.java
@@ -58,7 +58,13 @@ public void setValueType(MessageType valueType) {
this.valueType.set(valueType.name());
}

public boolean containsAvro() {
return getValueType() == MessageType.AVRO || getValueType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY || getKeyType() == MessageType.AVRO || getKeyType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY;
public boolean requiresSchemaRegistry() {
return getValueType() == MessageType.AVRO ||
getValueType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY ||
getValueType() == MessageType.PROTOBUF_SR ||
getKeyType() == MessageType.AVRO ||
getKeyType() == MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY ||
getKeyType() == MessageType.PROTOBUF_SR
;
}
}
2 changes: 1 addition & 1 deletion src/main/java/at/esque/kafka/handlers/ConsumerHandler.java
@@ -65,7 +65,7 @@ public UUID registerConsumer(ClusterConfig config, TopicMessageTypeConfig topicM
consumerProps.put(KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, configHandler.getSettingsProperties().get(Settings.ENABLE_AVRO_LOGICAL_TYPE_CONVERSIONS));
consumerProps.setProperty("kafkaesque.cluster.id", config.getIdentifier());
consumerProps.put("kafkaesque.confighandler", configHandler);
if (topicMessageTypeConfig.containsAvro() && StringUtils.isEmpty(config.getSchemaRegistry())) {
if (topicMessageTypeConfig.requiresSchemaRegistry() && StringUtils.isEmpty(config.getSchemaRegistry())) {
Optional<String> input = SystemUtils.showInputDialog("http://localhost:8081", "Add schema-registry url", "this cluster config is missing a schema registry url please add it now", "schema-registry URL");
if (!input.isPresent()) {
throw new MissingSchemaRegistryException(config.getIdentifier());
33 changes: 33 additions & 0 deletions src/main/java/at/esque/kafka/serialization/ForgivingProtobufAvroDeserializer.java
@@ -0,0 +1,33 @@
package at.esque.kafka.serialization;

import com.google.protobuf.Message;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class ForgivingProtobufAvroDeserializer implements Deserializer<Object> {

private KafkaProtobufDeserializer<Message> baseDeserializer = new KafkaProtobufDeserializer<>();
private static final Logger logger = LoggerFactory.getLogger(ForgivingProtobufAvroDeserializer.class);

public Object deserialize(byte[] bytes) {
throw new UnsupportedOperationException();
}

public Object deserialize(String s, byte[] bytes) {
try {
return baseDeserializer.deserialize(s, bytes);
} catch (Exception e) {
logger.warn("Failed Protobuf deserialization in topic {}", s);
}
return "Failed Protobuf deserialization! Content as String: " + new String(bytes);
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
baseDeserializer.configure(configs, isKey);
}
}
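The "forgiving" part: a record that fails protobuf deserialization comes back as an explanatory String instead of an exception, so one bad message cannot abort a whole poll. A usage sketch, where the registry URL and topic name are assumptions:

    import java.util.Map;

    static Object forgivingDeserialize(byte[] payload) {
        ForgivingProtobufAvroDeserializer deserializer = new ForgivingProtobufAvroDeserializer();
        deserializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false); // value side
        Object result = deserializer.deserialize("some-topic", payload);
        // result is a com.google.protobuf.Message on success, or the
        // "Failed Protobuf deserialization! ..." fallback String on failure
        return result;
    }

Callers therefore have to type-check the result, which is exactly what Controller's convertAndAdd does with its instanceof Message branches.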
@@ -3,7 +3,6 @@
import at.esque.kafka.MessageType;
import at.esque.kafka.cluster.TopicMessageTypeConfig;
import at.esque.kafka.handlers.ConfigHandler;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -37,7 +36,7 @@ public Object deserialize(String s, byte[] bytes) {

Integer schemaId = null;

if (MessageType.AVRO.equals(messageType) || MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY.equals(messageType)) {
if (isSchemaRegistryMessageType(messageType)) {
schemaId = getSchemaId(bytes);
}
if ((deserializedObj instanceof GenericData.Record)) {
@@ -49,24 +48,26 @@
return deserializedObj;
}

private static boolean isSchemaRegistryMessageType(MessageType messageType) {
return MessageType.AVRO.equals(messageType)
|| MessageType.AVRO_TOPIC_RECORD_NAME_STRATEGY.equals(messageType)
|| MessageType.PROTOBUF_SR.equals(messageType);
}


public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
deserializerMap.values().forEach(deserializer -> {
if (deserializer instanceof ForgivingKafkaAvroDeserializer && configs.get("schema.registry.url") == null) {
if ((deserializer instanceof ForgivingKafkaAvroDeserializer || deserializer instanceof ForgivingProtobufAvroDeserializer) && configs.get("schema.registry.url") == null) {
//Don't call configure for the AvroDeserializer if there is no schema registry url to prevent exception, in cases where avro is not even used
}else {
} else {
deserializer.configure(configs, isKey);
}
});
this.clusterId = (String) configs.get("kafkaesque.cluster.id");
this.configHandler = (ConfigHandler) configs.get("kafkaesque.confighandler");
}

public Object deserialize(String s, byte[] bytes, Schema readerSchema) {
return this.deserialize(s, bytes);
}

public void close() {
}

@@ -92,6 +93,8 @@ private Deserializer deserializerByType(MessageType type) {
return Serdes.Bytes().deserializer();
case UUID:
return Serdes.UUID().deserializer();
case PROTOBUF_SR:
return new ForgivingProtobufAvroDeserializer();
case AVRO:
case AVRO_TOPIC_RECORD_NAME_STRATEGY:
return new ForgivingKafkaAvroDeserializer();
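Both Avro and PROTOBUF_SR payloads are framed in Confluent's wire format, which is why a single getSchemaId call can serve the new isSchemaRegistryMessageType guard. The framing is a zero magic byte followed by a 4-byte big-endian schema id; a sketch of what such an extraction can look like (the real getSchemaId implementation is not shown in this diff):

    import java.nio.ByteBuffer;

    static Integer getSchemaId(byte[] bytes) {
        if (bytes == null || bytes.length < 5 || bytes[0] != 0x00) {
            return null; // not a schema-registry framed payload
        }
        return ByteBuffer.wrap(bytes, 1, 4).getInt(); // big-endian id after the magic byte
    }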
