
Processpool #515 (Draft)

xl0 wants to merge 3 commits into base: master

Conversation


@xl0 xl0 commented Feb 14, 2023

No description provided.

@review-notebook-app commented: Check out this pull request on ReviewNB to see visual diffs & provide feedback on Jupyter Notebooks.

@jph00
Member

jph00 commented Feb 15, 2023

Looking good. Can you please add a bit of documentation to this explaining the background behind what it solves and why (with links to background info), because it's a bit non-obvious!

@xl0
Author

xl0 commented Feb 15, 2023

Will do, thank you.

@xl0
Author

xl0 commented Feb 15, 2023

Ok, parallel is getting out of hand, now with half a dozen interconnected parameters that all affect its behaviour in significant and non-straightforward ways:

  • n_workers - Has the side effect of using pool.map() in a worker process/thread, or plain map() in the current process when set to 0.
  • threadpool - Use threads or processes.
    • Threads in Python are subject to the GIL, so they are only good for IO-bound tasks, but they start much faster and you don't need to serialize the data sent between them.
  • method - Affects the start method for worker processes. Don't use it with threads. Platform-dependent, and it's unclear when to change it.
  • executor - Use ProcessPoolExecutor or ProcessPool. Does not work when using threads.
  • maxtasksperchild - If using ProcessPool, limits the lifetime of workers.
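To make maxtasksperchild concrete, here is a minimal sketch using the plain multiprocessing.Pool (not the fastcore wrappers), assuming the fork start method (the Linux default). With maxtasksperchild=1, each worker process handles exactly one task and is then replaced, so every task runs in a fresh process:

```python
import os
from multiprocessing import Pool

# maxtasksperchild=1 makes each worker exit after a single task; the pool
# then replaces it, so every task below runs in a fresh process.
# starmap with empty tuples lets us call os.getpid() with no arguments.
with Pool(2, maxtasksperchild=1) as pool:
    pids = pool.starmap(os.getpid, [()] * 6, chunksize=1)
print(len(set(pids)))  # more than 2 distinct pids, despite only 2 workers
```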

Besides that, we also provide overloaded ProcessPool, ProcessPoolExecutor and ThreadPoolExecutor. They act like the classes from concurrent.futures and multiprocessing.pool, but their .map methods revert to plain map() if you start them with max_workers=0. Other methods ignore this suggestion and start 1 worker process/thread. ProcessPool also accepts daemonic=, which controls whether worker processes should be daemonic or not.
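For illustration, here is a minimal re-creation of that max_workers=0 behaviour (a sketch of the idea only, not the actual fastcore code):

```python
from concurrent.futures import ProcessPoolExecutor as _PPE

class ProcessPoolExecutor(_PPE):
    """Sketch: with max_workers=0, .map degrades to the builtin map in the
    current process, and no worker process is ever started."""
    def __init__(self, max_workers=None, **kwargs):
        self.not_parallel = (max_workers == 0)
        super().__init__(1 if self.not_parallel else max_workers, **kwargs)
    def map(self, fn, *iterables, **kwargs):
        if self.not_parallel: return map(fn, *iterables)
        return super().map(fn, *iterables, **kwargs)
```

Since the max_workers=0 path never leaves the current process, even an unpicklable fn (e.g. a lambda) works there.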

Speaking of daemonic, what is it all about? For our purposes, just 2 things seem relevant.

  • A daemonic process will terminate when the parent process (the Python interpreter) is terminated. A non-daemonic process will persist.
  • A daemonic process cannot have children, which means you can't use things like Pool or PoolExecutor inside it.
    Processes created by ProcessPoolExecutor are not daemonic (as of 3.8?), while ProcessPool lets you control this property.
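The second point is easy to demonstrate (assuming the fork start method, the Linux default, so the locally defined target function can be used): starting a process from inside a daemonic process is refused outright with an AssertionError.

```python
import multiprocessing as mp

def try_to_spawn(q):
    # Inside a daemonic process: starting a child is refused outright.
    try:
        mp.Process(target=int).start()
        q.put("spawned a child")
    except AssertionError as e:
        q.put(f"refused: {e}")

q = mp.Queue()
p = mp.Process(target=try_to_spawn, args=(q,), daemon=True)  # also dies with the parent
p.start(); p.join()
msg = q.get()
print(msg)  # refused: daemonic processes are not allowed to have children
```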

I don't think it would a controversial thing to say that parallel() is becoming too complicated. I also don't like that most parameter combinations are invalid. Let me try to simplify it.

First, what is the reason we need 4 different types of pools?

As long as we only use the .map method, there is not much difference between a Pool and a PoolExecutor:

  • ProcessPoolExecutor supports timeout= for .map(). Workers are always non-daemonic.
  • ProcessPool supports maxtasksperchild, and workers can be daemonic or non-daemonic. It has a lot more methods besides .map(), but we don't use them, and they ignore max_workers=0.
  • ThreadPoolExecutor supports timeout= for .map(), uses threads.
  • ThreadPool - Does not support timeout or maxtasksperchild, but has all the methods provided by ProcessPool. Not implemented.
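To make the executor-only timeout= concrete, here is how it behaves (shown with ThreadPoolExecutor so the snippet needs no pickling; ProcessPoolExecutor.map works the same way). The deadline is counted from the map() call, and the result iterator raises TimeoutError once a pending result exceeds it:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow(t):
    time.sleep(t)
    return t

results, timed_out = [], False
with ThreadPoolExecutor(2) as ex:
    try:
        for r in ex.map(slow, [0.01, 1.0], timeout=0.2):
            results.append(r)
    except TimeoutError:
        timed_out = True
print(results, timed_out)  # [0.01] True: the fast task made it, the slow one didn't
```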

In reality, it looks like most of the features are not being used. At least in fastai code:

  • ProcessPoolExecutor and ThreadPoolExecutor are never used directly, only through parallel().
  • timeout= is never used (nb 09b passes timeout= to f(), not to .map())
  • threadpool is used exactly once, again, in nb 09b.

I'd like to propose:

  • Drop the custom ProcessPoolExecutor and ThreadPoolExecutor, as neither is used by external code.
    In parallel() do:
if max_workers:
    with pool(...) as pl: r = pl.map(...)
else: r = map(...)
  • Drop threadpool=. Threads work differently and have vastly different semantics when it comes to synchronization, and this option is only used once, and in a context where a process pool would work just as well.
  • Since we never use timeout, drop it and always use ProcessPool instead of ProcessPoolExecutor.
  • Since in the past all worker processes have been non-daemonic, keep them non-daemonic when using ProcessPool.
  • Realistically, the only 2 useful values for maxtasksperchild are 1 and unlimited, so replace it with an easier-to-understand parameter, reuse_workers=True (reuse_workers=False gives you maxtasksperchild=1).
  • method is extremely finicky. Maybe we could drop it in favour of heuristics?

Proposed interface for parallel:

#before:
parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
             method=None, threadpool=False, chunksize=1, maxtasksperchild=None, **kwargs)

#after:
parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
             chunksize=1, reuse_workers=True, **kwargs)
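A rough sketch of what the simplified version could boil down to (hypothetical: argument binding is simplified, and progress=, pause= and the defaults.cpus default are omitted or hard-coded):

```python
from functools import partial
from multiprocessing import Pool

def parallel(f, items, *args, n_workers=2, chunksize=1, reuse_workers=True, **kwargs):
    g = partial(f, *args, **kwargs)
    if not n_workers: return list(map(g, items))  # n_workers=0: stay in-process
    # reuse_workers=False is the old maxtasksperchild=1: a fresh worker per task.
    with Pool(n_workers, maxtasksperchild=None if reuse_workers else 1) as pool:
        return pool.map(g, items, chunksize=chunksize)
```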

P.S:
I've also noticed that we default to using "fork" on macOS. The documentation says:

Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See https://bugs.python.org/issue?@action=redirect&bpo=33725

At the bottom of the imports cell, we have

try:
    if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method("fork")
except: pass

It was introduced in a commit that just says "fix import" (65d703f), and it changes the behaviour of all Python multiprocessing methods on macOS. Should it be there at all?

@xl0
Author

xl0 commented Feb 15, 2023

@jph00 , I had a crack at it. What do you think?

https://github.com/xl0/fastcore/blob/minpool/nbs/03a_parallel.ipynb

All fastai tests passed with --n_workers=1 and with the default.

@jph00
Member

jph00 commented Feb 17, 2023

I've also noticed that we default to using "fork" on macOS

Yes, that's the only way we've found to get DL notebooks to run in practice, despite the concerns that you correctly pointed out.

In your analysis above you seem AFAICT to largely be restricting your impact analysis to usages within the fastcore lib itself. Would you be able to also test the impact of these changes on running the tests for nbdev, fastai, ghapi, and execnb? (Apologies in advance if you've already done this and I missed it.)

@xl0
Author

xl0 commented Feb 18, 2023

@jph00 , Thank you for the clarification.

I had tested with fastcore, nbdev and fastai. I just checked: execnb passes cleanly; ghapi needs to drop threadpool=True in nb 03. The change does not perceivably affect performance, even if I change the start method to "spawn".

@xl0
Author

xl0 commented Feb 18, 2023

@jph00 , one thing I've noticed with ProcessPool.

There is no guarantee as to which worker will start first when using .imap(). The tasks are scheduled for execution, and the results are returned in the right order, but once the work is handed to the newly created workers, any one of them may get to run first. This does not seem to be the case for ProcessPoolExecutor - its tasks seem to always start in input order, but I'm not sure whether that's guaranteed.

Do you think this is an issue?
I'm using imap() for the progress bar; .map() just returns all the results at once.
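The ordering contract is easy to demonstrate (multiprocessing.pool.ThreadPool is used here so the snippet needs no pickling; ProcessPool.imap makes the same promise for results, though, as noted, not for task start order): completion order is arbitrary, but imap() yields results in input order.

```python
import random, time
from multiprocessing.pool import ThreadPool

def work(x):
    time.sleep(random.uniform(0, 0.02))  # tasks finish in an arbitrary order
    return x * x

with ThreadPool(4) as pool:
    results = list(pool.imap(work, range(8)))
print(results)  # always [0, 1, 4, 9, 16, 25, 36, 49], whichever task ran first
```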

@xl0
Author

xl0 commented Feb 18, 2023

Ok, this was actually pretty easy to fix. Instead of wrapping g in _call(), I wrap items in a generator:

# |export
def _gen(items, pause):
    for item in items:
        time.sleep(pause)
        yield item

and in parallel()

    with ProcessPool(n_workers, context=get_context(method), reuse_workers=reuse_workers) as ex:
        lock = Manager().Lock()
        _g = partial(_call, lock, pause, n_workers, g)
        r = ex.imap(_g, items, chunksize=chunksize)

becomes

    with ProcessPool(n_workers, context=get_context(method), reuse_workers=reuse_workers) as ex:
        _items = _gen(items, pause)
        r = ex.imap(g, _items, chunksize=chunksize)

The difference is that we now pause in the parent process before handing work to a worker, rather than in the worker before starting the work, which means we no longer need the lock.

@xl0
Author

xl0 commented Feb 19, 2023

@jph00 , please disregard the previous comment, I was wrong. Would it be an issue that the tasks can start out of order?
