Fix multiprocessing timeout #3511

Merged
merged 21 commits into from Oct 16, 2020

zanieb (Contributor) commented Oct 15, 2020

Summary

The value retrieval in multiprocessing_timeout used a locally defined helper function,
which the native Python pickler cannot serialize. This caused an error when running tasks with
a timeout on a process-based scheduler.

Example

from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from random import randint

@task
def foo(x):
    return [randint(0, i) for i in range(x)]

@task(timeout=10)
def bar(y):
    return y * 2

with Flow("foobar") as flow:
    y = foo(10)
    z = bar.map(y)

if __name__ == "__main__":
    state = flow.run(executor=LocalDaskExecutor())

Running this results in the error: AttributeError: Can't pickle local object 'multiprocessing_timeout.<locals>.retrieve_value'
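
The failure can be reproduced without Prefect. Below is a minimal sketch using only the standard library; the names `top_level_helper`, `outer_with_local_helper`, and `can_pickle` are hypothetical and not part of this PR. The standard pickler serializes a function by reference to its qualified name, so a function defined inside another function cannot be looked up again and fails to pickle, while a module-level function can.

```python
import pickle


def top_level_helper():
    """Module-level function: pickled by reference to its qualified name."""
    return "ok"


def outer_with_local_helper():
    """Return a nested function, analogous to a helper defined inside another function."""

    def retrieve_value():
        return "ok"

    return retrieve_value


def can_pickle(obj) -> bool:
    """Return True if the standard pickler can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, pickle.PicklingError):
        # Local objects raise one of these, depending on the Python version.
        return False


if __name__ == "__main__":
    print(can_pickle(top_level_helper))
    print(can_pickle(outer_with_local_helper()))
```

Run as a script, the first check should succeed and the second should fail, which mirrors the `<locals>.retrieve_value` error above.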

Also fixes a timeout issue with function pickling similar to #2634

Changes

  • Moves the multiprocessing_timeout.retrieve_value function to a top-level function
  • Renames retrieve_value to multiprocessing_safe_retrieve_value
  • multiprocessing_safe_retrieve_value takes a cloudpickle serialized callable so tasks defined in scripts can be passed
  • Adds the mproc and mproc_local executors to the flow timeout test
  • Minor documentation improvements in the multiprocessing_timeout function
  • Adds a __processes attribute to distributed test executors
  • Adds a local multiprocess test executor
  • Fixes the synchronous test executor to use the correct scheduler
  • Adds test coverage for daemon process / soft limit timeout enforcement
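
The first three changes can be illustrated with a rough sketch. This is not the code in `src/prefect/utilities/executors.py`: the names `safe_retrieve_value` and `run_with_timeout` are hypothetical, and the standard `pickle` module stands in for `cloudpickle` so that the sketch has no third-party dependency. The idea is that the process target is a top-level function (importable by name in the child), and the user's callable travels as an already-serialized payload.

```python
import functools
import multiprocessing
import pickle
import queue


def safe_retrieve_value(result_queue, payload: bytes) -> None:
    """Hypothetical top-level helper: a child process can import it by name.

    `payload` is a serialized zero-argument callable. The PR uses cloudpickle
    for this step; plain pickle is an assumption made to keep the sketch stdlib-only.
    """
    fn = pickle.loads(payload)
    try:
        result_queue.put(fn())
    except Exception as exc:
        # Hand the exception back to the parent instead of losing it in the child.
        result_queue.put(exc)


def run_with_timeout(fn, timeout: float):
    """Run fn() in a child process; raise TimeoutError if no result arrives in time."""
    result_queue = multiprocessing.Queue()
    payload = pickle.dumps(fn)
    proc = multiprocessing.Process(
        target=safe_retrieve_value, args=(result_queue, payload)
    )
    proc.start()
    try:
        result = result_queue.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError(f"Execution timed out after {timeout} seconds.") from None
    finally:
        # Stop the child whether it finished or not, then reap it.
        proc.terminate()
        proc.join()
    if isinstance(result, Exception):
        raise result
    return result


if __name__ == "__main__":
    print(run_with_timeout(functools.partial(pow, 2, 5), timeout=10))
```

With `cloudpickle.dumps` in place of `pickle.dumps`, functions defined in a script's `__main__` are serialized by value rather than by reference, which is what lets tasks defined in scripts be passed to the child.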

Importance

  • Fixes a bug in multiprocessing timeouts
  • Improves user experience when defining timeout tasks in scripts

Checklist

This PR:

  • adds new tests (if appropriate)
  • adds a change file in the changes/ directory (if appropriate)
  • updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)

This was breaking because the locally defined helper function was not serializable by the Python pickler. This change also serializes the given function using `cloudpickle` to fix passing tasks that are defined in a script rather than a module.
Previously, process-scheduled execution timeouts were untested.
Previously, this was using the threaded scheduler, perhaps because a default option changed in the past.
For testing timeouts in processes.
The only way I can figure out to make this bool visible to tests.
Asserts:
- The warning was given for daemon executors
- "invalid" is not written except for daemon executors

Additionally, bumps the sleep by a second because tests were failing locally for me, and moves the sleep into a variable because it is used twice; if it is not changed in both locations, the test will be invalid.
zanieb (Contributor, Author) commented Oct 15, 2020

The referenced pickling error does not appear to occur on Linux, just OSX 🤷

jcrist commented Oct 15, 2020

Nice find, I'll give this a longer review after lunch.

> The referenced pickling error does not appear to occur on Linux, just OSX 🤷

In Python 3.8, macOS switched to spawn as the default multiprocessing start method, while Linux still uses fork (where pickling isn't needed).
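
For anyone trying to reproduce this on Linux, here is a small sketch that reports the default start method and requests spawn explicitly. It uses only the standard `multiprocessing` API; `describe_start_method` is a hypothetical helper name, not something in Prefect.

```python
import multiprocessing


def describe_start_method() -> str:
    """Report whether the current default start method requires pickling the target."""
    method = multiprocessing.get_start_method()
    if method == "fork":
        return f"{method}: child inherits the parent's memory, target is not pickled"
    return f"{method}: child starts fresh, target and arguments must be picklable"


if __name__ == "__main__":
    print(describe_start_method())
    # A specific method can be requested explicitly, which may help reproduce
    # macOS-only pickling failures on Linux.
    ctx = multiprocessing.get_context("spawn")
    print(ctx.get_start_method())
```

Processes created through `ctx.Process(...)` use spawn regardless of the platform default, so a local helper passed as the target should fail to pickle there as well.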

zanieb and others added 9 commits October 15, 2020 12:49
Allows us to skip setting an attribute

Co-authored-by: Jim Crist-Harif <jcrist@users.noreply.github.com>
Was failing locally with no file written yet.
Otherwise 'invalid' was not being consistently written to the file, which would give false passes for this test.
codecov bot commented Oct 15, 2020

Codecov Report

Merging #3511 into master will decrease coverage by 0.07%.
The diff coverage is 45.83%.

jcrist left a comment

One final tweak and this looks good to me.

To be consistent with passed callables elsewhere
@zanieb zanieb requested a review from jcrist October 16, 2020 16:22
@jcrist jcrist merged commit 49d058e into master Oct 16, 2020
@jcrist jcrist deleted the fix-multiprocessing-timeout branch October 16, 2020 16:24