This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

[WIP] Support RecordBatch #844

Open · wants to merge 19 commits into base: master
237 changes: 235 additions & 2 deletions pykafka/protocol/message.py
@@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
from datetime import datetime
import struct
from datetime import datetime
from pkg_resources import parse_version
from six import integer_types
from zlib import crc32

from ..common import CompressionType, Message
from ..exceptions import MessageSetDecodeFailure
from ..utils import Serializable, struct_helpers, compression
from ..utils import Serializable, struct_helpers, compression, msg_protocol_version
from ..utils.compat import buffer


@@ -171,6 +171,92 @@ def set_timestamp(self, ts):
raise RuntimeError()


class Record(Message):
"""
Specification::

Record =>
Length => varint
Attributes => int8
TimestampDelta => varint
OffsetDelta => varint
KeyLen => varint
Key => data
ValueLen => varint
Value => data
Headers => [Header]
"""
__slots__ = Message.__slots__ + ["headers"]

def __init__(self,
value,
partition_key=None,
compression_type=CompressionType.NONE,
offset=-1,
partition_id=-1,
produce_attempt=0,
protocol_version=2,
timestamp=None,
delivery_report_q=None,
headers=None):
super(Record, self).__init__(value, partition_key=partition_key,
compression_type=compression_type,
offset=offset, partition_id=partition_id,
produce_attempt=produce_attempt,
protocol_version=protocol_version,
timestamp=timestamp,
delivery_report_q=delivery_report_q)
self.headers = headers or []

def timestamp_delta(self, base_timestamp=0):
return self.timestamp - base_timestamp

def offset_delta(self, base_offset=0):
return self.offset - base_offset

def __len__(self):
size = 1
size += struct_helpers.get_varint_size(self.timestamp_delta())
size += struct_helpers.get_varint_size(self.offset_delta())
if self.partition_key is not None:
size += struct_helpers.get_varint_size(len(self.partition_key))
size += len(self.partition_key)
if self.value is not None:
size += struct_helpers.get_varint_size(len(self.value)) + len(self.value)
size += 4
for hkey, hval in self.headers:
size += struct_helpers.get_varint_size(len(hkey)) + len(hkey)
size += struct_helpers.get_varint_size(len(hval)) + len(hval)
return size

@classmethod
def decode(cls, buff, base_timestamp=0, base_offset=0, partition_id=-1,
compression_type=CompressionType.NONE):
(length, attr, timestamp_delta,
offset_delta, partition_key, value,
headers) = struct_helpers.unpack_from('VBVVGG [GG]', buff, 0)
return Record(value,
partition_key=partition_key,
compression_type=compression_type,
offset=base_offset + offset_delta,
protocol_version=1, # XXX
timestamp=base_timestamp + timestamp_delta,
partition_id=partition_id)

def pack_into(self, buff, offset, base_timestamp=0, base_offset=0):
fmt = '!VBVVV%dsV%dsi' % (len(self.partition_key), len(self.value))
args = (len(self), 0, self.timestamp_delta(base_timestamp),
self.offset_delta(base_offset), len(self.partition_key),
self.partition_key, len(self.value), self.value, len(self.headers))
struct_helpers.pack_into(fmt, buff, offset, *args)
offset += struct_helpers.calcsize(fmt, *args)
for hkey, hval in self.headers:
fmt = '!V%dsV%ds' % (len(hkey), len(hval))
args = (len(hkey), hkey, len(hval), hval)
struct_helpers.pack_into(fmt, buff, offset, *args)
offset += struct_helpers.calcsize(fmt, *args)
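
For reference, the 'V' format code and the get_varint_size calls above assume Kafka's varint encoding: signed values are zig-zag mapped and then emitted seven payload bits per byte, low group first. A minimal standalone sketch under that assumption (zigzag_encode, encode_varint and varint_size are illustrative names, not part of struct_helpers):

def zigzag_encode(value):
    # Interleave negatives so small magnitudes stay short: 0, -1, 1, -2 -> 0, 1, 2, 3
    return (value << 1) ^ (value >> 63)

def encode_varint(value):
    # Seven payload bits per byte, least-significant group first; the high bit flags continuation
    out = bytearray()
    value = zigzag_encode(value)
    while True:
        byte = value & 0x7f
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def varint_size(value):
    # What get_varint_size is expected to report: the encoded length in bytes
    return len(encode_varint(value))

assert encode_varint(300) == b'\xd8\x04' and varint_size(300) == 2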


class MessageSet(Serializable):
"""Representation of a set of messages in Kafka

@@ -225,6 +311,9 @@ def messages(self):
self._compressed = None
return self._messages

def add_message(self, message):
self.messages.append(message)

def _get_compressed(self):
"""Get a compressed representation of all current messages.

@@ -297,3 +386,147 @@ def pack_into(self, buff, offset):
offset += 12
message.pack_into(buff, offset)
offset += mlen


class RecordBatch(MessageSet):
"""Representation of a Kafka RecordBatch

Specification::

RecordBatch =>
FirstOffset => int64
Length => int32
PartitionLeaderEpoch => int32
Magic => int8
CRC => int32
Attributes => int16
LastOffsetDelta => int32
FirstTimestamp => int64
MaxTimestamp => int64
ProducerId => int64
ProducerEpoch => int16
FirstSequence => int32
Records => [Record]
"""
def __init__(self, messages=None, compression_type=CompressionType.NONE,
broker_version='0.9.0', first_offset=-1, last_offset_delta=-1,
first_timestamp=-1, max_timestamp=-1, protocol_version=None):
super(RecordBatch, self).__init__(messages=messages,
broker_version=broker_version,
compression_type=compression_type)
if protocol_version is not None:
self.protocol_version = protocol_version
else:
self.protocol_version = msg_protocol_version(broker_version)
self.first_offset = first_offset
self.last_offset_delta = last_offset_delta
self.first_timestamp = first_timestamp
self.max_timestamp = max_timestamp

def __len__(self):
if self.compression_type == CompressionType.NONE:
messages = self._messages
else:
if self._compressed is None:
self._compressed = self._get_compressed()
messages = [self._compressed]
length = 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4  # 61 bytes: fixed header fields plus the record count
# XXX does this logic still apply to RecordBatch?
length += sum(len(m) for m in messages)
return length

@property
def records(self):
self._compressed = None
return self._messages

def add_message(self, message):
self._messages.append(message)
if self.first_offset == -1 or message.offset < self.first_offset:
self.first_offset = message.offset
msg_offset_delta = message.offset - self.first_offset
if msg_offset_delta > self.last_offset_delta:
self.last_offset_delta = msg_offset_delta
if self.first_timestamp == -1 or message.timestamp < self.first_timestamp:
self.first_timestamp = message.timestamp
if message.timestamp > self.max_timestamp:
self.max_timestamp = message.timestamp

def _get_compressed(self):
assert self.compression_type != CompressionType.NONE
tmp_mset = RecordBatch(messages=self._messages)
uncompressed = bytearray(len(tmp_mset))
tmp_mset.pack_record_array_into(uncompressed, 0)
if self.compression_type == CompressionType.GZIP:
compressed = compression.encode_gzip(buffer(uncompressed))
elif self.compression_type == CompressionType.SNAPPY:
compressed = compression.encode_snappy(buffer(uncompressed))
elif self.compression_type == CompressionType.LZ4:
if parse_version(self._broker_version) >= parse_version('0.10.0'):
compressed = compression.encode_lz4(buffer(uncompressed))
else:
compressed = compression.encode_lz4_old_kafka(buffer(uncompressed))
else:
raise TypeError("Unknown compression: %s" % self.compression_type)
protocol_version = max((m.protocol_version for m in self._messages))
return Record(compressed, compression_type=self.compression_type,
protocol_version=protocol_version)

@classmethod
def decode(cls, buff, partition_id=-1):
offset = 0
fmt = '!qiiBihiqqqhii'
(first_offset, _, _, protocol_version, _, attr, last_offset_delta,
first_timestamp, max_timestamp, _, _, _,
records_count) = struct_helpers.unpack_from(fmt, buff, offset)
offset += struct.calcsize(fmt)

messages = []
while offset < len(buff):
size = struct_helpers.unpack_from('V', buff, offset)
size_len = struct_helpers.get_varint_size(size)
message = Record.decode(buff[offset:offset + size_len + size],
base_timestamp=first_timestamp,
base_offset=first_offset,
partition_id=partition_id)
messages.append(message)
offset += size_len + size

return RecordBatch(messages=messages, first_offset=first_offset,
protocol_version=protocol_version, compression_type=attr,
last_offset_delta=last_offset_delta,
first_timestamp=first_timestamp, max_timestamp=max_timestamp)

def pack_into(self, buff, offset):
if self.compression_type == CompressionType.NONE:
records = self._messages
else:
if self._compressed is None:
self._compressed = self._get_compressed()
records = [self._compressed]
attr = self.compression_type
fmt = '!qiiB'
args = (self.first_offset, len(self), -1, self.protocol_version)
struct.pack_into(fmt, buff, offset, *args)
offset += struct.calcsize(fmt)

crc_offset = offset
fmt = '!hiqqqhii'
args = (attr, self.last_offset_delta, self.first_timestamp, self.max_timestamp,
# NB these -1s are for currently unsupported fields introduced in 0.11
-1, -1, -1, len(records))
struct.pack_into(fmt, buff, offset + 4, *args)
offset += struct.calcsize(fmt) + 4

# TODO replace this with call to pack_record_array_into
for record in records:
record.pack_into(buff, offset, base_timestamp=self.first_timestamp,
base_offset=self.first_offset)
offset += len(record)
end_offset = offset

data = buffer(buff[(crc_offset + 4):end_offset])
crc = crc32(data) & 0xffffffff
struct.pack_into('!I', buff, crc_offset, crc)
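
One caveat on the checksum above: for magic v2 batches the Kafka protocol specifies CRC-32C (Castagnoli) computed from the attributes field to the end of the batch, whereas zlib's crc32 uses the IEEE polynomial. A self-contained sketch of the Castagnoli variant, in case neither struct_helpers nor an external package already provides one (crc32c here is an illustrative helper, not an existing pykafka function):

def _build_crc32c_table():
    # Reflected CRC-32C table, polynomial 0x82F63B78
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        table.append(crc)
    return table

_CRC32C_TABLE = _build_crc32c_table()

def crc32c(data, crc=0):
    # Standard reflected, table-driven CRC; init and final XOR are both 0xffffffff
    crc ^= 0xffffffff
    for byte in bytearray(data):
        crc = (crc >> 8) ^ _CRC32C_TABLE[(crc ^ byte) & 0xff]
    return crc ^ 0xffffffff

assert crc32c(b'123456789') == 0xe3069283

Swapping it in would hypothetically be a one-line change: crc = crc32c(data) instead of crc32(data) & 0xffffffff.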

def pack_record_array_into(self, buff, offset):
"""Pack only the array of Records, ignoring headers"""
pass
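
As a cross-check of the fixed-size batch header handled by __len__, decode and pack_into above, the field widths from the RecordBatch docstring can be tabulated directly. This is only a reading aid; the field names follow the Kafka spec rather than any pykafka identifier:

import struct

# RecordBatch (magic v2) fixed header, one (name, struct code) pair per field
RECORD_BATCH_HEADER = [
    ('FirstOffset', 'q'),           # 8 bytes
    ('Length', 'i'),                # 4
    ('PartitionLeaderEpoch', 'i'),  # 4
    ('Magic', 'B'),                 # 1
    ('CRC', 'i'),                   # 4
    ('Attributes', 'h'),            # 2
    ('LastOffsetDelta', 'i'),       # 4
    ('FirstTimestamp', 'q'),        # 8
    ('MaxTimestamp', 'q'),          # 8
    ('ProducerId', 'q'),            # 8
    ('ProducerEpoch', 'h'),         # 2
    ('FirstSequence', 'i'),         # 4
    ('RecordCount', 'i'),           # 4
]

fmt = '!' + ''.join(code for _, code in RECORD_BATCH_HEADER)
assert fmt == '!qiiBihiqqqhii'      # the format used by RecordBatch.decode
assert struct.calcsize(fmt) == 61   # the constant summed in RecordBatch.__len__
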
2 changes: 1 addition & 1 deletion pykafka/protocol/produce.py
@@ -75,7 +75,7 @@ def add_message(self, message, topic_name, partition_id):
:param topic_name: the name of the topic to publish to
:param partition_id: the partition to publish to
"""
self.msets[topic_name][partition_id].messages.append(message)
self.msets[topic_name][partition_id].add_message(message)
self._message_count += 1

def get_bytes(self):