
Merge pull request #748 from Parsely/feature/rr_assignment
roundrobin partition assignment
Emmett J. Butler committed Dec 4, 2017
2 parents 42b09e6 + 5bbc785 commit fb5bb06
Showing 9 changed files with 185 additions and 82 deletions.
5 changes: 5 additions & 0 deletions doc/api/membershipprotocol.rst
@@ -0,0 +1,5 @@
pykafka.membershipprotocol
==========================

.. automodule:: pykafka.membershipprotocol
    :members:
3 changes: 2 additions & 1 deletion pykafka/__init__.py
@@ -8,10 +8,11 @@
from .client import KafkaClient
from .balancedconsumer import BalancedConsumer
from .managedbalancedconsumer import ManagedBalancedConsumer
from .membershipprotocol import RangeProtocol, RoundRobinProtocol

__version__ = '2.7.0.dev1'


__all__ = ["Broker", "SimpleConsumer", "Cluster", "Partition", "Producer",
"Topic", "SslConfig", "KafkaClient", "BalancedConsumer",
"ManagedBalancedConsumer"]
"ManagedBalancedConsumer", "RangeProtocol", "RoundRobinProtocol"]
56 changes: 9 additions & 47 deletions pykafka/balancedconsumer.py
@@ -18,7 +18,6 @@
limitations under the License.
"""
__all__ = ["BalancedConsumer"]
import itertools
import logging
import socket
import sys
@@ -37,6 +36,7 @@

from .common import OffsetType
from .exceptions import KafkaException, PartitionOwnedError, ConsumerStoppedException
from .membershipprotocol import RangeProtocol
from .simpleconsumer import SimpleConsumer
from .utils.compat import range, get_bytes, itervalues, iteritems, get_string
from .utils.error_handlers import valid_int
@@ -99,7 +99,8 @@ def __init__(self,
reset_offset_on_start=False,
post_rebalance_callback=None,
use_rdkafka=False,
compacted_topic=False):
compacted_topic=False,
membership_protocol=RangeProtocol):
"""Create a BalancedConsumer instance
:param topic: The topic this consumer should consume
@@ -199,6 +200,9 @@ def __init__(self,
consumer to use less stringent message ordering logic because compacted
topics do not provide offsets in strict incrementing order.
:type compacted_topic: bool
:param membership_protocol: The group membership protocol to which this consumer
should adhere
:type membership_protocol: :class:`pykafka.membershipprotocol.GroupMembershipProtocol`
"""
self._cluster = cluster
if not isinstance(consumer_group, bytes):
@@ -230,6 +234,7 @@ def __init__(self,
self._running = False
self._worker_exception = None
self._is_compacted_topic = compacted_topic
self._membership_protocol = membership_protocol

if not rdkafka and use_rdkafka:
raise ImportError("use_rdkafka requires rdkafka to be installed")
@@ -431,49 +436,6 @@ def _get_internal_consumer(self, partitions=None, start=True):
consumer_id=self._consumer_id
)

def _decide_partitions(self, participants, consumer_id=None):
"""Decide which partitions belong to this consumer.
Uses the consumer rebalancing algorithm described here
https://kafka.apache.org/documentation/#impl_consumerrebalance
It is very important that the participants array is sorted,
since this algorithm runs on each consumer and indexes into the same
array. The same array index operation must return the same
result on each consumer.
:param participants: Sorted list of ids of all other consumers in this
consumer group.
:type participants: Iterable of `bytes`
:param consumer_id: The ID of the consumer for which to generate a partition
assignment. Defaults to `self._consumer_id`
"""
# Freeze and sort partitions so we always have the same results
def p_to_str(p):
return '-'.join([str(p.topic.name), str(p.leader.id), str(p.id)])

all_parts = self._topic.partitions.values()
all_parts = sorted(all_parts, key=p_to_str)

# get start point, # of partitions, and remainder
participants = sorted(participants) # just make sure it's sorted.
consumer_id = consumer_id or self._consumer_id
idx = participants.index(consumer_id)
parts_per_consumer = len(all_parts) // len(participants)
remainder_ppc = len(all_parts) % len(participants)

start = parts_per_consumer * idx + min(idx, remainder_ppc)
num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1)

# assign partitions from i*N to (i+1)*N - 1 to consumer Ci
new_partitions = itertools.islice(all_parts, start, start + num_parts)
new_partitions = set(new_partitions)
log.info('%s: Balancing %i participants for %i partitions. Owning %i partitions.',
consumer_id, len(participants), len(all_parts),
len(new_partitions))
log.debug('My partitions: %s', [p_to_str(p) for p in new_partitions])
return new_partitions

def _get_participants(self):
"""Use zookeeper to get the other consumers of this topic.
@@ -577,7 +539,8 @@ def _update_member_assignment(self):
self._add_self()
participants.append(self._consumer_id)

new_partitions = self._decide_partitions(participants)
new_partitions = self._membership_protocol.decide_partitions(
participants, self._topic.partitions, self._consumer_id)
if not new_partitions:
log.warning("No partitions assigned to consumer %s",
self._consumer_id)
@@ -626,7 +589,6 @@ def _rebalance(self):
if self._rebalancing_in_progress.is_set():
self._rebalancing_in_progress.clear()


def _path_from_partition(self, p):
"""Given a partition, return its path in zookeeper.
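The following usage sketch (not part of the diff) shows how a zookeeper-managed BalancedConsumer can opt into the new strategy by passing membership_protocol. It assumes a reachable broker and zookeeper; the host, group, and topic names are placeholders.

from pykafka import KafkaClient, RoundRobinProtocol

client = KafkaClient(hosts="localhost:9092")      # placeholder broker address
topic = client.topics[b"testtopic"]               # placeholder topic
consumer = topic.get_balanced_consumer(
    consumer_group=b"testgroup",
    zookeeper_connect="localhost:2181",           # placeholder zookeeper address
    membership_protocol=RoundRobinProtocol,       # default remains RangeProtocol
)
message = consumer.consume()                      # blocks until a message arrives
consumer.stop()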
13 changes: 11 additions & 2 deletions pykafka/broker.py
@@ -416,7 +416,12 @@ def fetch_consumer_group_offsets(self, consumer_group, preqs):
# Group Membership API #
##########################

def join_group(self, connection_id, consumer_group, member_id, topic_name):
def join_group(self,
connection_id,
consumer_group,
member_id,
topic_name,
membership_protocol):
"""Send a JoinGroupRequest
:param connection_id: The unique identifier of the connection on which to make
@@ -429,11 +434,15 @@ def join_group(self, connection_id, consumer_group, member_id, topic_name):
:param topic_name: The name of the topic to which to connect, used in protocol
metadata
:type topic_name: str
:param membership_protocol: The group membership protocol to which this request
should adhere
:type membership_protocol: :class:`pykafka.membershipprotocol.GroupMembershipProtocol`
"""
handler = self._get_unique_req_handler(connection_id)
if handler is None:
raise SocketDisconnectedError
future = handler.request(JoinGroupRequest(consumer_group, member_id, topic_name))
future = handler.request(JoinGroupRequest(consumer_group, member_id, topic_name,
membership_protocol))
self._handler.sleep()
return future.get(JoinGroupResponse)

16 changes: 12 additions & 4 deletions pykafka/managedbalancedconsumer.py
@@ -27,6 +27,7 @@
from .common import OffsetType
from .exceptions import (IllegalGeneration, RebalanceInProgress, NotCoordinatorForGroup,
GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress)
from .membershipprotocol import RangeProtocol
from .protocol import MemberAssignment
from .utils.compat import iterkeys
from .utils.error_handlers import valid_int
@@ -70,7 +71,8 @@ def __init__(self,
post_rebalance_callback=None,
use_rdkafka=False,
compacted_topic=True,
heartbeat_interval_ms=3000):
heartbeat_interval_ms=3000,
membership_protocol=RangeProtocol):
"""Create a ManagedBalancedConsumer instance
:param topic: The topic this consumer should consume
@@ -162,6 +164,9 @@ def __init__(self,
:param heartbeat_interval_ms: The amount of time in milliseconds to wait between
heartbeat requests
:type heartbeat_interval_ms: int
:param membership_protocol: The group membership protocol to which this consumer
should adhere
:type membership_protocol: :class:`pykafka.membershipprotocol.GroupMembershipProtocol`
"""

self._cluster = cluster
@@ -188,6 +193,8 @@ def __init__(self,
self._rebalance_backoff_ms = valid_int(rebalance_backoff_ms)
self._post_rebalance_callback = post_rebalance_callback
self._is_compacted_topic = compacted_topic
self._membership_protocol = membership_protocol
self._membership_protocol.metadata.topic_names = [self._topic.name]
self._heartbeat_interval_ms = valid_int(heartbeat_interval_ms)
if use_rdkafka is True:
raise ImportError("use_rdkafka is not available for {}".format(
@@ -289,8 +296,8 @@ def _update_member_assignment(self):
(member_id,
MemberAssignment([
(self._topic.name,
[p.id for p in self._decide_partitions(
iterkeys(members), consumer_id=member_id)])])
[p.id for p in self._membership_protocol.decide_partitions(
iterkeys(members), self._topic.partitions, member_id)])])
) for member_id in members]

assignment = self._sync_group(group_assignments)
@@ -348,7 +355,8 @@ def _join_group(self):
join_result = self._group_coordinator.join_group(self._connection_id,
self._consumer_group,
self._consumer_id,
self._topic.name)
self._topic.name,
self._membership_protocol)
if join_result.error_code == 0:
break
log.info("Error code %d encountered during JoinGroupRequest for"
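For the broker-managed consumer the parameter works the same way. A hypothetical sketch reaching ManagedBalancedConsumer through managed=True, again with placeholder hosts and names:

from pykafka import KafkaClient, RoundRobinProtocol

client = KafkaClient(hosts="localhost:9092")      # placeholder broker address
topic = client.topics[b"testtopic"]               # placeholder topic
consumer = topic.get_balanced_consumer(
    consumer_group=b"testgroup",
    managed=True,                                 # use the Kafka group membership API instead of zookeeper
    membership_protocol=RoundRobinProtocol,       # sent with each JoinGroupRequest
)
message = consumer.consume()
consumer.stop()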
107 changes: 107 additions & 0 deletions pykafka/membershipprotocol.py
@@ -0,0 +1,107 @@
import itertools
import logging
from collections import namedtuple

from .protocol import ConsumerGroupProtocolMetadata

log = logging.getLogger(__name__)

GroupMembershipProtocol = namedtuple(
'GroupMembershipProtocol', ['protocol_type',
'protocol_name',
'metadata',
'decide_partitions'])


def decide_partitions_range(participants, partitions, consumer_id):
"""Decide which partitions belong to this consumer_id.
Uses the consumer rebalancing algorithm described here
https://kafka.apache.org/documentation/#impl_consumerrebalance
It is very important that the participants array is sorted,
since this algorithm runs on each consumer and indexes into the same
array. The same array index operation must return the same
result on each consumer.
:param participants: Sorted list of ids of all consumers in this
consumer group.
:type participants: Iterable of `bytes`
:param partitions: List of all partitions on the topic being consumed
:type partitions: Iterable of :class:`pykafka.partition.Partition`
:param consumer_id: The ID of the consumer for which to generate a partition
assignment.
:type consumer_id: bytes
"""
# Freeze and sort partitions so we always have the same results
def p_to_str(p):
return '-'.join([str(p.topic.name), str(p.leader.id), str(p.id)])

all_parts = sorted(partitions.values(), key=p_to_str)

# get start point, # of partitions, and remainder
participants = sorted(participants) # just make sure it's sorted.
idx = participants.index(consumer_id)
parts_per_consumer = len(all_parts) // len(participants)
remainder_ppc = len(all_parts) % len(participants)

start = parts_per_consumer * idx + min(idx, remainder_ppc)
num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1)

# assign partitions from i*N to (i+1)*N - 1 to consumer Ci
new_partitions = itertools.islice(all_parts, start, start + num_parts)
new_partitions = set(new_partitions)
log.info('%s: Balancing %i participants for %i partitions. Owning %i partitions.',
consumer_id, len(participants), len(all_parts),
len(new_partitions))
log.debug('My partitions: %s', [p_to_str(p) for p in new_partitions])
return new_partitions


RangeProtocol = GroupMembershipProtocol(b"consumer",
b"range",
ConsumerGroupProtocolMetadata(),
decide_partitions_range)


def decide_partitions_roundrobin(participants, partitions, consumer_id):
"""Decide which partitions belong to this consumer_id.
Uses the "roundrobin" strategy described here
https://kafka.apache.org/documentation/#oldconsumerconfigs
:param participants: Sorted list of ids of all consumers in this
consumer group.
:type participants: Iterable of `bytes`
:param partitions: List of all partitions on the topic being consumed
:type partitions: Iterable of :class:`pykafka.partition.Partition`
:param consumer_id: The ID of the consumer for which to generate a partition
assignment.
:type consumer_id: bytes
"""
# Freeze and sort partitions so we always have the same results
def p_to_str(p):
return '-'.join([str(p.topic.name), str(p.leader.id), str(p.id)])

partitions = sorted(partitions.values(), key=p_to_str)
participants = sorted(participants)

new_partitions = set()
partitions_idx = participants_idx = 0
for _ in range(len(partitions)):
if participants[participants_idx] == consumer_id:
new_partitions.add(partitions[partitions_idx])
partitions_idx = (partitions_idx + 1) % len(partitions)
participants_idx = (participants_idx + 1) % len(participants)

log.info('%s: Balancing %i participants for %i partitions. Owning %i partitions.',
consumer_id, len(participants), len(partitions),
len(new_partitions))
log.debug('My partitions: %s', [p_to_str(p) for p in new_partitions])
return new_partitions


RoundRobinProtocol = GroupMembershipProtocol(b"consumer",
b"roundrobin",
ConsumerGroupProtocolMetadata(),
decide_partitions_roundrobin)
15 changes: 7 additions & 8 deletions pykafka/protocol.py
@@ -1366,11 +1366,6 @@ def from_bytestring(cls, buff):
return cls(version, topic_names, user_data)


GroupMembershipProtocol = namedtuple(
'GroupMembershipProtocol', ['protocol_type', 'protocol_name', 'metadata']
)


class JoinGroupRequest(Request):
"""A group join request
@@ -1385,10 +1380,14 @@ class JoinGroupRequest(Request):
ProtocolName => string
ProtocolMetadata => bytes
"""
def __init__(self, group_id, member_id, topic_name, session_timeout=30000):
def __init__(self,
group_id,
member_id,
topic_name,
membership_protocol,
session_timeout=30000):
"""Create a new group join request"""
metadata = ConsumerGroupProtocolMetadata(topic_names=[topic_name])
self.protocol = GroupMembershipProtocol(b"consumer", b"range", metadata)
self.protocol = membership_protocol
self.group_id = group_id
self.session_timeout = session_timeout
self.member_id = member_id
