Wrong partitions distribution logic for MP Consumer #335

Merged: 3 commits merged into dpkp:master from vshlapakov:fix-mp-consumer-distribution on Mar 12, 2015
Conversation

vshlapakov (Contributor)

I could be wrong, but it seems the partition distribution logic for MultiProcessConsumer is incorrect. The distribution is currently not even at all, which can be checked with the following function (cut from the code and wrapped):

>>> def get_chunks(partitions, num_procs, partitions_per_proc=0):
...     if not partitions_per_proc:
...         partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
...         if partitions_per_proc < num_procs * 0.5:
...             partitions_per_proc += 1
...     chunker = lambda *x: [] + list(x)
...     return map(chunker, *[iter(partitions)] * int(partitions_per_proc))
...

Examples of the wrong behavior (e.g. for 16 partitions and 3 procs, round(16 * 1.0 / 3) gives 5 partitions per proc, which yields 4 chunks instead of 3):

>>> get_chunks(range(16), 3)
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, None, None, None, None]]
# there should be 3 chunks!
>>> get_chunks(range(16), 8)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11], [12, 13, 14], [15, None, None]]
# there should be 8 chunks of 2 partitions each

This PR solves the issue by fixing the condition that determines when partitions_per_proc should be increased by 1:

>>> get_chunks(range(16), 1)
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]]
>>> get_chunks(range(16), 2)
[[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]]
>>> get_chunks(range(16), 3)
[[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11], [12, 13, 14, 15, None, None]]
>>> get_chunks(range(16), 4)
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
>>> get_chunks(range(16), 8)
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15]]
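
For readers following along: the fixed output above is consistent with computing the chunk size by ceiling division. A minimal sketch of that idea (the helper name get_chunks_fixed is hypothetical, not the actual diff; Python 2 semantics, where map() pads the last chunk with None):

>>> def get_chunks_fixed(partitions, num_procs):
...     # Ceiling division: the smallest chunk size that covers all
...     # partitions with at most num_procs chunks.
...     partitions_per_proc = len(partitions) // num_procs
...     if len(partitions) % num_procs:
...         partitions_per_proc += 1
...     chunker = lambda *x: list(x)
...     return map(chunker, *[iter(partitions)] * partitions_per_proc)
...
>>> get_chunks_fixed(range(16), 3)
[[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11], [12, 13, 14, 15, None, None]]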

dpkp added the consumer label Mar 8, 2015
dpkp (Owner) commented Mar 8, 2015

hmm, hadn't looked at this code before. wouldn't a list comprehension like this be a lot cleaner:

[partitions[proc::num_procs] for proc in range(num_procs)]

this stripes partitions via step-wise iteration rather than chunking them sequentially, but it seems to work a lot better. Try get_chunks(range(16), 7), for example. The current approach yields

In []: get_chunks(range(16), 7)
Out[]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11], [12, 13, 14], [15, None, None]]

which is only 6 chunks, with the last holding only 1 partition, so really it's 5 processes doing most of the work. Compare with the step-wise list comprehension:

In []: partitions = range(16)
In []: num_procs=7
In []: [partitions[proc::num_procs] for proc in range(num_procs)]
Out[]: [[0, 7, 14], [1, 8, 15], [2, 9], [3, 10], [4, 11], [5, 12], [6, 13]]

Now we get 7 chunks, and the partition distribution is more even.
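
A quick illustrative check (not part of the PR): striping never pads with None, covers every partition exactly once, and always yields exactly num_procs chunks:

>>> partitions = range(16)
>>> for num_procs in range(1, 9):
...     chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
...     assert len(chunks) == num_procs
...     assert sorted(p for chunk in chunks for p in chunk) == list(partitions)
...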

vshlapakov (Contributor, Author)

Great idea, I used it to improve the solution.

dpkp (Owner) commented Mar 11, 2015

this is only run during init, so the extra cost of dict.copy().keys() is probably worth it. Can you make that change and add a comment with a link to http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3 ?
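
For context, the linked post describes how dict.keys() changed in Python 3: it returns a live view, so mutating the dict while iterating over it raises a RuntimeError, whereas iterating over a copy is safe. A minimal illustration (the offsets dict here is hypothetical, not kafka-python's actual structure):

>>> offsets = {0: 10, 1: 20, 2: 30, 3: 40}
>>> for partition in offsets.copy().keys():  # iterate a snapshot, not a live view
...     if partition % 2:
...         del offsets[partition]  # safe: the dict being mutated is not the one being iterated
...
>>> offsets
{0: 10, 2: 30}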

vshlapakov (Contributor, Author)

Done!

dpkp added a commit that referenced this pull request Mar 12, 2015
Wrong partitions distribution logic for MP Consumer
dpkp merged commit a5b1c8d into dpkp:master Mar 12, 2015
dpkp (Owner) commented Mar 12, 2015

thanks!

vshlapakov deleted the fix-mp-consumer-distribution branch Mar 24, 2015
vshlapakov (Contributor, Author)

Thank you :)
