Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support consumer deserializer config in Python kafka connector #364

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from typing import List, Dict, Optional, Any

from confluent_kafka import Consumer, Producer, Message, TopicPartition, KafkaException
from confluent_kafka.serialization import StringDeserializer
from sortedcontainers import SortedSet

from langstream import Record, CommitCallback, SimpleRecord
Expand All @@ -33,11 +32,18 @@
FLOAT_SERIALIZER,
DOUBLE_SERIALIZER,
BYTEARRAY_SERIALIZER,
STRING_DESERIALIZER,
BOOLEAN_DESERIALIZER,
SHORT_DESERIALIZER,
INTEGER_DESERIALIZER,
LONG_DESERIALIZER,
FLOAT_DESERIALIZER,
DOUBLE_DESERIALIZER,
BYTEARRAY_DESERIALIZER,
)
from .topic_connector import TopicConsumer, TopicProducer

LOG = logging.getLogger(__name__)
STRING_DESERIALIZER = StringDeserializer()

SERIALIZERS = {
"org.apache.kafka.common.serialization.StringSerializer": STRING_SERIALIZER,
Expand All @@ -50,6 +56,17 @@
"org.apache.kafka.common.serialization.ByteArraySerializer": BYTEARRAY_SERIALIZER,
}

# Lookup table from the fully-qualified "org.apache.kafka.common.serialization.*"
# deserializer class names to the deserializer instance that handles each one.
# The keys are the strings expected in the consumer's "key.deserializer" /
# "value.deserializer" configuration entries; an unknown name raises KeyError
# on lookup.
DESERIALIZERS = {
"org.apache.kafka.common.serialization.StringDeserializer": STRING_DESERIALIZER,
"org.apache.kafka.common.serialization.BooleanDeserializer": BOOLEAN_DESERIALIZER,
"org.apache.kafka.common.serialization.ShortDeserializer": SHORT_DESERIALIZER,
"org.apache.kafka.common.serialization.IntegerDeserializer": INTEGER_DESERIALIZER,
"org.apache.kafka.common.serialization.LongDeserializer": LONG_DESERIALIZER,
"org.apache.kafka.common.serialization.FloatDeserializer": FLOAT_DESERIALIZER,
"org.apache.kafka.common.serialization.DoubleDeserializer": DOUBLE_DESERIALIZER,
"org.apache.kafka.common.serialization.ByteArrayDeserializer": BYTEARRAY_DESERIALIZER, # noqa: E501
}


def apply_default_configuration(streaming_cluster, configs):
if "admin" in streaming_cluster["configuration"]:
Expand Down Expand Up @@ -120,10 +137,10 @@ def extract_jaas_property(prop, jaas_entry):


class KafkaRecord(SimpleRecord):
def __init__(self, message: Message):
def __init__(self, message: Message, value, key):
super().__init__(
STRING_DESERIALIZER(message.value()),
key=STRING_DESERIALIZER(message.key()),
value=value,
key=key,
origin=message.topic(),
timestamp=message.timestamp()[1],
headers=message.headers(),
Expand All @@ -145,8 +162,8 @@ def __init__(self, configs):
self.configs = configs.copy()
self.configs["on_commit"] = self.on_commit
self.topic = self.configs.pop("topic")
self.key_deserializer = self.configs.pop("key.deserializer")
self.value_deserializer = self.configs.pop("value.deserializer")
self.key_deserializer = DESERIALIZERS[self.configs.pop("key.deserializer")]
self.value_deserializer = DESERIALIZERS[self.configs.pop("value.deserializer")]
self.consumer: Optional[Consumer] = None
self.committed: Dict[TopicPartition, int] = {}
self.uncommitted: Dict[TopicPartition, SortedSet[int]] = {}
Expand Down Expand Up @@ -182,10 +199,10 @@ def close(self):
self.consumer.commit(offsets=offsets, asynchronous=False)
self.consumer.close()

def read(self) -> List[KafkaRecord]:
def read(self, timeout=1.0) -> List[KafkaRecord]:
if self.commit_failure:
raise self.commit_failure
message = self.consumer.poll(1.0)
message = self.consumer.poll(timeout)
if message is None:
return []
if message.error():
Expand All @@ -195,7 +212,13 @@ def read(self) -> List[KafkaRecord]:
f"Received message from Kafka topics {self.consumer.assignment()}:"
f" {message}"
)
return [KafkaRecord(message)]
return [
KafkaRecord(
message,
value=self.value_deserializer(message.value()),
key=self.key_deserializer(message.key()),
)
]

def commit(self, records: List[KafkaRecord]):
"""Commit the offsets of the records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,28 @@

from confluent_kafka.serialization import (
Serializer,
Deserializer,
SerializationError,
StringSerializer,
DoubleSerializer,
IntegerSerializer,
StringDeserializer,
DoubleDeserializer,
IntegerDeserializer,
)


class BooleanSerializer(Serializer):
"""
Serializes bool to boolean bytes.
Serializes bool to bytes.

See Also:
`BooleanSerializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java>`_
""" # noqa: E501

def __call__(self, obj, ctx=None):
"""
Serializes bool as boolean bytes.
Serializes bool as bytes.

Args:
obj (object): object to be serialized
Expand All @@ -57,7 +61,44 @@ def __call__(self, obj, ctx=None):
if obj is None:
return None

return b"\x01" if obj else b"\x00"
try:
return _struct.pack(">?", obj)
except _struct.error as e:
raise SerializationError(str(e))


