3 changes: 3 additions & 0 deletions .gitignore
@@ -8,3 +8,6 @@ dist/
.*.swp
.*.un~
*.bak
.coverage
coverage.xml
*#*#*
20 changes: 11 additions & 9 deletions .travis.yml
@@ -1,30 +1,32 @@
language: python
sudo: false
python:
- "2.7"

addons:
apt:
packages:
- libev-dev
- libsnappy-dev
env:
- BROKERS=localhost:9092,localhost:9093,localhost:9094 ZOOKEEPER=localhost:2181 KAFKA_BIN=/home/travis/kafka-bin
- TOXENV=py27 BROKERS=localhost:9092,localhost:9093,localhost:9094 ZOOKEEPER=localhost:2181 KAFKA_BIN=/home/travis/kafka-bin
- TOXENV=py34 BROKERS=localhost:9092,localhost:9093,localhost:9094 ZOOKEEPER=localhost:2181 KAFKA_BIN=/home/travis/kafka-bin
- TOXENV=pypy BROKERS=localhost:9092,localhost:9093,localhost:9094 ZOOKEEPER=localhost:2181 KAFKA_BIN=/home/travis/kafka-bin

notifications:
email:
- keith@parsely.com
- emmett@parsely.com

before_install:
- sudo apt-get install -qq libev-dev libsnappy-dev

install:
- pip install python-snappy kazoo unittest2 mock nose
- pip install python-coveralls coverage nose-cov
- pip install testinstances
- python setup.py develop
- pip install python-coveralls kazoo tox

before_script:
- "python -m pykafka.test.kafka_instance 3 --download-dir /home/travis/kafka-bin &"
- "sleep 10"

script:
- nosetests -v --with-cov --cov pykafka --cover-branches --cov-config .coveragerc --logging-level=DEBUG
- tox

# Calculate coverage on success
after_success:
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -0,0 +1 @@
include *.txt
16 changes: 8 additions & 8 deletions pykafka/__init__.py
@@ -1,11 +1,11 @@
from broker import Broker
from simpleconsumer import SimpleConsumer
from cluster import Cluster
from partition import Partition
from producer import Producer
from topic import Topic
from client import KafkaClient
from balancedconsumer import BalancedConsumer
from .broker import Broker
from .simpleconsumer import SimpleConsumer
from .cluster import Cluster
from .partition import Partition
from .producer import Producer
from .topic import Topic
from .client import KafkaClient
from .balancedconsumer import BalancedConsumer

__version__ = '2.0.0'

21 changes: 12 additions & 9 deletions pykafka/balancedconsumer.py
@@ -33,6 +33,7 @@
from .exceptions import (KafkaException, PartitionOwnedError,
ConsumerStoppedException)
from .simpleconsumer import SimpleConsumer
from .utils.compat import range, get_bytes


log = logging.getLogger(__name__)
@@ -301,25 +302,28 @@ def _decide_partitions(self, participants):
:type participants: Iterable of str
"""
# Freeze and sort partitions so we always have the same results
p_to_str = lambda p: '-'.join([p.topic.name, str(p.leader.id), str(p.id)])
p_to_str = lambda p: b'-'.join([
get_bytes(p.topic.name), bytes(p.leader.id), bytes(p.id)]
[Inline review comment, contributor]: This might be more readable if the lambda just returns a plain tuple, without any bytes or str coercions.
)
all_parts = self._topic.partitions.values()
all_parts.sort(key=p_to_str)
all_parts = sorted(all_parts, key=p_to_str)

# get start point, # of partitions, and remainder
participants.sort() # just make sure it's sorted.
participants = sorted(participants) # just make sure it's sorted.
idx = participants.index(self._consumer_id)
parts_per_consumer = math.floor(len(all_parts) / len(participants))
remainder_ppc = len(all_parts) % len(participants)

start = parts_per_consumer * idx + min(idx, remainder_ppc)
num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1)
start = int(parts_per_consumer * idx + min(idx, remainder_ppc))
num_parts = int(parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1))
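# Worked example (hypothetical numbers): with 10 partitions and 3 participants,
# parts_per_consumer is 3 and remainder_ppc is 1, so consumer 0 gets
# partitions 0-3 (four of them) while consumers 1 and 2 get 4-6 and 7-9.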

# assign partitions from i*N to (i+1)*N - 1 to consumer Ci
new_partitions = itertools.islice(all_parts, start, start + num_parts)
new_partitions = set(new_partitions)
log.info('Balancing %i participants for %i partitions.\nOwning %i partitions.',
len(participants), len(all_parts), len(new_partitions))
log.debug('My partitions: %s', [p_to_str(p) for p in new_partitions])

return new_partitions

def _get_participants(self):
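Regarding the inline review comment above: a minimal sketch of the suggested tuple-based sort key (hypothetical, not part of this change; it assumes each partition exposes topic.name, leader.id and id, exactly as the existing lambda does) could be:

p_to_key = lambda p: (p.topic.name, p.leader.id, p.id)
all_parts = sorted(self._topic.partitions.values(), key=p_to_key)

Tuples compare element-wise, so this still yields a stable ordering by topic, leader id and partition id, without any bytes/str coercion.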
@@ -343,7 +347,7 @@ def _get_participants(self):
participants.append(id_)
except NoNodeException:
pass # disappeared between ``get_children`` and ``get``
participants.sort()
participants = sorted(participants)
return participants

def _set_watches(self):
@@ -409,13 +413,12 @@ def _rebalance(self):
self._consumer_id, self._topic.name)
)

for i in xrange(self._rebalance_max_retries):
for i in range(self._rebalance_max_retries):
try:
# If retrying, be sure to make sure the
# partition allocation is correct.
participants = self._get_participants()
partitions = self._decide_partitions(participants)

old_partitions = self._partitions - partitions
self._remove_partitions(old_partitions)

@@ -469,7 +472,7 @@ def _add_partitions(self, partitions):
try:
self._zookeeper.create(
self._path_from_partition(p),
value=self._consumer_id,
value=get_bytes(self._consumer_id),
ephemeral=True
)
self._partitions.add(p)
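Several hunks in this diff import helpers such as range, get_bytes, iteritems, buffer, Queue and Empty from pykafka.utils.compat. That module is not shown here; the following is only a rough sketch of the kind of Python 2/3 shim these imports imply (names taken from the diff, implementations assumed):

# pykafka/utils/compat.py -- hypothetical sketch, not the actual module
import sys

PY3 = sys.version_info[0] >= 3

if PY3:
    from queue import Queue, Empty
    range = range
    buffer = memoryview  # stand-in for the removed buffer() builtin

    def iteritems(d):
        # dict.items() is already a lazy view on Python 3
        return iter(d.items())

    def get_bytes(s):
        # normalize text to bytes (ZooKeeper node values, topic names, ...)
        return s.encode('utf-8') if isinstance(s, str) else s
else:
    from Queue import Queue, Empty
    range = xrange
    buffer = buffer

    def iteritems(d):
        return d.iteritems()

    def get_bytes(s):
        return s if isinstance(s, bytes) else s.encode('utf-8')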
8 changes: 4 additions & 4 deletions pykafka/broker.py
@@ -29,7 +29,7 @@
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse,
ProduceResponse)

from .utils.compat import range, iteritems

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -274,18 +274,18 @@ def request_metadata(self, topics=None):
:type topics: Iterable of int
"""
max_retries = 3
for i in xrange(max_retries):
for i in range(max_retries):
if i > 0:
log.debug("Retrying")
time.sleep(i)

future = self._req_handler.request(MetadataRequest(topics=topics))
response = future.get(MetadataResponse)

for name, topic_metadata in response.topics.iteritems():
for name, topic_metadata in iteritems(response.topics):
if topic_metadata.err == LeaderNotAvailable.ERROR_CODE:
log.warning("Leader not available for topic '%s'.", name)
for pid, partition_metadata in topic_metadata.partitions.iteritems():
for pid, partition_metadata in iteritems(topic_metadata.partitions):
if partition_metadata.err == LeaderNotAvailable.ERROR_CODE:
log.warning("Leader not available for topic '%s' partition %d.",
name, pid)
6 changes: 3 additions & 3 deletions pykafka/client.py
@@ -18,9 +18,9 @@
"""

__all__ = ["KafkaClient"]
import handlers
from .handlers import ThreadingHandler
import logging
from cluster import Cluster
from .cluster import Cluster

try:
import rd_kafka
@@ -71,7 +71,7 @@ def __init__(self,
self._source_address = source_address
self._socket_timeout_ms = socket_timeout_ms
self._offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
self._handler = None if use_greenlets else handlers.ThreadingHandler()
self._handler = None if use_greenlets else ThreadingHandler()
self._use_rdkafka = rd_kafka and not ignore_rdkafka
if self._use_rdkafka:
log.info('Using rd_kafka extensions.')
23 changes: 14 additions & 9 deletions pykafka/cluster.py
@@ -29,7 +29,7 @@
UnknownTopicOrPartition)
from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse
from .topic import Topic

from .utils.compat import iteritems, range, get_bytes

log = logging.getLogger(__name__)

@@ -43,6 +43,9 @@ def __init__(self, cluster, *args, **kwargs):

def __missing__(self, key):
log.warning('Topic %s not found. Attempting to auto-create.', key)

key = get_bytes(key)

if self._create_topic(key):
return self[key]
else:
@@ -56,16 +59,18 @@ def _create_topic(self, topic_name):
with settings and everything, we'll implement that. To expose just
this now would be disingenuous, since its features would be hobbled.
"""
topic_name = get_bytes(topic_name)

if len(self._cluster.brokers) == 0:
log.warning("No brokers found. This is probably because of "
"KAFKA-2154, which will be fixed in Kafka 0.8.3")
raise KafkaException("Unable to retrieve metdata. Can't auto-create topic. See log for details.")
# Auto-creating will take a moment, so we try 5 times.
for i in xrange(5):
for i in range(5):
# Auto-creating is as simple as issuing a metadata request
# solely for that topic. The update is just to be sure
# our `Cluster` knows about it.
self._cluster.brokers[self._cluster.brokers.keys()[0]].request_metadata(topics=[topic_name])
self._cluster.brokers[list(self._cluster.brokers.keys())[0]].request_metadata(topics=[topic_name])
self._cluster.update()
if topic_name in self:
log.info('Topic %s successfully auto-created.', topic_name)
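For context, the auto-creation path being modified here is what runs when a client looks up a topic that does not yet exist: TopicDict.__missing__ normalizes the name to bytes and issues a metadata request so the broker creates the topic. A minimal usage sketch (broker address and topic name are made up):

from pykafka import KafkaClient

client = KafkaClient(hosts='localhost:9092')  # hypothetical broker address
# A missing key triggers __missing__ -> _create_topic -> metadata request
topic = client.topics[b'my.new.topic']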
@@ -155,7 +160,7 @@ def _get_metadata(self):
for broker_str in brokers:
try:
h, p = broker_str.split(':')
broker = Broker(-1, h, p, self._handler,
broker = Broker(-1, h, int(p), self._handler,
self._socket_timeout_ms,
self._offsets_channel_socket_timeout_ms,
buffer_size=1024 * 1024,
@@ -189,7 +194,7 @@ def _update_brokers(self, broker_metadata):
# Add/update current brokers
if len(broker_metadata) > 0:
log.info('Discovered %d brokers', len(broker_metadata))
for id_, meta in broker_metadata.iteritems():
for id_, meta in iteritems(broker_metadata):
if id_ not in self._brokers:
log.debug('Discovered broker id %s: %s:%s', id_, meta.host, meta.port)
self._brokers[id_] = Broker.from_metadata(
@@ -225,7 +230,7 @@ def _update_topics(self, metadata):
# Add/update partition information
if len(metadata) > 0:
log.info("Discovered %d topics", len(metadata))
for name, meta in metadata.iteritems():
for name, meta in iteritems(metadata):
if not self._should_exclude_topic(name):
if name not in self._topics:
self._topics[name] = Topic(self, meta)
@@ -237,7 +242,7 @@ def _should_exclude_topic(self, topic_name):
"""Should this topic be excluded from the list shown to the client?"""
if not self._exclude_internal_topics:
return False
return topic_name.startswith("__")
return topic_name.startswith(b"__")

def get_offset_manager(self, consumer_group):
"""Get the broker designated as the offset manager for this consumer group.
@@ -251,10 +256,10 @@ def get_offset_manager(self, consumer_group):
log.info("Attempting to discover offset manager for consumer group '%s'",
consumer_group)
# arbitrarily choose a broker, since this request can go to any
broker = self.brokers[random.choice(self.brokers.keys())]
broker = self.brokers[random.choice(list(self.brokers.keys()))]
MAX_RETRIES = 5

for i in xrange(MAX_RETRIES):
for i in range(MAX_RETRIES):
if i > 0:
log.debug("Retrying offset manager discovery")
time.sleep(i * 2)
3 changes: 3 additions & 0 deletions pykafka/connection.py
@@ -24,6 +24,7 @@

from .exceptions import SocketDisconnectedError
from .utils.socket import recvall_into
from .utils.compat import buffer

log = logging.getLogger(__name__)

@@ -101,6 +102,7 @@ def request(self, request):
bytes = request.get_bytes()
if not self._socket:
raise SocketDisconnectedError

self._socket.sendall(bytes)

def response(self):
@@ -110,6 +112,7 @@ def response(self):
# Happens when broker has shut down
self.disconnect()
raise SocketDisconnectedError

size = struct.unpack('!i', size)[0]
recvall_into(self._socket, self._buff, size)
return buffer(self._buff[4:4 + size])
10 changes: 5 additions & 5 deletions pykafka/handlers.py
@@ -20,8 +20,7 @@
import atexit
import functools
import threading
import Queue

from .utils.compat import Queue, Empty
from collections import namedtuple


@@ -69,8 +68,8 @@ def spawn(self, target, *args, **kwargs):

class ThreadingHandler(Handler):
"""A handler. that uses a :class:`threading.Thread` to perform its work"""
QueueEmptyError = Queue.Empty
Queue = Queue.Queue
QueueEmptyError = Empty
Queue = Queue
Event = threading.Event
Lock = threading.Lock
# turn off RLock's super annoying default logging
@@ -130,10 +129,11 @@ def worker():
task = self._requests.get()
try:
self.connection.request(task.request)

if task.future:
res = self.connection.response()
task.future.set_response(res)
except Exception, e:
except Exception as e:
if task.future:
task.future.set_error(e)
finally: