Skip to content

Commit

Permalink
Avoid re-encoding for message crc check (#1027)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp committed Mar 13, 2017
1 parent 92a66e3 commit 47004bb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
18 changes: 12 additions & 6 deletions kafka/protocol/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
timestamp = int(time.time() * 1000)
self.timestamp = timestamp
self.crc = crc
self._validated_crc = None
self.magic = magic
self.attributes = attributes
self.key = key
Expand Down Expand Up @@ -85,7 +86,9 @@ def _encode_self(self, recalc_crc=True):

@classmethod
def decode(cls, data):
_validated_crc = None
if isinstance(data, bytes):
_validated_crc = crc32(data[4:])
data = io.BytesIO(data)
# Partial decode required to determine message version
base_fields = cls.SCHEMAS[0].fields[0:3]
Expand All @@ -96,14 +99,17 @@ def decode(cls, data):
timestamp = fields[0]
else:
timestamp = None
return cls(fields[-1], key=fields[-2],
magic=magic, attributes=attributes, crc=crc,
timestamp=timestamp)
msg = cls(fields[-1], key=fields[-2],
magic=magic, attributes=attributes, crc=crc,
timestamp=timestamp)
msg._validated_crc = _validated_crc
return msg

def validate_crc(self):
raw_msg = self._encode_self(recalc_crc=False)
crc = crc32(raw_msg[4:])
if crc == self.crc:
if self._validated_crc is None:
raw_msg = self._encode_self(recalc_crc=False)
self._validated_crc = crc32(raw_msg[4:])
if self.crc == self._validated_crc:
return True
return False

Expand Down
24 changes: 24 additions & 0 deletions test/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ def test_decode_message():
assert decoded_message == msg


def test_decode_message_validate_crc():
encoded = b''.join([
struct.pack('>i', -1427009701), # CRC
struct.pack('>bb', 0, 0), # Magic, flags
struct.pack('>i', 3), # Length of key
b'key', # key
struct.pack('>i', 4), # Length of value
b'test', # value
])
decoded_message = Message.decode(encoded)
assert decoded_message.validate_crc() is True

encoded = b''.join([
struct.pack('>i', 1234), # Incorrect CRC
struct.pack('>bb', 0, 0), # Magic, flags
struct.pack('>i', 3), # Length of key
b'key', # key
struct.pack('>i', 4), # Length of value
b'test', # value
])
decoded_message = Message.decode(encoded)
assert decoded_message.validate_crc() is False


def test_encode_message_set():
messages = [
Message(b'v1', key=b'k1'),
Expand Down

0 comments on commit 47004bb

Please sign in to comment.