Merge pull request #143 from aio-libs/pattern_without_group_id
Added metadata change listener if group_id is None
tvoinarovskyi committed Apr 16, 2017
2 parents 8d6d4f8 + 0e39ec7 commit ff22a12
Showing 5 changed files with 191 additions and 79 deletions.
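In short: a consumer created without a `group_id` now gets a lightweight `NoGroupCoordinator` that listens for cluster metadata changes, so pattern subscription and partition-count changes work without a consumer group. A minimal usage sketch against the aiokafka API of this release — the broker address and topic pattern are placeholders:

import asyncio
from aiokafka import AIOKafkaConsumer

@asyncio.coroutine
def consume(loop):
    # No group_id: assignment is handled locally by NoGroupCoordinator,
    # without a Kafka Coordinator node or committed offsets.
    consumer = AIOKafkaConsumer(
        loop=loop, bootstrap_servers='localhost:9092', group_id=None)
    consumer.subscribe(pattern='^sensor-.*')  # topics matched via metadata
    yield from consumer.start()
    try:
        msg = yield from consumer.getone()
        print(msg.topic, msg.partition, msg.offset, msg.value)
    finally:
        yield from consumer.stop()

loop = asyncio.get_event_loop()
loop.run_until_complete(consume(loop))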
35 changes: 16 additions & 19 deletions aiokafka/consumer.py
@@ -10,7 +10,7 @@
from kafka.consumer.subscription_state import SubscriptionState

from aiokafka.client import AIOKafkaClient
from aiokafka.group_coordinator import GroupCoordinator
from aiokafka.group_coordinator import GroupCoordinator, NoGroupCoordinator
from aiokafka.errors import ConsumerStoppedError
from aiokafka.fetcher import Fetcher
from aiokafka import __version__, ensure_future, PY_35
@@ -230,25 +230,22 @@ def start(self):
enable_auto_commit=self._enable_auto_commit,
auto_commit_interval_ms=self._auto_commit_interval_ms,
assignors=self._partition_assignment_strategy,
exclude_internal_topics=self._exclude_internal_topics)
self._coordinator.on_group_rebalanced(
self._on_change_subscription)
exclude_internal_topics=self._exclude_internal_topics,
assignment_changed_cb=self._on_change_subscription)

yield from self._coordinator.ensure_active_group()
elif self._subscription.needs_partition_assignment:
# using manual partitions assignment by topic(s)
yield from self._client.force_metadata_update()
partitions = []
for topic in self._subscription.subscription:
p_ids = self.partitions_for_topic(topic)
if not p_ids:
raise UnknownTopicOrPartitionError()
for p_id in p_ids:
partitions.append(TopicPartition(topic, p_id))
self._subscription.unsubscribe()
self._subscription.assign_from_user(partitions)
yield from self._update_fetch_positions(
self._subscription.missing_fetch_positions())
else:
# Using a simple assignment coordinator for reassignment on
# metadata changes
self._coordinator = NoGroupCoordinator(
self._client, self._subscription, loop=self._loop,
exclude_internal_topics=self._exclude_internal_topics,
assignment_changed_cb=self._on_change_subscription)

# If we passed `topics` to constructor.
if self._subscription.needs_partition_assignment:
yield from self._client.force_metadata_update()
self._coordinator.assign_all_partitions(check_unknown=True)

@asyncio.coroutine
def _wait_topics(self):
@@ -567,7 +564,7 @@ def subscription(self):
Returns:
set: {topic, ...}
"""
return self._subscription.subscription
return frozenset(self._subscription.subscription or [])

def unsubscribe(self):
"""Unsubscribe from all topics and clear all assigned partitions."""
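A related tweak above: `subscription()` now returns an immutable copy of the subscription set (an empty `frozenset` instead of `None` when nothing is subscribed), so callers can no longer mutate the consumer's internal state through it. A small hypothetical illustration:

import asyncio
from aiokafka import AIOKafkaConsumer

loop = asyncio.get_event_loop()
consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers='localhost:9092')

assert consumer.subscription() == frozenset()       # previously returned None
consumer.subscribe(topics=['events'])
assert consumer.subscription() == frozenset({'events'})  # a copy, safe to keep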
3 changes: 2 additions & 1 deletion aiokafka/fetcher.py
@@ -510,7 +510,8 @@ def update_fetch_positions(self, partitions):
done, _ = yield from asyncio.wait(
futures, return_when=asyncio.ALL_COMPLETED, loop=self._loop)
# retrieve task result, can raise exception
[x.result() for x in done]
for x in done:
x.result()

@asyncio.coroutine
def _reset_offset(self, partition):
177 changes: 121 additions & 56 deletions aiokafka/group_coordinator.py
@@ -22,7 +22,109 @@
log = logging.getLogger(__name__)


class GroupCoordinator(object):
class BaseCoordinator(object):

def __init__(self, client, subscription, *, loop,
exclude_internal_topics=True,
assignment_changed_cb=None
):
self.loop = loop
self._client = client
self._exclude_internal_topics = exclude_internal_topics
self._subscription = subscription
self._assignment_changed_cb = assignment_changed_cb

self._metadata_snapshot = {} # Is updated by metadata listener
self._assignment_snapshot = None # Is only present on Leader consumer
self._cluster = client.cluster

# update initial subscription state using currently known metadata
self._handle_metadata_update(self._cluster)
self._cluster.add_listener(self._handle_metadata_update)

def _handle_metadata_update(self, cluster):
if self._subscription.subscribed_pattern:
topics = []
for topic in cluster.topics(self._exclude_internal_topics):
if self._subscription.subscribed_pattern.match(topic):
topics.append(topic)

self._subscription.change_subscription(topics)

if self._subscription.partitions_auto_assigned():
metadata_snapshot = self._get_metadata_snapshot()
if self._metadata_snapshot != metadata_snapshot:
self._metadata_snapshot = metadata_snapshot

if self._metadata_changed():
log.debug("Metadata for topic has changed from %s to %s. ",
self._assignment_snapshot, metadata_snapshot)

def _metadata_changed(self):
return (
self._assignment_snapshot is not None and
self._assignment_snapshot != self._metadata_snapshot)

def _get_metadata_snapshot(self):
partitions_per_topic = {}
for topic in self._subscription.group_subscription():
partitions = self._cluster.partitions_for_topic(topic) or []
# Partitions are always from 0 to N, so no reason to check each
# partition separately, only length is enough
partitions_per_topic[topic] = len(partitions)
return partitions_per_topic

def _on_change_assignment(self):
if self._assignment_changed_cb is not None:
self._assignment_changed_cb()


class NoGroupCoordinator(BaseCoordinator):
"""
When `group_id` consumer option is not used we don't have the functionality
provided by Coordinator node in Kafka cluster, like committing offsets (
Kafka based offset storage) or automatic partition assignment between
consumers. But `GroupCoordinator` class has some other responsibilities,
that this class takes care of to avoid code duplication, like:
* Static topic partition assignment when we subscribed to topic.
Partition changes will be noticed by metadata update and assigned.
* Pattern topic subscription. New topics will be noticed by metadata
update and added to subscription.
"""

def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self._assignment_snapshot = {}

def _handle_metadata_update(self, cluster):
super()._handle_metadata_update(cluster)
if self._metadata_changed():
self.assign_all_partitions()

def assign_all_partitions(self, check_unknown=False):
""" Assign all partitions from subscribed topics to this consumer.
If `check_unknown` is set, UnknownTopicOrPartitionError is raised when a
subscribed topic is not found in the metadata response.
"""
partitions = []
for topic in self._subscription.subscription:
p_ids = self._cluster.partitions_for_topic(topic)
if not p_ids and check_unknown:
raise Errors.UnknownTopicOrPartitionError()
for p_id in p_ids:
partitions.append(TopicPartition(topic, p_id))
self._subscription.assign_from_subscribed(partitions)

self._assignment_snapshot = self._metadata_snapshot
self._on_change_assignment()

@asyncio.coroutine
def close(self):
pass
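The reassignment trigger in `BaseCoordinator` boils down to comparing per-topic partition counts between metadata updates. A standalone sketch of that comparison — function names here are illustrative, not part of aiokafka:

def get_metadata_snapshot(cluster_partitions, subscribed_topics):
    # cluster_partitions: mapping of topic -> iterable of partition ids,
    # as it would come from cluster metadata.
    return {topic: len(cluster_partitions.get(topic, []))
            for topic in subscribed_topics}

def metadata_changed(assignment_snapshot, metadata_snapshot):
    # Mirrors _metadata_changed(): nothing to compare before the first
    # assignment (snapshot is None).
    return (assignment_snapshot is not None and
            assignment_snapshot != metadata_snapshot)

# Example: topic "events" grows from 3 to 4 partitions between updates,
# which should trigger assign_all_partitions() on the next metadata event.
old = get_metadata_snapshot({'events': [0, 1, 2]}, ['events'])
new = get_metadata_snapshot({'events': [0, 1, 2, 3]}, ['events'])
assert metadata_changed(old, new)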


class GroupCoordinator(BaseCoordinator):
"""
GroupCoordinator implements group management for single group member
by interacting with a designated Kafka broker (the coordinator). Group
@@ -62,7 +164,8 @@ def __init__(self, client, subscription, *, loop,
retry_backoff_ms=100,
enable_auto_commit=True, auto_commit_interval_ms=5000,
assignors=(RoundRobinPartitionAssignor,),
exclude_internal_topics=True
exclude_internal_topics=True,
assignment_changed_cb=None
):
"""Initialize the coordination manager.
@@ -94,35 +197,31 @@ def __init__(self, client, subscription, *, loop,
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
"""
self._client = client
super().__init__(
client, subscription, loop=loop,
exclude_internal_topics=exclude_internal_topics,
assignment_changed_cb=assignment_changed_cb)

self._session_timeout_ms = session_timeout_ms
self._heartbeat_interval_ms = heartbeat_interval_ms
self._retry_backoff_ms = retry_backoff_ms
self._exclude_internal_topics = exclude_internal_topics
self._assignors = assignors
self._enable_auto_commit = enable_auto_commit
self._auto_commit_interval_ms = auto_commit_interval_ms

self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
self.group_id = group_id
self.coordinator_id = None
self.rejoin_needed = True
self.needs_join_prepare = True
self.loop = loop
# rejoin group can be called in parallel
# `ensure_active_group` can be called from several places
# (from consumer and from heartbeat task), so we need a lock
self._rejoin_lock = asyncio.Lock(loop=loop)
self._enable_auto_commit = enable_auto_commit
self._auto_commit_interval_ms = auto_commit_interval_ms
self._assignors = assignors
self._subscription = subscription
self._metadata_snapshot = {} # Is updated by metadata listener
self._assignment_snapshot = None # Is only present on Leader consumer
self._cluster = client.cluster
self._auto_commit_task = None

# _closing future used as a signal to heartbeat task for finish ASAP
self._closing = asyncio.Future(loop=loop)
# update subscribe state usint currently known metadata
self._handle_metadata_update(client.cluster)
self._cluster.add_listener(self._handle_metadata_update)
self._group_rebalanced_callback = None

self.heartbeat_task = ensure_future(
self._heartbeat_task_routine(), loop=loop)
@@ -160,9 +259,6 @@ def close(self):
else:
log.info("LeaveGroup request succeeded")

def on_group_rebalanced(self, callback):
self._group_rebalanced_callback = callback

@asyncio.coroutine
def _send_req(self, node_id, request):
"""send request to Kafka node and mark coordinator as `dead`
@@ -185,33 +281,6 @@ def _send_req(self, node_id, request):
else:
raise error_type()

def _handle_metadata_update(self, cluster):
if self._subscription.subscribed_pattern:
topics = []
for topic in cluster.topics(self._exclude_internal_topics):
if self._subscription.subscribed_pattern.match(topic):
topics.append(topic)

self._subscription.change_subscription(topics)

if self._subscription.partitions_auto_assigned():
metadata_snapshot = self._get_metadata_snapshot()
if self._metadata_snapshot != metadata_snapshot:
self._metadata_snapshot = metadata_snapshot

if self._metadata_changed():
log.debug("Metadata for topic has changed from %s to %s. ",
self._assignment_snapshot, metadata_snapshot)

def _get_metadata_snapshot(self):
partitions_per_topic = {}
for topic in self._subscription.group_subscription():
partitions = self._cluster.partitions_for_topic(topic) or []
# Partitions are always from 0 to N, so no reason to check each
# partition separately, only length is enough
partitions_per_topic[topic] = len(partitions)
return partitions_per_topic

def _lookup_assignor(self, name):
for assignor in self._assignors:
if assignor.name == name:
@@ -268,8 +337,10 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
log.debug("Finished assignment for group %s: %s",
self.group_id, assignments)

# If we remove topics from subscription metadata is not refreshed, so
# the snapshot can be outdated too.
# `group_subscribe()` will not trigger a metadata update if we only
# removed some topics (client has all needed metadata already).
# But in that case the snapshot could be outdated, so we update
# `_metadata_snapshot` too.
self._assignment_snapshot = self._metadata_snapshot = \
self._get_metadata_snapshot()

@@ -310,8 +381,7 @@ def _on_join_complete(self, generation, member_id, protocol,
self._subscription.listener, self.group_id,
assigned)

if self._group_rebalanced_callback:
self._group_rebalanced_callback()
self._on_change_assignment()

@asyncio.coroutine
def refresh_committed_offsets(self):
@@ -611,11 +681,6 @@ def need_rejoin(self):
self._subscription.needs_partition_assignment or
self._metadata_changed()))

def _metadata_changed(self):
return (
self._assignment_snapshot is not None and
self._assignment_snapshot != self._metadata_snapshot)

@asyncio.coroutine
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
6 changes: 5 additions & 1 deletion tests/conftest.py
@@ -17,6 +17,9 @@ def pytest_addoption(parser):
action='store',
default='pygo/kafka:2.11_0.9.0.1',
help='Kafka docker image to use')
parser.addoption('--no-pull',
action='store_true',
help='Do not pull new docker image before test run')


@pytest.fixture(scope='session')
@@ -91,7 +94,8 @@ def session_id():
def kafka_server(request, docker, docker_ip_address,
unused_port, session_id, ssl_folder):
image = request.config.getoption('--docker-image')
docker.pull(image)
if not request.config.getoption('--no-pull'):
docker.pull(image)
kafka_host = docker_ip_address
kafka_port = unused_port()
kafka_ssl_port = unused_port()
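With the new `--no-pull` flag, local test runs can reuse an already-present Kafka image instead of pulling it each time; a typical invocation might look like the following (exact paths and image tag depend on the local setup):

py.test --docker-image pygo/kafka:2.11_0.9.0.1 --no-pull tests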
