Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Fix tests for protocol.py
Browse files Browse the repository at this point in the history
  • Loading branch information
kbourgoin committed Apr 24, 2015
1 parent 1a05baf commit 9a291c6
Showing 1 changed file with 21 additions and 26 deletions.
47 changes: 21 additions & 26 deletions tests/pykafka/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,18 @@ def test_response(self):
[cluster.brokers[0].id])

def test_partition_error(self):
self.assertRaises(
exceptions.UnknownTopicOrPartition,
lambda: protocol.MetadataResponse(
buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
)
# Response has a UnknownTopicOrPartition error for test/0
response = protocol.MetadataResponse(
buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
)
self.assertEqual(response.topics['test'].partitions[0].err, 3)

def test_topic_error(self):
self.assertRaises(
exceptions.UnknownTopicOrPartition,
lambda: protocol.MetadataResponse(
# Response has a UnknownTopicOrPartition error for test/0
response = protocol.MetadataResponse(
buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x03\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
)
)
self.assertEqual(response.topics['test'].err, 3)


class TestProduceAPI(unittest2.TestCase):
Expand Down Expand Up @@ -74,18 +72,17 @@ def test_snappy_compression(self):
self.assertEqual(len(msg), 212) # this isn't a good test

def test_partition_error(self):
self.assertRaises(
exceptions.UnknownTopicOrPartition,
lambda: protocol.ProduceResponse(
# Response has a UnknownTopicOrPartition error for test/0
response = protocol.ProduceResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02')
)
)
self.assertEqual(response.topics['test'][0][0], 3)

def test_response(self):
response = protocol.ProduceResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02')
)
self.assertEqual(response.topics, {'test': {0: 2}})
self.assertEqual(response.topics, {'test': {0: (0, 2)}})


class TestFetchAPI(unittest2.TestCase):
Expand All @@ -99,12 +96,11 @@ def test_request(self):
)

def test_partition_error(self):
self.assertRaises(
exceptions.UnknownTopicOrPartition,
lambda: protocol.FetchResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message')
)
# Response has a UnknownTopicOrPartition error for test/0
response = protocol.FetchResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message')
)
self.assertEqual(response.topics['test'][0].error, 3)

def test_response(self):
resp = protocol.FetchResponse(
Expand Down Expand Up @@ -163,18 +159,17 @@ def test_request(self):
)

def test_partition_error(self):
self.assertRaises(
exceptions.UnknownTopicOrPartition,
lambda: protocol.OffsetResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02')
)
# Response has a UnknownTopicOrPartition error for test/0
response = protocol.OffsetResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02')
)
self.assertEqual(response.topics['test'][0].error, 3)

def test_response(self):
resp = protocol.OffsetResponse(
buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02')
)
self.assertEqual(resp.topics['test'], {0: [2]})
self.assertEqual(resp.topics['test'][0].offset, [2])


class TestOffsetCommitFetchAPI(unittest2.TestCase):
Expand Down Expand Up @@ -208,7 +203,7 @@ def test_offset_commit_response(self):
response = protocol.OffsetCommitResponse(
buffer('\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00')
)
self.assertEqual(response.topics['emmett.dummy'][0], 0)
self.assertEqual(response.topics['emmett.dummy'][0].error, 0)

def test_offset_fetch_request(self):
preq = protocol.PartitionOffsetFetchRequest('testtopic', 0)
Expand Down

0 comments on commit 9a291c6

Please sign in to comment.