From dc4431c964b45878e8bf3cf5bfa13c7af65bf24d Mon Sep 17 00:00:00 2001
From: Ali-Akber Saifee
Date: Tue, 24 Mar 2015 06:49:55 +0800
Subject: [PATCH 1/2] Add test to demonstrate mp resume behavior

The mp consumer does not resume from the last committed offset.
---
 test/test_consumer_integration.py | 35 +++++++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)

diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 9c8919011..7a3497250 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -326,6 +326,41 @@ def test_offset_behavior__resuming_behavior(self):
         consumer1.stop()
         consumer2.stop()
 
+    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+    def test_multi_process_offset_behavior__resuming_behavior(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer that commits every 20 messages
+        consumer1 = self.consumer(
+            consumer=MultiProcessConsumer,
+            auto_commit_every_t=None,
+            auto_commit_every_n=20,
+        )
+
+        # Grab the first 195 messages
+        output_msgs1 = []
+        idx = 0
+        for message in consumer1:
+            if idx == 195:
+                break
+            idx += 1
+            output_msgs1.append(message.message.value)
+        self.assert_message_count(output_msgs1, 195)
+
+        # The total committed offset across both partitions should be 180
+        consumer2 = self.consumer(
+            consumer=MultiProcessConsumer,
+            auto_commit_every_t=None,
+            auto_commit_every_n=20,
+        )
+
+        # Messages 181-200
+        self.assert_message_count([message for message in consumer2], 20)
+
+        consumer1.stop()
+        consumer2.stop()
+
     # TODO: Make this a unit test -- should not require integration
     @kafka_versions("all")
     def test_fetch_buffer_size(self):

From 25937c4ac450381f7a9b0ae2847b103010965cbb Mon Sep 17 00:00:00 2001
From: Ali-Akber Saifee
Date: Tue, 24 Mar 2015 06:56:50 +0800
Subject: [PATCH 2/2] Allow MP consumers to request offsets in child consumers

If auto_commit is set on the mp consumer, or start_from_beginning is
explicitly set to False, the child workers will also request the last
known offsets for their partitions.
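As an illustration, a minimal usage sketch of the resumption behavior
this enables (the broker address, group, and topic names below are
hypothetical, as is the message count):

    from kafka import KafkaClient, MultiProcessConsumer

    client = KafkaClient("localhost:9092")  # assumed broker address

    # First consumer: auto-commits offsets every 20 messages.
    consumer = MultiProcessConsumer(client, "my-group", "my-topic",
                                    auto_commit=True,
                                    auto_commit_every_n=20,
                                    auto_commit_every_t=None)
    count = 0
    for message in consumer:
        count += 1        # application processing would happen here
        if count == 100:  # stop partway through the topic
            break
    consumer.stop()

    # Second consumer in the same group: because auto_commit is set,
    # its child workers now fetch the last committed offsets instead
    # of starting every partition at offset 0.
    consumer = MultiProcessConsumer(client, "my-group", "my-topic",
                                    auto_commit=True,
                                    auto_commit_every_n=20,
                                    auto_commit_every_t=None)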
---
 kafka/consumer/base.py         |  5 +++--
 kafka/consumer/multiprocess.py | 17 +++++++++++++----
 kafka/consumer/simple.py       |  6 ++++--
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 9cdcf891e..e54031c9c 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -40,7 +40,8 @@ class Consumer(object):
     """
     def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
-                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+                 start_from_beginning=True):
 
         self.client = client
         self.topic = topic
@@ -67,7 +68,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                 self.commit)
             self.commit_timer.start()
 
-        if auto_commit:
+        if auto_commit or not start_from_beginning:
             self.fetch_last_known_offsets(partitions)
         else:
             for partition in partitions:
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index bec3100ae..f71b2eb20 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -18,7 +18,7 @@
 
 log = logging.getLogger("kafka")
 
-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+def _mp_consume(client, group, topic, chunk, start_from_beginning, queue, start, exit, pause, size):
     """
     A child process worker which consumes messages based on the
     notifications given by the controller process
@@ -37,7 +37,8 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
                               partitions=chunk,
                               auto_commit=False,
                               auto_commit_every_n=None,
-                              auto_commit_every_t=None)
+                              auto_commit_every_t=None,
+                              start_from_beginning=start_from_beginning)
 
     # Ensure that the consumer provides the partition information
     consumer.provide_partition_info()
@@ -105,7 +106,7 @@ class MultiProcessConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 num_procs=1, partitions_per_proc=0):
+                 num_procs=1, partitions_per_proc=0, start_from_beginning=True):
 
         # Initiate the base consumer class
         super(MultiProcessConsumer, self).__init__(
@@ -113,7 +114,8 @@ def __init__(self, client, group, topic, auto_commit=True,
             partitions=None,
             auto_commit=auto_commit,
             auto_commit_every_n=auto_commit_every_n,
-            auto_commit_every_t=auto_commit_every_t)
+            auto_commit_every_t=auto_commit_every_t,
+            start_from_beginning=start_from_beginning)
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
@@ -141,10 +143,17 @@ def __init__(self, client, group, topic, auto_commit=True,
         # The final set of chunks
         chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
 
+        # Though the child workers will not auto-commit, they should
+        # not start from offset 0 when auto_commit is set.
+        child_start_from_beginning = start_from_beginning
+        if auto_commit:
+            child_start_from_beginning = False
+
         self.procs = []
         for chunk in chunks:
             args = (client.copy(), group, topic, chunk,
+                    child_start_from_beginning,
                     self.queue, self.start, self.exit,
                     self.pause, self.size)
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 3d250eaf2..d7604dc5a 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -114,13 +114,15 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
                  max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                  iter_timeout=None,
-                 auto_offset_reset='largest'):
+                 auto_offset_reset='largest',
+                 start_from_beginning=True):
         super(SimpleConsumer, self).__init__(
             client, group, topic,
             partitions=partitions,
             auto_commit=auto_commit,
             auto_commit_every_n=auto_commit_every_n,
-            auto_commit_every_t=auto_commit_every_t)
+            auto_commit_every_t=auto_commit_every_t,
+            start_from_beginning=start_from_beginning)
 
         if max_buffer_size is not None and buffer_size > max_buffer_size:
             raise ValueError("buffer_size (%d) is greater than "
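
Usage note: a sketch of the second path through the new check in
base.py. Even with auto_commit=False, passing start_from_beginning=False
makes the consumer fetch the group's last known offsets instead of
defaulting each partition to offset 0 (broker address, group, and topic
names here are hypothetical):

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient("localhost:9092")  # assumed broker address

    # auto_commit is off, so this consumer manages commits itself, but
    # start_from_beginning=False still triggers fetch_last_known_offsets()
    # so it resumes from where the group left off.
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              auto_commit=False,
                              start_from_beginning=False)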