Skip to content

Commit

Permalink
Fixed tests and other issues after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
vshlapakov committed Mar 24, 2015
1 parent 513c527 commit 8f45421
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 31 deletions.
2 changes: 1 addition & 1 deletion kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
acks=req_acks,
timeout=ack_timeout)
except FailedPayloadsError as ex:
failed_reqs = ex.args[0]
failed_reqs = ex.failed_payloads
log.exception("Failed payloads count %s" % len(failed_reqs))

# if no limit, retry all failed messages until success
Expand Down
49 changes: 19 additions & 30 deletions test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
from kafka.protocol import CODEC_NONE

import threading
import multiprocessing as mp
try:
from queue import Empty
from queue import Empty, Queue
except ImportError:
from Queue import Empty
from Queue import Empty, Queue


class TestKafkaProducer(unittest.TestCase):
Expand Down Expand Up @@ -56,33 +55,26 @@ def partitions(topic):
class TestKafkaProducerSendUpstream(unittest.TestCase):

def setUp(self):

# create a multiprocessing Value to store call counter
# (magicmock counters don't work with other processes)
self.send_calls_count = mp.Value('i', 0)

def send_side_effect(*args, **kwargs):
self.send_calls_count.value += 1

self.client = MagicMock()
self.client.send_produce_request.side_effect = send_side_effect
self.queue = mp.Queue()
self.queue = Queue()

def _run_process(self, retries_limit=3, sleep_timeout=1):
# run _send_upstream process with the queue
self.process = mp.Process(
stop_event = threading.Event()
self.thread = threading.Thread(
target=_send_upstream,
args=(self.queue, self.client, CODEC_NONE,
0.3, # batch time (seconds)
3, # batch length
Producer.ACK_AFTER_LOCAL_WRITE,
Producer.DEFAULT_ACK_TIMEOUT,
50, # retry backoff (ms)
retries_limit))
self.process.daemon = True
self.process.start()
retries_limit,
stop_event))
self.thread.daemon = True
self.thread.start()
time.sleep(sleep_timeout)
self.process.terminate()
stop_event.set()

def test_wo_retries(self):

Expand All @@ -97,7 +89,8 @@ def test_wo_retries(self):

# there should be 4 non-void cals:
# 3 batches of 3 msgs each + 1 batch of 1 message
self.assertEqual(self.send_calls_count.value, 4)
self.assertEqual(self.client.send_produce_request.call_count, 4)


def test_first_send_failed(self):

Expand All @@ -106,11 +99,10 @@ def test_first_send_failed(self):
for i in range(10):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))

is_first_time = mp.Value('b', True)
self.client.is_first_time = True
def send_side_effect(reqs, *args, **kwargs):
self.send_calls_count.value += 1
if is_first_time.value:
is_first_time.value = False
if self.client.is_first_time:
self.client.is_first_time = False
raise FailedPayloadsError(reqs)

self.client.send_produce_request.side_effect = send_side_effect
Expand All @@ -122,7 +114,7 @@ def send_side_effect(reqs, *args, **kwargs):

# there should be 5 non-void cals: 1st failed batch of 3 msgs
# + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
self.assertEqual(self.send_calls_count.value, 5)
self.assertEqual(self.client.send_produce_request.call_count, 5)

def test_with_limited_retries(self):

Expand All @@ -132,7 +124,6 @@ def test_with_limited_retries(self):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))

def send_side_effect(reqs, *args, **kwargs):
self.send_calls_count.value += 1
raise FailedPayloadsError(reqs)

self.client.send_produce_request.side_effect = send_side_effect
Expand All @@ -145,8 +136,7 @@ def send_side_effect(reqs, *args, **kwargs):
# there should be 16 non-void cals:
# 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
# 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
self.assertEqual(self.send_calls_count.value, 16)

self.assertEqual(self.client.send_produce_request.call_count, 16)

def test_with_unlimited_retries(self):

Expand All @@ -156,7 +146,6 @@ def test_with_unlimited_retries(self):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))

def send_side_effect(reqs, *args, **kwargs):
self.send_calls_count.value += 1
raise FailedPayloadsError(reqs)

self.client.send_produce_request.side_effect = send_side_effect
Expand All @@ -174,5 +163,5 @@ def send_side_effect(reqs, *args, **kwargs):
self.assertEqual(self.queue.empty(), True)

# 1s / 50ms of backoff = 20 times max
self.assertTrue(self.send_calls_count.value > 10)
self.assertTrue(self.send_calls_count.value <= 20)
calls = self.client.send_produce_request.call_count
self.assertTrue(calls > 10 & calls <= 20)

0 comments on commit 8f45421

Please sign in to comment.