class BooleanDeserializer(Deserializer):
    """
    Deserializes bool from bytes.

    See Also:
        `BooleanDeserializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Deserializes bool from bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation (not used by this deserializer)

        Raises:
            SerializationError: if ``value`` cannot be unpacked with the
                ``">?"`` struct format (e.g. wrong payload length).

        Returns:
            bool if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return _struct.unpack(">?", value)[0]
        except _struct.error as e:
            # Chain explicitly so the underlying struct failure stays visible
            # as the cause of the SerializationError.
            raise SerializationError(str(e)) from e


class ShortSerializer(Serializer):
Expand Down Expand Up @@ -97,6 +138,40 @@ def __call__(self, obj, ctx=None):
raise SerializationError(str(e))


class ShortDeserializer(Deserializer):
    """
    Deserializes int from int16 bytes.

    See Also:
        `ShortDeserializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Deserializes int from int16 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation (not used by this deserializer)

        Raises:
            SerializationError: if ``value`` cannot be unpacked with the
                big-endian ``">h"`` struct format (e.g. wrong payload length).

        Returns:
            int if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return _struct.unpack(">h", value)[0]
        except _struct.error as e:
            # Chain explicitly so the underlying struct failure stays visible
            # as the cause of the SerializationError.
            raise SerializationError(str(e)) from e


class LongSerializer(Serializer):
"""
Serializes int to int64 bytes.
Expand Down Expand Up @@ -134,12 +209,46 @@ def __call__(self, obj, ctx=None):
raise SerializationError(str(e))


class LongDeserializer(Deserializer):
    """
    Deserializes int from int64 bytes.

    See Also:
        `LongDeserializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Deserializes int from int64 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation (not used by this deserializer)

        Raises:
            SerializationError: if ``value`` cannot be unpacked with the
                big-endian ``">q"`` struct format (e.g. wrong payload length).

        Returns:
            int if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            # ">q" is a big-endian signed 8-byte integer, i.e. int64.
            return _struct.unpack(">q", value)[0]
        except _struct.error as e:
            # Chain explicitly so the underlying struct failure stays visible
            # as the cause of the SerializationError.
            raise SerializationError(str(e)) from e


class FloatSerializer(Serializer):
"""
Serializes float to IEEE 754 binary32.
Serializes float to IEEE 754 binary32 bytes.

See Also:
`FloatSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/FloatSerializer.html>`_
`FloatSerializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java>`_

""" # noqa: E501

Expand Down Expand Up @@ -170,12 +279,46 @@ def __call__(self, obj, ctx=None):
raise SerializationError(str(e))


class FloatDeserializer(Deserializer):
    """
    Deserializes float from IEEE 754 binary32 bytes.

    See Also:
        `FloatDeserializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Deserializes float from IEEE 754 binary32 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation (not used by this deserializer)

        Raises:
            SerializationError: if ``value`` cannot be unpacked with the
                big-endian ``">f"`` struct format (e.g. wrong payload length).

        Returns:
            float if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return _struct.unpack(">f", value)[0]
        except _struct.error as e:
            # Chain explicitly so the underlying struct failure stays visible
            # as the cause of the SerializationError.
            raise SerializationError(str(e)) from e


class ByteArraySerializer(Serializer):
"""
Serializes bytes.

See Also:
`ByteArraySerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/ByteArraySerializer.html>`_
`ByteArraySerializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java>`_

""" # noqa: E501

Expand All @@ -201,11 +344,50 @@ def __call__(self, obj, ctx=None):
return None

if not isinstance(obj, bytes):
raise SerializationError(f"ByteArraySerializer cannot serialize {obj}")
raise SerializationError(
f"ByteArraySerializer: a bytes-like object is required, not {type(obj)}"
)

return obj


class ByteArrayDeserializer(Deserializer):
    """
    Pass-through deserializer for raw byte payloads.

    See Also:
        `ByteArrayDeserializer Javadoc <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Returns the payload unchanged after checking that it is ``bytes``.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation (not used by this deserializer)

        Raises:
            SerializationError: if ``value`` is neither None nor a ``bytes``
                instance.

        Returns:
            bytes if data is not None, otherwise None
        """

        # None and genuine bytes are both handed back untouched.
        if value is None or isinstance(value, bytes):
            return value

        raise SerializationError(
            f"ByteArrayDeserializer: a bytes-like object is required, "
            f"not {type(value)}"
        )


STRING_SERIALIZER = StringSerializer()
BOOLEAN_SERIALIZER = BooleanSerializer()
SHORT_SERIALIZER = ShortSerializer()
Expand All @@ -214,3 +396,12 @@ def __call__(self, obj, ctx=None):
FLOAT_SERIALIZER = FloatSerializer()
DOUBLE_SERIALIZER = DoubleSerializer()
BYTEARRAY_SERIALIZER = ByteArraySerializer()

# Module-level deserializer instances, one per supported type, each created
# once when this module is imported.
STRING_DESERIALIZER = StringDeserializer()
BOOLEAN_DESERIALIZER = BooleanDeserializer()
SHORT_DESERIALIZER = ShortDeserializer()
INTEGER_DESERIALIZER = IntegerDeserializer()
LONG_DESERIALIZER = LongDeserializer()
FLOAT_DESERIALIZER = FloatDeserializer()
DOUBLE_DESERIALIZER = DoubleDeserializer()
BYTEARRAY_DESERIALIZER = ByteArrayDeserializer()
Loading
Loading