Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #760 from Parsely/feature/unicode
Browse files: browse the repository at this point in the history
encode consumer groups and topic names as ascii bytestrings
  • Loading branch information
Emmett J. Butler committed Feb 21, 2018
2 parents e71058f + 9c2e356 commit 4bec6cf
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ coverage.xml
.cache
*.so
.rnd
.pytest_cache
13 changes: 8 additions & 5 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
from six import reraise

from .common import OffsetType
from .exceptions import KafkaException, PartitionOwnedError, ConsumerStoppedException
from .exceptions import (KafkaException, PartitionOwnedError, ConsumerStoppedException,
UnicodeException)
from .membershipprotocol import RangeProtocol
from .simpleconsumer import SimpleConsumer
from .utils.compat import range, get_bytes, itervalues, iteritems, get_string
Expand Down Expand Up @@ -111,7 +112,7 @@ def __init__(self,
should join. Consumer group names are namespaced at the cluster level,
meaning that two consumers consuming different topics with the same group name
will be treated as part of the same group.
:type consumer_group: bytes
:type consumer_group: str
:param fetch_message_max_bytes: The number of bytes of messages to
attempt to fetch with each fetch request
:type fetch_message_max_bytes: int
Expand Down Expand Up @@ -205,9 +206,11 @@ def __init__(self,
:type membership_protocol: :class:`pykafka.membershipprotocol.GroupMembershipProtocol`
"""
self._cluster = cluster
if not isinstance(consumer_group, bytes):
raise TypeError("consumer_group must be a bytes object")
self._consumer_group = consumer_group
try:
self._consumer_group = get_string(consumer_group).encode('ascii')
except UnicodeEncodeError:
raise UnicodeException("Consumer group name '{}' contains non-ascii "
"characters".format(consumer_group))
self._topic = topic

self._auto_commit_enable = auto_commit_enable
Expand Down
13 changes: 8 additions & 5 deletions pykafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
NoBrokersAvailableError,
SocketDisconnectedError,
LeaderNotFoundError,
LeaderNotAvailable)
LeaderNotAvailable,
UnicodeException)
from .protocol import (GroupCoordinatorRequest, GroupCoordinatorResponse,
API_VERSIONS_090, API_VERSIONS_080)
from .topic import Topic
from .utils.compat import iteritems, itervalues, range
from .utils.compat import iteritems, itervalues, range, get_string

log = logging.getLogger(__name__)

Expand All @@ -54,9 +55,11 @@ def values(self):
return [self[key] for key in self]

def __getitem__(self, key):
if not isinstance(key, bytes):
raise TypeError("TopicDict.__getitem__ accepts a bytes object, but it "
"got '%s'", type(key))
try:
key = get_string(key).encode('ascii')
except UnicodeEncodeError:
raise UnicodeException("Topic name '{}' contains non-ascii "
"characters".format(key))
if self._should_exclude_topic(key):
raise KeyError("You have configured KafkaClient/Cluster to hide "
"double-underscored, internal topics")
Expand Down
5 changes: 5 additions & 0 deletions pykafka/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class KafkaException(Exception):
pass


class UnicodeException(Exception):
    """Error raised when a unicode string cannot be processed.

    Derives directly from ``Exception``. Callers raise it when a consumer
    group or topic name contains characters that cannot be encoded as ascii.
    """


class NoBrokersAvailableError(KafkaException):
"""Indicates that no brokers were available to the cluster's metadata update attempts
"""
Expand Down
15 changes: 9 additions & 6 deletions pykafka/managedbalancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
from .balancedconsumer import BalancedConsumer
from .common import OffsetType
from .exceptions import (IllegalGeneration, RebalanceInProgress, NotCoordinatorForGroup,
GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress)
GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress,
UnicodeException)
from .membershipprotocol import RangeProtocol
from .protocol import MemberAssignment
from .utils.compat import iterkeys
from .utils.compat import iterkeys, get_string
from .utils.error_handlers import valid_int

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -83,7 +84,7 @@ def __init__(self,
should join. Consumer group names are namespaced at the cluster level,
meaning that two consumers consuming different topics with the same group name
will be treated as part of the same group.
:type consumer_group: bytes
:type consumer_group: str
:param fetch_message_max_bytes: The number of bytes of messages to
attempt to fetch with each fetch request
:type fetch_message_max_bytes: int
Expand Down Expand Up @@ -170,9 +171,11 @@ def __init__(self,
"""

self._cluster = cluster
if not isinstance(consumer_group, bytes):
raise TypeError("consumer_group must be a bytes object")
self._consumer_group = consumer_group
try:
self._consumer_group = get_string(consumer_group).encode('ascii')
except UnicodeEncodeError:
raise UnicodeException("Consumer group name '{}' contains non-ascii "
"characters".format(consumer_group))
self._topic = topic

self._auto_commit_enable = auto_commit_enable
Expand Down
14 changes: 9 additions & 5 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
ConsumerStoppedException, KafkaException,
NotLeaderForPartition, OffsetRequestFailedError,
RequestTimedOut, UnknownMemberId, RebalanceInProgress,
IllegalGeneration, ERROR_CODES)
IllegalGeneration, ERROR_CODES, UnicodeException)
from .protocol import (PartitionFetchRequest, PartitionOffsetCommitRequest,
PartitionOffsetFetchRequest, PartitionOffsetRequest)
from .utils.error_handlers import (handle_partition_responses, raise_error,
Expand Down Expand Up @@ -89,7 +89,7 @@ def __init__(self,
:type cluster: :class:`pykafka.cluster.Cluster`
:param consumer_group: The name of the consumer group this consumer
should use for offset committing and fetching.
:type consumer_group: bytes
:type consumer_group: str
:param partitions: Existing partitions to which to connect
:type partitions: Iterable of :class:`pykafka.partition.Partition`
:param fetch_message_max_bytes: The number of bytes of messages to
Expand Down Expand Up @@ -158,9 +158,13 @@ def __init__(self,
"""
self._running = False
self._cluster = cluster
if not (isinstance(consumer_group, bytes) or consumer_group is None):
raise TypeError("consumer_group must be a bytes object")
self._consumer_group = consumer_group
self._consumer_group = None
if consumer_group:
try:
self._consumer_group = get_string(consumer_group).encode('ascii')
except UnicodeEncodeError:
raise UnicodeException("Consumer group name '{}' contains non-ascii "
"characters".format(consumer_group))
self._topic = topic
self._fetch_message_max_bytes = valid_int(fetch_message_max_bytes)
self._fetch_min_bytes = valid_int(fetch_min_bytes)
Expand Down
11 changes: 7 additions & 4 deletions tests/pykafka/test_balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def setUpClass(cls):
cls._mock_consumer, _ = TestBalancedConsumer.buildMockConsumer(timeout=cls._consumer_timeout)

@classmethod
def buildMockConsumer(self, num_partitions=10, num_participants=1, timeout=2000):
consumer_group = b'testgroup'
def buildMockConsumer(self, consumer_group=b'testgroup', num_partitions=10,
num_participants=1, timeout=2000):
topic = mock.Mock()
topic.name = 'testtopic'
topic.partitions = {}
Expand All @@ -59,6 +59,9 @@ def buildMockConsumer(self, num_partitions=10, num_participants=1, timeout=2000)
zookeeper=zk, auto_start=False, use_rdkafka=False,
consumer_timeout_ms=timeout), topic

def test_unicode_consumer_group(self):
    """Ensure that a unicode consumer group name is accepted and stored as
    an ascii bytestring
    """
    consumer, _ = self.buildMockConsumer(consumer_group=u'testgroup')
    # Constructing without an exception is not enough: the consumer must
    # hold the ascii-encoded bytes form of the name it was given.
    assert consumer._consumer_group == b'testgroup'

def test_consume_returns(self):
"""Ensure that consume() returns in the amount of time it's supposed to
"""
Expand Down Expand Up @@ -117,8 +120,8 @@ def test_decide_partitions_roundrobin(self):

class TestManagedBalancedConsumer(TestBalancedConsumer):
@classmethod
def buildMockConsumer(self, num_partitions=10, num_participants=1, timeout=2000):
consumer_group = b'testgroup'
def buildMockConsumer(self, consumer_group=b'testgroup', num_partitions=10,
num_participants=1, timeout=2000):
topic = mock.Mock()
topic.name = 'testtopic'
topic.partitions = {}
Expand Down

0 comments on commit 4bec6cf

Please sign in to comment.