Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #156 from Parsely/feature/standardize
Browse files Browse the repository at this point in the history
Feature/standardize
  • Loading branch information
kbourgoin committed Apr 26, 2015
2 parents 282fd25 + a8e246e commit e09113e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ integration:
python setup.py nosetests --attr=integration

test:
python setup.py nosetests --attr=!fixme
python setup.py nosetests

vendor:
make -C vendor
Expand Down
21 changes: 11 additions & 10 deletions pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,28 +141,29 @@ def _get_partition_msgs(partition_id, req):
response = broker.produce_messages(req)

# Figure out if we need to retry any messages
# TODO: Convert to using utils.handle_partition_responses
to_retry = []
for topic, partitions in response.topics.iteritems():
for partition, (error, offset) in partitions.iteritems():
if error == 0:
for partition, presponse in partitions.iteritems():
if presponse.err == 0:
continue # All's well
if error == UnknownTopicOrPartition.ERROR_CODE:
logger.warning('Unknown topic: %s or partition: %s. Retrying.',
self._topic, partition)
elif error == NotLeaderForPartition.ERROR_CODE:
if presponse.err == UnknownTopicOrPartition.ERROR_CODE:
logger.warning('Unknown topic: %s or partition: %s. '
'Retrying.', topic, partition)
elif presponse.err == NotLeaderForPartition.ERROR_CODE:
logger.warning('Partition leader for %s/%s changed. '
'Retrying.', topic, partition)
# Update cluster metadata to get new leader
self._cluster.update()
elif error == RequestTimedOut.ERROR_CODE:
elif presponse.err == RequestTimedOut.ERROR_CODE:
logger.warning('Produce request to %s:%s timed out. '
'Retrying.', broker.host, broker.port)
elif error == InvalidMessageError.ERROR_CODE:
elif presponse.err == InvalidMessageError.ERROR_CODE:
logger.warning('Encountered InvalidMessageError')
elif error == InvalidMessageSize.ERROR_CODE:
elif presponse.err == InvalidMessageSize.ERROR_CODE:
logger.warning('Encountered InvalidMessageSize')
continue
elif error == MessageSizeTooLarge.ERROR_CODE:
elif pres == MessageSizeTooLarge.ERROR_CODE:
logger.warning('Encountered MessageSizeTooLarge')
continue
to_retry.extend(_get_partition_msgs(partition, req))
Expand Down
19 changes: 13 additions & 6 deletions pykafka/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
The implementation has been done with an attempt to minimize memory
allocations in order to improve performance. With the exception of
compressed messages, we can calculate the size of the entire message
to send and do only a single allocation.
to send and do only a single memory allocation.
For Reference:
Expand Down Expand Up @@ -489,6 +489,12 @@ def message_count(self):
return self._message_count


ProducePartitionResponse = namedtuple(
'ProducePartitionResponse',
['err', 'offset']
)


