4 changes: 4 additions & 0 deletions kafka/conn.py
@@ -151,6 +151,10 @@ def recv(self, request_id):
         """
         log.debug("Reading response %d from Kafka" % request_id)

+        # Make sure we have a connection
+        if not self._sock:
+            self.reinit()
+
         # Read the size off of the header
         resp = self._read_bytes(4)
         (size,) = struct.unpack('>i', resp)
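The hunk above makes KafkaConnection.recv() reconnect lazily when the socket has been dropped (for example after a fork or an earlier error), instead of failing on a missing socket. A minimal sketch of the same guard outside kafka-python; LazyConnection and recv_bytes are illustrative names, not library API:

import socket

class LazyConnection(object):
    """Illustration of the reconnect-before-read guard added above."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self._sock = None  # may be None after an error, a close(), or a fork

    def reinit(self):
        # Open a fresh socket connection
        self._sock = socket.create_connection((self.host, self.port))

    def recv_bytes(self, n):
        # Make sure we have a connection before reading, as in the patch
        if not self._sock:
            self.reinit()
        return self._sock.recv(n)  # read up to n bytes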
1 change: 1 addition & 0 deletions kafka/consumer/base.py
@@ -29,6 +29,7 @@
 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
 FULL_QUEUE_WAIT_TIME_SECONDS = 0.1

+MAX_BACKOFF_SECONDS = 60

 class Consumer(object):
     """
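With MAX_BACKOFF_SECONDS = 60 and the doubling rule applied in multiprocess.py below, the retry delay grows as 1, 2, 4, 8, 16, 32 seconds and then stays pinned at 60. A quick sketch of that progression (illustration only, not part of the patch):

MAX_BACKOFF_SECONDS = 60

interval = 1
delays = []
for _ in range(8):
    delays.append(interval)
    interval = interval * 2 if interval * 2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS

print(delays)  # [1, 2, 4, 8, 16, 32, 60, 60]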
104 changes: 58 additions & 46 deletions kafka/consumer/multiprocess.py
@@ -9,11 +9,13 @@
     from queue import Empty, Full # python 2
 import time

+from ..common import KafkaError
 from .base import (
     Consumer,
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
     NO_MESSAGES_WAIT_TIME_SECONDS,
-    FULL_QUEUE_WAIT_TIME_SECONDS
+    FULL_QUEUE_WAIT_TIME_SECONDS,
+    MAX_BACKOFF_SECONDS,
 )
 from .simple import SimpleConsumer

@@ -33,57 +35,67 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
     functionality breaks unless this function is kept outside of a class
     """

-    # Make the child processes open separate socket connections
-    client.reinit()
+    # Initial interval for retries in seconds.
+    interval = 1
+    while not events.exit.is_set():
+        try:
+            # Make the child processes open separate socket connections
+            client.reinit()

-    # We will start consumers without auto-commit. Auto-commit will be
-    # done by the master controller process.
-    consumer = SimpleConsumer(client, group, topic,
-                              auto_commit=False,
-                              auto_commit_every_n=None,
-                              auto_commit_every_t=None,
-                              **consumer_options)
+            # We will start consumers without auto-commit. Auto-commit will be
+            # done by the master controller process.
+            consumer = SimpleConsumer(client, group, topic,
+                                      auto_commit=False,
+                                      auto_commit_every_n=None,
+                                      auto_commit_every_t=None,
+                                      **consumer_options)

-    # Ensure that the consumer provides the partition information
-    consumer.provide_partition_info()
+            # Ensure that the consumer provides the partition information
+            consumer.provide_partition_info()

-    while True:
-        # Wait till the controller indicates us to start consumption
-        events.start.wait()
+            while True:
+                # Wait till the controller indicates us to start consumption
+                events.start.wait()

-        # If we are asked to quit, do so
-        if events.exit.is_set():
-            break
+                # If we are asked to quit, do so
+                if events.exit.is_set():
+                    break

-        # Consume messages and add them to the queue. If the controller
-        # indicates a specific number of messages, follow that advice
-        count = 0
+                # Consume messages and add them to the queue. If the controller
+                # indicates a specific number of messages, follow that advice
+                count = 0

-        message = consumer.get_message()
-        if message:
-            while True:
-                try:
-                    queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
-                    break
-                except Full:
-                    if events.exit.is_set(): break
+                message = consumer.get_message()
+                if message:
+                    while True:
+                        try:
+                            queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+                            break
+                        except Full:
+                            if events.exit.is_set(): break

-            count += 1
+                    count += 1

-            # We have reached the required size. The controller might have
-            # more than what he needs. Wait for a while.
-            # Without this logic, it is possible that we run into a big
-            # loop consuming all available messages before the controller
-            # can reset the 'start' event
-            if count == size.value:
-                events.pause.wait()
+                    # We have reached the required size. The controller might have
+                    # more than what he needs. Wait for a while.
+                    # Without this logic, it is possible that we run into a big
+                    # loop consuming all available messages before the controller
+                    # can reset the 'start' event
+                    if count == size.value:
+                        events.pause.wait()

-        else:
-            # In case we did not receive any message, give up the CPU for
-            # a while before we try again
-            time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+                else:
+                    # In case we did not receive any message, give up the CPU for
+                    # a while before we try again
+                    time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)

-    consumer.stop()
+            consumer.stop()

+        except KafkaError as e:
+            # Retry with exponential backoff
+            log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+            time.sleep(interval)
+            interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
+

 class MultiProcessConsumer(Consumer):
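Taken together, the multiprocess.py change wraps the whole worker body in a retry loop with exponential backoff, so a child process that loses contact with Kafka keeps retrying instead of dying. A stripped-down sketch of the same pattern, independent of kafka-python; run_with_backoff, work and exit_event are illustrative names, not library API:

import logging
import time

log = logging.getLogger(__name__)

MAX_BACKOFF_SECONDS = 60  # same cap as the constant added in kafka/consumer/base.py

def run_with_backoff(work, exit_event, retriable=(Exception,)):
    """Call work() until exit_event is set, backing off after each failure."""
    interval = 1  # initial retry delay in seconds
    while not exit_event.is_set():
        try:
            work()
        except retriable as e:
            log.error("Worker failed (%s), retrying in %d seconds...", e, interval)
            time.sleep(interval)
            # Double the delay, but never exceed the cap
            interval = min(interval * 2, MAX_BACKOFF_SECONDS)

In the patch itself the retriable exception is KafkaError and the delay is not reset after a successful pass, which the sketch mirrors.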