Skip to content
29 changes: 17 additions & 12 deletions kafka/protocol/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,26 @@ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
if acks not in (1, 0, -1):
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)

topics = []
for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
topic_msgs = []
for partition, payload in topic_payloads.items():
partition_msgs = []
for msg in payload.messages:
m = kafka.protocol.message.Message(
msg.value, key=msg.key,
magic=msg.magic, attributes=msg.attributes
)
partition_msgs.append((0, m.encode()))
topic_msgs.append((partition, partition_msgs))
topics.append((topic, topic_msgs))


return kafka.protocol.produce.ProduceRequest[0](
required_acks=acks,
timeout=timeout,
topics=[(
topic,
[(
partition,
[(0,
kafka.protocol.message.Message(
msg.value, key=msg.key,
magic=msg.magic, attributes=msg.attributes
).encode())
for msg in payload.messages])
for partition, payload in topic_payloads.items()])
for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
topics=topics
)

@classmethod
def decode_produce_response(cls, response):
Expand Down
4 changes: 2 additions & 2 deletions kafka/protocol/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .types import (
Int8, Int32, Int64, Bytes, Schema, AbstractType
)
from ..util import crc32
from ..util import crc32, WeakMethod


class Message(Struct):
Expand Down Expand Up @@ -52,7 +52,7 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
self.attributes = attributes
self.key = key
self.value = value
self.encode = self._encode_self
self.encode = WeakMethod(self._encode_self)

@property
def timestamp_type(self):
Expand Down
6 changes: 5 additions & 1 deletion kafka/protocol/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from .abstract import AbstractType
from .types import Schema

from ..util import WeakMethod


class Struct(AbstractType):
SCHEMA = Schema()
Expand All @@ -19,7 +21,9 @@ def __init__(self, *args, **kwargs):
self.__dict__.update(kwargs)

# overloading encode() to support both class and instance
self.encode = self._encode_self
# Without WeakMethod() this creates circular ref, which
# causes instances to "leak" to garbage
self.encode = WeakMethod(self._encode_self)

@classmethod
def encode(cls, item): # pylint: disable=E0202
Expand Down
4 changes: 2 additions & 2 deletions kafka/vendor/selectors34.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ def __getitem__(self, fileobj):
def __iter__(self):
return iter(self._selector._fd_to_key)

# Using six.add_metaclass() decorator instead of six.with_metaclass() because
# the latter leaks temporary_class to garbage with gc disabled


@six.add_metaclass(ABCMeta)
class BaseSelector(object):
"""Selector abstract base class.
Expand Down
4 changes: 3 additions & 1 deletion kafka/vendor/six.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ def __len__(self):
else:
# 64-bit
MAXSIZE = int((1 << 63) - 1)
del X

# Don't del it here, cause with gc disabled this "leaks" to garbage
# del X
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one is a vendored library, and I really would prefer fixing upstream. otherwise I'm likely to forget and overwrite when I import a newer version. Have you checked if this is fixed already in six?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upstream doesn't have this fix, but I've filed a pull request: benjaminp/six#176



def _add_doc(func, doc):
Expand Down