Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
math.floor to remove fractional portion. Otherwise, remainder calcula…
Browse files Browse the repository at this point in the history
…tion is wrong.
  • Loading branch information
kbourgoin committed Apr 26, 2015
1 parent 2c31db1 commit 20d68b2
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
__all__ = ["BalancedConsumer"]
import itertools
import logging as log
import math
import socket
import time
import weakref
Expand Down Expand Up @@ -246,32 +247,28 @@ def _decide_partitions(self, participants):
"""
# Freeze and sort partitions so we always have the same results
p_to_str = lambda p: '-'.join([p.topic.name, str(p.leader.id), str(p.id)])
all_partitions = self._topic.partitions.values()
all_partitions.sort(key=p_to_str)
all_parts = self._topic.partitions.values()
all_parts.sort(key=p_to_str)

# get start point, # of partitions, and remainder
participants.sort() # just make sure it's sorted.
idx = participants.index(self._consumer_id)
parts_per_consumer = len(all_partitions) / len(participants)
remainder_ppc = len(all_partitions) % len(participants)
parts_per_consumer = math.floor(len(all_parts) / len(participants))
remainder_ppc = len(all_parts) % len(participants)

start = parts_per_consumer * idx + min(idx, remainder_ppc)
num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1)

# assign partitions from i*N to (i+1)*N - 1 to consumer Ci
new_partitions = itertools.islice(
all_partitions,
start,
start + num_parts
)
new_partitions = itertools.islice(all_parts, start, start + num_parts)
new_partitions = set(new_partitions)
log.info(
'Balancing %i participants for %i partitions. '
'My Partitions: %s -- Consumers: %s --- All Partitions: %s',
len(participants), len(all_partitions),
len(participants), len(all_parts),
[p_to_str(p) for p in new_partitions],
str(participants),
[p_to_str(p) for p in all_partitions]
[p_to_str(p) for p in all_parts]
)
return new_partitions

Expand Down

0 comments on commit 20d68b2

Please sign in to comment.