Skip to content

Commit

Permalink
Add serialization of dict and list to json bytes in python Kafka (#388)
Browse files Browse the repository at this point in the history
* Add serialization of dict and list to json bytes in python Kafka

* Add test to cover list serialization
  • Loading branch information
cbornet committed Sep 8, 2023
1 parent 28794f1 commit 3eb845a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
FLOAT_SERIALIZER,
DOUBLE_SERIALIZER,
BYTEARRAY_SERIALIZER,
JSON_SERIALIZER,
STRING_DESERIALIZER,
BOOLEAN_DESERIALIZER,
SHORT_DESERIALIZER,
Expand Down Expand Up @@ -142,6 +143,8 @@ def get_serializer(value):
return LONG_SERIALIZER
elif isinstance(value, float):
return DOUBLE_SERIALIZER
elif isinstance(value, dict) or isinstance(value, list):
return JSON_SERIALIZER
else:
raise TypeError(f"No serializer for type {type(value)}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
#

import json
import struct as _struct

from confluent_kafka.serialization import (
Expand Down Expand Up @@ -388,6 +389,38 @@ def __call__(self, value, ctx=None):
return value


class JsonSerializer(Serializer):
"""
Serializes objects using json.dumps
"""

def __call__(self, obj, ctx=None):
"""
Args:
obj (object): object to be serialized
ctx (SerializationContext): Metadata pertaining to the serialization
operation
Note:
None objects are represented as Kafka Null.
Raises:
SerializerError if an error occurs during serialization.
Returns:
the bytes
"""

if obj is None:
return None

try:
return json.dumps(obj).encode("utf-8")
except Exception as e:
raise SerializationError(str(e))


STRING_SERIALIZER = StringSerializer()
BOOLEAN_SERIALIZER = BooleanSerializer()
SHORT_SERIALIZER = ShortSerializer()
Expand All @@ -396,6 +429,7 @@ def __call__(self, value, ctx=None):
FLOAT_SERIALIZER = FloatSerializer()
DOUBLE_SERIALIZER = DoubleSerializer()
BYTEARRAY_SERIALIZER = ByteArraySerializer()
JSON_SERIALIZER = JsonSerializer()

STRING_DESERIALIZER = StringDeserializer()
BOOLEAN_DESERIALIZER = BooleanDeserializer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@ def test_serializers():
("ByteArraySerializer", True, b"\x01"),
("ByteArraySerializer", 42, b"\x00\x00\x00\x00\x00\x00\x00\x2A"),
("ByteArraySerializer", 42.0, b"\x40\x45\x00\x00\x00\x00\x00\x00"),
("ByteArraySerializer", {"a": "b", "c": 42.0}, b'{"a": "b", "c": 42.0}'),
(
"ByteArraySerializer",
[{"a": "b"}, {"c": 42.0}],
b'[{"a": "b"}, {"c": 42.0}]',
),
]:
sink = kafka_connection.create_topic_producer(
"id",
Expand Down

0 comments on commit 3eb845a

Please sign in to comment.