worker_state is lost between map calls if the input is too large #46
This is very related to #15.

Since your awesome release v2.3.0 (which fixed #15), I've been using mpire a lot, I love it :)

But I'm having a problem, very similar to #15. In the following script, each worker gets to deal with several numbers `i`, which they keep in their state. Then I retrieve these values in another call. If I run this script, everything works perfectly and I get my expected output.
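A minimal sketch of such a script, assuming the shape of the updated example further down (without the barrier) and a hypothetical small `N`:

```python
from mpire import WorkerPool

N = 32  # hypothetical small task count for which everything works
W = 4

def set_state(w_state, i):
    # Each worker stores a value for index i in its own state dict
    w_state[i] = 2 * i + 1

def get_state(w_state, i):
    # Second map call: this assumes the worker that stored index i
    # is also the one asked to return it
    return w_state[i]

if __name__ == "__main__":
    pool = WorkerPool(n_jobs=W, use_worker_state=True, keep_alive=True)
    pool.map(set_state, list(range(N)), iterable_len=N, n_splits=W)
    r = pool.map(get_state, list(range(N)), iterable_len=N, n_splits=W)
    print(r)
```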
Now, if I change `N` (the number of tasks) to a higher number (like `128`) and run the script again, I get the exact same error as in #15, so it seems the worker state is somehow erased?

@sybrenjansen Do you have any idea what's the problem? Did I do something wrong in my script?
Comments
I just noticed that if I set the number of workers (`n_jobs`) to a higher number, it works. I don't know what's going on behind the scenes, but this seems more like a dirty workaround. I'd like to be able to process my 128 tasks with a specific number of workers (like `4`).
Let's break this down. You first set the state using `set_state` and then retrieve it with `get_state`. You're using `n_splits=W`, so your tasks are divided into exactly `W` chunks, which are handed out to the workers as they become available.
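To make the chunking concrete, here's a quick sketch using `mpire.utils.chunk_tasks` (the same helper that appears further down in this thread; the tuple layout shown is assumed from the current implementation):

```python
from mpire.utils import chunk_tasks

# n_splits=4 always yields exactly 4 chunks, in index order
print(list(chunk_tasks(range(8), n_splits=4)))
# -> [(0, 1), (2, 3), (4, 5), (6, 7)]
print([len(c) for c in chunk_tasks(range(128), n_splits=4)])
# -> [32, 32, 32, 32]
```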
The important thing to notice here, however, is that in your small run each worker got at most 1 chunk and these chunks happened to be in order: worker 0 got chunk 0, worker 1 got chunk 1, worker 2 got chunk 2. When you increase `N` that is no longer guaranteed: a worker that finishes its chunk early can receive the next chunk as well, so one worker may end up processing two chunks while another processes none. In other words, there's no deterministic order in which workers receive chunks, and in your example script you're assuming just this. The chunks are delivered to different workers in between your two `map` calls, so `get_state` can run on a worker whose state never stored that index, and the lookup fails.

I'm not sure what your actual use case is and what you need to store in the worker state, but here are a few things to do in order to remedy this:

1. Restructure your code so it doesn't depend on which worker processed which task, for example by passing data through return values or `shared_objects` instead of the worker state.
2. Synchronize the workers (e.g. with a `multiprocessing.Barrier`) so that every worker processes exactly one chunk per `map` call.
3. I could add a flag to mpire that hands out the chunks to workers in a fixed order; it shouldn't be that much work.
@sybrenjansen Hey, thanks for the super detailed answer! That's very helpful.

Solution 1 will not be possible for my use-case. Solution 2 seems nice! I'll try it and let you know. Solution 3 is of course awesome, a cool new feature!
Updated example, working perfectly with a `Barrier`: every worker now blocks after each task until all `W` workers have finished one, so no worker can run ahead, finish its chunk, and take another.

```python
from mpire import WorkerPool
from multiprocessing import Barrier

N = 1024
W = 4
B = Barrier(W)  # synchronizes the W workers after every task

def set_state(w_state, i):
    w_state[i] = 2 * i + 1
    B.wait()
    return None

def get_state(w_state, i):
    B.wait()
    return w_state[i]

if __name__ == "__main__":
    pool = WorkerPool(n_jobs=W, use_worker_state=True, keep_alive=True)
    pool.map(set_state, list(range(N)), iterable_len=N, n_splits=W)
    r = pool.map(get_state, list(range(N)), iterable_len=N, n_splits=W)
    print(r)
```

Thanks again @sybrenjansen for the super clear explanation 🙏
Actually, let me re-open this issue...

If the number of tasks is not "round" (try the above script with `N = 1025`), the workers deadlock: the chunks no longer all have the same size, so on the last pass only some workers still have a task left, and a `Barrier(W)` waiting for all `W` of them can never be released.
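A quick sketch to see the uneven split (the exact chunk layout may differ by mpire version):

```python
from mpire.utils import chunk_tasks

# 1025 tasks over 4 splits: one chunk ends up one task longer than the
# others, e.g. [257, 256, 256, 256], so only one worker reaches the
# barrier on the final pass while the other three are already done.
print([len(c) for c in chunk_tasks(range(1025), n_splits=4)])
```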
Ok, so I created a working example, using 2 different barriers: one for the general case, and one for the last indexes (where the split might not be round). (Note: I tried to use a `Semaphore`, but it's not an appropriate primitive for this use case, so it didn't work.) I wrapped everything in a new class so it's easier to use:

```python
from mpire import WorkerPool
from mpire.utils import chunk_tasks
from multiprocessing import Barrier

N = 1025
W = 4

class AdaptativeBarrier:
    def __init__(self, n_workers, n_tasks):
        self.general_barrier = Barrier(n_workers)
        # Barrier for the final pass, where only n_tasks % n_workers
        # workers still have a task left
        self.last_barrier = Barrier(n_tasks % n_workers)
        # Compute the indexes of the last pass: the last index of every
        # chunk that is longer than the base chunk size
        chunks = chunk_tasks(range(n_tasks), n_splits=n_workers)
        n = n_tasks // n_workers
        self.last_idx = [c[-1] for c in chunks if len(c) > n]

    def wait(self, i):
        if i in self.last_idx:
            return self.last_barrier.wait()
        else:
            return self.general_barrier.wait()

B = AdaptativeBarrier(W, N)

def set_state(w_state, i):
    w_state[i] = 2 * i + 1
    B.wait(i)
    return None

def get_state(w_state, i):
    B.wait(i)
    return w_state[i]

if __name__ == "__main__":
    pool = WorkerPool(n_jobs=W, use_worker_state=True, keep_alive=True)
    pool.map(set_state, list(range(N)), iterable_len=N, n_splits=W)
    r = pool.map(get_state, list(range(N)), iterable_len=N, n_splits=W)
    print(r)
```

I'll keep this issue open, as I think having an additional flag in mpire to handle this automatically would be a useful feature. But feel free to close @sybrenjansen!
Let's keep it open. Like I said, it's not that much work and I do think it can be a useful feature for others as well. I think I can find some free time next week or so.
Had a bit less free time the last few weeks than I thought. This is just a reminder that I didn't forget about it; it's still on my to-do list.
Now available in v2.5.0
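A sketch of how the script could look with the new release (assuming the flag added in v2.5.0 is the `order_tasks` parameter of `WorkerPool`; check the changelog for the exact name):

```python
from mpire import WorkerPool

N = 1025
W = 4

def set_state(w_state, i):
    w_state[i] = 2 * i + 1

def get_state(w_state, i):
    return w_state[i]

if __name__ == "__main__":
    # order_tasks=True hands chunk 0 to worker 0, chunk 1 to worker 1,
    # etc., so per-worker state lines up between the two map calls
    # without any barrier gymnastics
    with WorkerPool(n_jobs=W, use_worker_state=True, keep_alive=True,
                    order_tasks=True) as pool:
        pool.map(set_state, range(N), iterable_len=N, n_splits=W)
        r = pool.map(get_state, range(N), iterable_len=N, n_splits=W)
        print(r)
```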