From 3980021f6d444f48d3e2b9aa96077a7d1931e792 Mon Sep 17 00:00:00 2001 From: mpeter Date: Fri, 6 Oct 2023 10:28:11 +0200 Subject: [PATCH 1/2] fix project name intellij has a bug where the name has to be case-sensitive --- settings.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/settings.gradle b/settings.gradle index e16af1f..f60c013 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name = 'kafkaesque' +rootProject.name = 'KafkaEsque' From aeb44171657a1efaca4255dfba9226542d12f2f9 Mon Sep 17 00:00:00 2001 From: mpeter Date: Fri, 6 Oct 2023 10:43:35 +0200 Subject: [PATCH 2/2] add base64 serdes --- src/main/java/at/esque/kafka/MessageType.java | 7 ++++--- .../serialization/Base64Deserializer.java | 18 ++++++++++++++++++ .../kafka/serialization/Base64Serializer.java | 17 +++++++++++++++++ .../serialization/KafkaEsqueDeserializer.java | 2 ++ .../serialization/KafkaEsqueSerializer.java | 2 ++ 5 files changed, 43 insertions(+), 3 deletions(-) create mode 100644 src/main/java/at/esque/kafka/serialization/Base64Deserializer.java create mode 100644 src/main/java/at/esque/kafka/serialization/Base64Serializer.java diff --git a/src/main/java/at/esque/kafka/MessageType.java b/src/main/java/at/esque/kafka/MessageType.java index 1171dd5..97075dc 100644 --- a/src/main/java/at/esque/kafka/MessageType.java +++ b/src/main/java/at/esque/kafka/MessageType.java @@ -4,6 +4,9 @@ public enum MessageType { STRING, AVRO, AVRO_TOPIC_RECORD_NAME_STRATEGY, + PROTOBUF_SR, + BASE64, + UUID, SHORT, INTEGER, LONG, @@ -11,7 +14,5 @@ public enum MessageType { DOUBLE, BYTEARRAY, BYTEBUFFER, - BYTES, - UUID, - PROTOBUF_SR + BYTES } diff --git a/src/main/java/at/esque/kafka/serialization/Base64Deserializer.java b/src/main/java/at/esque/kafka/serialization/Base64Deserializer.java new file mode 100644 index 0000000..40fab82 --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/Base64Deserializer.java @@ -0,0 +1,18 @@ +package at.esque.kafka.serialization; + +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.util.Base64; + +public class Base64Deserializer extends StringDeserializer { + + @Override + public String deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } else { + return new String(Base64.getEncoder().encode(data)); + } + } +} + diff --git a/src/main/java/at/esque/kafka/serialization/Base64Serializer.java b/src/main/java/at/esque/kafka/serialization/Base64Serializer.java new file mode 100644 index 0000000..3e2def2 --- /dev/null +++ b/src/main/java/at/esque/kafka/serialization/Base64Serializer.java @@ -0,0 +1,17 @@ +package at.esque.kafka.serialization; + +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Base64; + +public class Base64Serializer extends StringSerializer { + + @Override + public byte[] serialize(String topic, String data) { + if (data == null) { + return new byte[0]; + } else { + return Base64.getDecoder().decode(data); + } + } +} diff --git a/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java b/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java index 47ea7ba..4e63e79 100644 --- a/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java +++ b/src/main/java/at/esque/kafka/serialization/KafkaEsqueDeserializer.java @@ -91,6 +91,8 @@ private Deserializer deserializerByType(MessageType type) { return Serdes.ByteBuffer().deserializer(); case BYTES: return Serdes.Bytes().deserializer(); + case BASE64: + return new Base64Deserializer(); case UUID: return Serdes.UUID().deserializer(); case PROTOBUF_SR: diff --git a/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java b/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java index 82f92c9..ae0a80e 100644 --- a/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java +++ b/src/main/java/at/esque/kafka/serialization/KafkaEsqueSerializer.java @@ -85,6 +85,8 @@ private SerializerWrapper serializerByType(MessageType type) { return new SerializerWrapper(s -> ByteBuffer.wrap(s.getBytes()), Serdes.ByteBuffer().serializer()); case BYTES: return new SerializerWrapper(s -> Bytes.wrap(s.getBytes()), Serdes.Bytes().serializer()); + case BASE64: + return new SerializerWrapper(s -> s, new Base64Serializer()); case UUID: return new SerializerWrapper(UUID::fromString, Serdes.UUID().serializer()); case PROTOBUF_SR: