Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions kafka/partitioner/hashed.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import six

from .base import Partitioner


Expand Down Expand Up @@ -43,14 +45,16 @@ def murmur2(key):
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2

Args:
key: if not a bytearray, converted via bytearray(str(key))
key: if not a bytes type, encoded using default encoding

Returns: MurmurHash2 of key bytearray
"""

# Convert key to a bytearray
if not isinstance(key, bytearray):
data = bytearray(str(key))
# Convert key to bytes or bytearray
if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
data = key
else:
data = bytearray(str(key).encode())

length = len(data)
seed = 0x9747b28c
Expand All @@ -61,7 +65,7 @@ def murmur2(key):

# Initialize the hash to a random value
h = seed ^ length
length4 = length / 4
length4 = length // 4

for i in range(length4):
i4 = i * 4
Expand All @@ -84,15 +88,13 @@ def murmur2(key):

# Handle the last few bytes of the input array
extra_bytes = length % 4
if extra_bytes == 3:
if extra_bytes >= 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h &= 0xffffffff

if extra_bytes == 2:
if extra_bytes >= 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h &= 0xffffffff

if extra_bytes == 1:
if extra_bytes >= 1:
h ^= (data[length & ~3] & 0xff)
h &= 0xffffffff
h *= m
Expand Down
23 changes: 23 additions & 0 deletions test/test_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import six
from . import unittest

from kafka.partitioner import (Murmur2Partitioner)

class TestMurmurPartitioner(unittest.TestCase):
    """Tests for Murmur2Partitioner: key-type handling and Java parity."""

    def test_hash_bytes(self):
        """A bytearray key and the equal bytes key give the same partition."""
        partitioner = Murmur2Partitioner(range(1000))
        from_bytearray = partitioner.partition(bytearray(b'test'))
        from_bytes = partitioner.partition(b'test')
        self.assertEqual(from_bytearray, from_bytes)

    def test_hash_encoding(self):
        """A native str key and the equal unicode key give the same partition."""
        partitioner = Murmur2Partitioner(range(1000))
        from_native = partitioner.partition('test')
        from_unicode = partitioner.partition(u'test')
        self.assertEqual(from_native, from_unicode)

    def test_murmur2_java_compatibility(self):
        """Partition results match reference values taken from the Java client."""
        partitioner = Murmur2Partitioner(range(1000))
        # compare with output from Kafka's
        # org.apache.kafka.clients.producer.Partitioner
        # Each entry is (expected partition, key); checked in listed order so
        # the first mismatch is the one reported.
        reference = (
            (681, b''),
            (524, b'a'),
            (434, b'ab'),
            (107, b'abc'),
            (566, b'123456789'),
            (742, b'\x00 '),
        )
        for expected, key in reference:
            self.assertEqual(expected, partitioner.partition(key))