Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
fix crashes and minor issues
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler committed Apr 24, 2015
1 parent bf6b6ae commit 3da0569
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pykafka/partitioners.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import random


def random_partitioner(partitions):
def random_partitioner(partitions, key):
"""Returns a random partition out of all of the available partitions."""
return random.choice(partitions)

Expand Down
2 changes: 1 addition & 1 deletion pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def produce(self, messages):
"""Produce a set of messages.
:param messages: The messages to produce
:type messages: Iterable of :class:`pykafka.common.Message`
:type messages: Iterable of str or (str, str) tuples
"""
# Do partition distribution here. We need to be able to retry producing
# only *some* messages when a leader changes. Therefore, we don't want
Expand Down
11 changes: 4 additions & 7 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
limitations under the License.
"""
import itertools
import functools
import logging as log
import time
import threading
Expand Down Expand Up @@ -136,13 +135,11 @@ def __init__(self,

self._discover_offset_manager()

owned_partition_partial = functools.partial(
OwnedPartition, consumer_group=self._consumer_group)
if partitions:
self._partitions = {owned_partition_partial(p): p
self._partitions = {OwnedPartition(p): p
for p in partitions}
else:
self._partitions = {owned_partition_partial(p): topic.partitions[k]
self._partitions = {OwnedPartition(p): topic.partitions[k]
for k, p in topic.partitions.iteritems()}
self._partitions_by_id = {p.partition.id: p
for p in self._partitions.iterkeys()}
Expand All @@ -158,8 +155,8 @@ def __init__(self,

if self._auto_commit_enable:
self._autocommit_worker_thread = self._setup_autocommit_worker()
# we need to get the most up-to-date offsets before starting consumption
self.fetch_offsets()
# we need to get the most up-to-date offsets before starting consumption
self.fetch_offsets()
self._fetch_workers = self._setup_fetch_workers()

def _build_default_error_handlers(self):
Expand Down

0 comments on commit 3da0569

Please sign in to comment.