class ProduceResponse(Response):
"""Produce Response. Checks to make sure everything went okay.
Expand All @@ -511,7 +517,8 @@ def __init__(self, buff):
for (topic, partitions) in response:
self.topics[topic] = {}
for partition in partitions:
self.topics[topic][partition[0]] = tuple(partition[1:3])
pres = ProducePartitionResponse(partition[1], partition[2])
self.topics[topic][partition[0]] = pres


##
Expand Down Expand Up @@ -622,7 +629,7 @@ def get_bytes(self):

FetchPartitionResponse = namedtuple(
'FetchPartitionResponse',
['max_offset', 'messages', 'error']
['max_offset', 'messages', 'err']
)


Expand Down Expand Up @@ -751,7 +758,7 @@ def get_bytes(self):

OffsetPartitionResponse = namedtuple(
'OffsetPartitionResponse',
['offset', 'error']
['offset', 'err']
)


Expand Down Expand Up @@ -952,7 +959,7 @@ def get_bytes(self):

OffsetCommitPartitionResponse = namedtuple(
'OffsetCommitPartitionResponse',
['error']
['err']
)


Expand Down Expand Up @@ -1059,7 +1066,7 @@ def get_bytes(self):

OffsetFetchPartitionResponse = namedtuple(
'OffsetFetchPartitionResponse',
['offset', 'metadata', 'error']
['offset', 'metadata', 'err']
)


Expand Down
2 changes: 1 addition & 1 deletion pykafka/utils/error_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def handle_partition_responses(response,
owned_partition = None
if partitions_by_id is not None:
owned_partition = partitions_by_id[partition_id]
parts_by_error[pres.error].append((owned_partition, pres))
parts_by_error[pres.err].append((owned_partition, pres))

for errcode, parts in parts_by_error.iteritems():
if errcode in error_handlers:
Expand Down
13 changes: 8 additions & 5 deletions tests/pykafka/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,16 @@ def test_partition_error(self):
response = protocol.ProduceResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02')
)
self.assertEqual(response.topics['test'][0][0], 3)
self.assertEqual(response.topics['test'][0].err, 3)

def test_response(self):
response = protocol.ProduceResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02')
)
self.assertEqual(response.topics, {'test': {0: (0, 2)}})
self.assertEqual(
response.topics,
{'test': {0: protocol.ProducePartitionResponse(0, 2)}}
)


class TestFetchAPI(unittest2.TestCase):
Expand All @@ -100,7 +103,7 @@ def test_partition_error(self):
response = protocol.FetchResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message')
)
self.assertEqual(response.topics['test'][0].error, 3)
self.assertEqual(response.topics['test'][0].err, 3)

def test_response(self):
resp = protocol.FetchResponse(
Expand Down Expand Up @@ -163,7 +166,7 @@ def test_partition_error(self):
response = protocol.OffsetResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02')
)
self.assertEqual(response.topics['test'][0].error, 3)
self.assertEqual(response.topics['test'][0].err, 3)

def test_response(self):
resp = protocol.OffsetResponse(
Expand Down Expand Up @@ -203,7 +206,7 @@ def test_offset_commit_response(self):
response = protocol.OffsetCommitResponse(
buffer('\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00')
)
self.assertEqual(response.topics['emmett.dummy'][0].error, 0)
self.assertEqual(response.topics['emmett.dummy'][0].err, 0)

def test_offset_fetch_request(self):
preq = protocol.PartitionOffsetFetchRequest('testtopic', 0)
Expand Down
12 changes: 6 additions & 6 deletions tests/pykafka/test_simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class TestOwnedPartition(unittest2.TestCase):
def test_partition_saves_offset(self):
msgval = "test"
op = OwnedPartition(None, None)
op = OwnedPartition(None)

message = mock.Mock()
message.value = msgval
Expand All @@ -24,7 +24,7 @@ def test_partition_saves_offset(self):

def test_partition_rejects_old_message(self):
last_offset = 400
op = OwnedPartition(None, None)
op = OwnedPartition(None)
op.last_offset_consumed = last_offset

message = mock.Mock()
Expand All @@ -37,7 +37,7 @@ def test_partition_rejects_old_message(self):
self.assertEqual(op.last_offset_consumed, last_offset)

def test_partition_consume_empty_queue(self):
op = OwnedPartition(None, None)
op = OwnedPartition(None)

message = op.consume()
self.assertEqual(message, None)
Expand All @@ -49,7 +49,7 @@ def test_partition_offset_commit_request(self):
partition.topic = topic
partition.id = 12345

op = OwnedPartition(partition, None)
op = OwnedPartition(partition)
op.last_offset_consumed = 200

rqtime = int(time.time())
Expand All @@ -69,7 +69,7 @@ def test_partition_offset_fetch_request(self):
partition.topic = topic
partition.id = 12345

op = OwnedPartition(partition, None)
op = OwnedPartition(partition)

request = op.build_offset_fetch_request()

Expand All @@ -80,7 +80,7 @@ def test_partition_offset_counters(self):
res = mock.Mock()
res.offset = 400

op = OwnedPartition(None, None)
op = OwnedPartition(None)
op.set_offset(res.offset)

self.assertEqual(op.last_offset_consumed, res.offset)
Expand Down

0 comments on commit e09113e

Please sign in to comment.