Skip to content

Commit

Permalink
add base64 serdes
Browse files Browse the repository at this point in the history
  • Loading branch information
mpeter-bp committed Oct 6, 2023
1 parent 3980021 commit aeb4417
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/main/java/at/esque/kafka/MessageType.java
Expand Up @@ -4,14 +4,15 @@ public enum MessageType {
STRING,
AVRO,
AVRO_TOPIC_RECORD_NAME_STRATEGY,
PROTOBUF_SR,
BASE64,
UUID,
SHORT,
INTEGER,
LONG,
FLOAT,
DOUBLE,
BYTEARRAY,
BYTEBUFFER,
BYTES,
UUID,
PROTOBUF_SR
BYTES
}
18 changes: 18 additions & 0 deletions src/main/java/at/esque/kafka/serialization/Base64Deserializer.java
@@ -0,0 +1,18 @@
package at.esque.kafka.serialization;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Base64;

public class Base64Deserializer extends StringDeserializer {

@Override
public String deserialize(String topic, byte[] data) {
if (data == null) {
return null;
} else {
return new String(Base64.getEncoder().encode(data));
}
}
}

17 changes: 17 additions & 0 deletions src/main/java/at/esque/kafka/serialization/Base64Serializer.java
@@ -0,0 +1,17 @@
package at.esque.kafka.serialization;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Base64;

public class Base64Serializer extends StringSerializer {

@Override
public byte[] serialize(String topic, String data) {
if (data == null) {
return new byte[0];
} else {
return Base64.getDecoder().decode(data);
}
}
}
Expand Up @@ -91,6 +91,8 @@ private Deserializer deserializerByType(MessageType type) {
return Serdes.ByteBuffer().deserializer();
case BYTES:
return Serdes.Bytes().deserializer();
case BASE64:
return new Base64Deserializer();
case UUID:
return Serdes.UUID().deserializer();
case PROTOBUF_SR:
Expand Down
Expand Up @@ -85,6 +85,8 @@ private SerializerWrapper serializerByType(MessageType type) {
return new SerializerWrapper<ByteBuffer>(s -> ByteBuffer.wrap(s.getBytes()), Serdes.ByteBuffer().serializer());
case BYTES:
return new SerializerWrapper<Bytes>(s -> Bytes.wrap(s.getBytes()), Serdes.Bytes().serializer());
case BASE64:
return new SerializerWrapper<String>(s -> s, new Base64Serializer());
case UUID:
return new SerializerWrapper<UUID>(UUID::fromString, Serdes.UUID().serializer());
case PROTOBUF_SR:
Expand Down

0 comments on commit aeb4417

Please sign in to comment.