Skip to content

Ensure that multiprocess consumer works in windows #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 9, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 63 additions & 53 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,63 @@ def __iter_partition__(self, partition, offset):
offset = next_offset + 1


def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""
A child process worker which consumes messages based on the
notifications given by the controller process

NOTE: Ideally, this should have been a method inside the Consumer
class. However, multiprocessing module has issues in windows. The
functionality breaks unless this function is kept outside of a class
"""

# Make the child processes open separate socket connections
client.reinit()

# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(client, group, topic,
partitions=chunk,
auto_commit=False,
auto_commit_every_n=None,
auto_commit_every_t=None)

# Ensure that the consumer provides the partition information
consumer.provide_partition_info()

while True:
# Wait till the controller indicates us to start consumption
start.wait()

# If we are asked to quit, do so
if exit.is_set():
break

# Consume messages and add them to the queue. If the controller
# indicates a specific number of messages, follow that advice
count = 0

for partition, message in consumer:
queue.put((partition, message))
count += 1

# We have reached the required size. The controller might have
# more than what he needs. Wait for a while.
# Without this logic, it is possible that we run into a big
# loop consuming all available messages before the controller
# can reset the 'start' event
if count == size.value:
pause.wait()
break

# In case we did not receive any message, give up the CPU for
# a while before we try again
if count == 0:
time.sleep(0.1)

consumer.stop()


class MultiProcessConsumer(Consumer):
"""
A consumer implementation that consumes partitions for a topic in
Expand Down Expand Up @@ -468,63 +525,16 @@ def __init__(self, client, group, topic, auto_commit=True,
self.procs = []
for chunk in chunks:
chunk = filter(lambda x: x is not None, chunk)
proc = Process(target=self._consume, args=(chunk,))
args = (client.copy(),
group, topic, chunk,
self.queue, self.start, self.exit,
self.pause, self.size)

proc = Process(target=_mp_consume, args=args)
proc.daemon = True
proc.start()
self.procs.append(proc)

def _consume(self, partitions):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
"""

# Make the child processes open separate socket connections
self.client.reinit()

# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(self.client, self.group, self.topic,
partitions=partitions,
auto_commit=False,
auto_commit_every_n=None,
auto_commit_every_t=None)

# Ensure that the consumer provides the partition information
consumer.provide_partition_info()

while True:
# Wait till the controller indicates us to start consumption
self.start.wait()

# If we are asked to quit, do so
if self.exit.is_set():
break

# Consume messages and add them to the queue. If the controller
# indicates a specific number of messages, follow that advice
count = 0

for partition, message in consumer:
self.queue.put((partition, message))
count += 1

# We have reached the required size. The controller might have
# more than what he needs. Wait for a while.
# Without this logic, it is possible that we run into a big
# loop consuming all available messages before the controller
# can reset the 'start' event
if count == self.size.value:
self.pause.wait()
break

# In case we did not receive any message, give up the CPU for
# a while before we try again
if count == 0:
time.sleep(0.1)

consumer.stop()

def stop(self):
# Set exit and start off all waiting consumers
self.exit.set()
Expand Down