
Initial DaskRunner for Beam #22421

Merged
merged 69 commits into apache:master on Oct 25, 2022

Conversation

alxmrs
Contributor

@alxmrs alxmrs commented Jul 22, 2022

Here, I've created a minimum viable Apache Beam runner for Dask. My approach is to visit a Beam Pipeline and translate PCollections into Dask Bags.

In this version, I have supported enough operations to make the test pipeline asserts work. The tests themselves are not comprehensive. Further, there are many Bag operations that could be used in the translation for greater efficiency.
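
For context, a minimal usage sketch (not part of this PR description; it assumes DaskRunner is the runner class exported from sdks/python/apache_beam/runners/dask/dask_runner.py added here):

import apache_beam as beam

from apache_beam.runners.dask.dask_runner import DaskRunner

# Run a small pipeline on Dask: transforms are translated into Dask Bag
# operations and executed on the default (local) scheduler.
with beam.Pipeline(runner=DaskRunner()) as p:
    _ = (p
         | beam.Create([1, 2, 3])
         | beam.Map(lambda x: x * 2))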

CC: @pabloem

Fixes: #18962

Original PR discussion can be found here: alxmrs#1


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

See CI.md for more information about GitHub Actions CI.

@codecov

codecov bot commented Sep 5, 2022

Codecov Report

Merging #22421 (f9cf45a) into master (107a43d) will decrease coverage by 0.14%.
The diff coverage is 82.04%.

@@            Coverage Diff             @@
##           master   #22421      +/-   ##
==========================================
- Coverage   73.35%   73.21%   -0.15%     
==========================================
  Files         719      728       +9     
  Lines       95800    96272     +472     
==========================================
+ Hits        70276    70482     +206     
- Misses      24212    24479     +267     
+ Partials     1312     1311       -1     
Flag Coverage Δ
python 82.80% <86.85%> (-0.25%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
sdks/go/pkg/beam/core/runtime/graphx/translate.go 38.42% <0.00%> (ø)
sdks/go/pkg/beam/core/runtime/xlangx/expand.go 0.00% <0.00%> (ø)
sdks/go/pkg/beam/schema.go 35.29% <ø> (ø)
...ython/apache_beam/runners/interactive/sql/utils.py 76.09% <ø> (ø)
sdks/python/apache_beam/transforms/combiners.py 93.43% <ø> (ø)
sdks/python/apache_beam/typehints/row_type.py 100.00% <ø> (ø)
...apache_beam/typehints/native_type_compatibility.py 85.52% <33.33%> (-1.06%) ⬇️
sdks/python/apache_beam/typehints/opcodes.py 85.35% <50.00%> (-0.26%) ⬇️
...dks/python/apache_beam/runners/dask/dask_runner.py 86.45% <86.45%> (ø)
.../python/apache_beam/typehints/trivial_inference.py 96.15% <87.50%> (-0.27%) ⬇️
... and 42 more


@alxmrs
Contributor Author

alxmrs commented Sep 21, 2022

@TomAugspurger: I'm having trouble running my unit tests. My tests used to work, but now I'm noticing infinite loops when running them on a local cluster (the default scheduler).

In my last commit, I changed the client.gather call to async mode, and this let me hit a timeout error. Here, it appears that the Beam tests pass (asserts behave as expected within a compute graph); however, the client never stops running and times out after 4 seconds.

Do you have any idea what's going on? One key difference between my setup now and when I wrote this is that I'm now on an M1 Mac (ARM64). Could this be causing my problem?

CC: @rabernat @cisaacstern @pabloem

@TomAugspurger

TomAugspurger commented Sep 22, 2022

In my last commit, I changed the client.gather command to async mode, and this let me hit a timeout error

The call

self.client.gather(self.futures, errors='raise', asynchronous=True)

looks incorrect inside of a regular def function. That would typically need to be await self.client.gather inside of an async function, since asynchronous=True makes that return a coroutine that needs to be awaited.

Can you expand on the desire for asynchronous=True there? The timeout wasn't working properly without it? FWIW, I don't see the infinite loops locally, even with asynchronous=True.
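
For reference, a minimal sketch (not from this PR) of the distinction described above, assuming a distributed.Client connected to a running cluster:

from dask.distributed import Client

def blocking_wait(client: Client, futures):
    # Synchronous usage: gather blocks until the futures finish and
    # returns their results directly.
    return client.gather(futures, errors='raise')

async def async_wait(client: Client, futures):
    # With asynchronous=True, gather returns a coroutine, so it has to be
    # awaited inside an async def; calling it from a regular def leaves
    # the coroutine unawaited.
    return await client.gather(futures, errors='raise', asynchronous=True)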


dask_options = options.view_as(DaskOptions).get_all_options(
    drop_default=True)
client = ddist.Client(**dask_options)


How does Beam typically handle the lifetime of runners? In the tests, I see warnings about re-using port 8787 from Dask, since the client (and cluster) aren't being completely cleaned up between tests.

Is it more common for beam to create (and clean up) the runner? Or would users typically create it?

Contributor Author


This is my first runner – @pabloem can probably weigh in better than I can regarding your question. However, what makes sense to me is that each Beam runner should clean up its environment between runs, including in tests.

This probably should happen in the DaskRunnerResult object. Do you have any recommendations on the best way to clean up dask (distributed)?


In a single scope,

with distributed.Client(...) as client:
    ...

But in this case, as you say, you'll need to call it after the results are done. So I think that something like

client.close()
client.cluster.close()

should do the trick (assuming that beam is the one managing the lifetime of the client).

If you want to rely on the user having a client active, you can call dask.distributed.get_client(), which will raise a ValueError if one hasn't already been created.
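
A rough sketch of that cleanup (hypothetical, not the PR's actual DaskRunnerResult; it assumes beam owns the client's lifetime):

from dask.distributed import Client

class DaskRunnerResult:  # simplified illustration only
    def __init__(self, client: Client, futures):
        self.client = client
        self.futures = futures

    def wait_until_finish(self, duration=None):
        try:
            # Block until all submitted work completes (or raises).
            self.client.gather(self.futures, errors='raise')
        finally:
            # Tear down the client, and its cluster if it created one,
            # so nothing (like port 8787) leaks between runs.
            cluster = self.client.cluster
            self.client.close()
            if cluster is not None:
                cluster.close()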

@alxmrs
Contributor Author

alxmrs commented Sep 22, 2022

looks incorrect inside of a regular def function.

Yes – thanks for pointing this out. This makes sense to me, looking further at the documentation.

Can you expand on the desire for asynchronous=True there?

I... really am just trying things to stop hitting an infinite loop. This got me to a timeout error when run in tests. Though, when running e2e in Pangeo-Forge, I definitely experienced a runtime error complaining that I wasn't in an async def.

FWIW, I don't see the infinite loops locally, even with asynchronous=True.

Interesting! Do the tests pass for you? What is your environment like? I'm concerned that I'm hitting another architecture issue with ARM.

Thanks for taking a look at this, Tom.

@pabloem
Member

pabloem commented Oct 19, 2022

Run Python PreCommit

@pabloem
Member

pabloem commented Oct 19, 2022

Run Python PreCommit

@pabloem
Member

pabloem commented Oct 20, 2022

The only test that is giving trouble should be easy to fix or skip for now. I'll review the PR as is and maybe we'll merge it soon.

@alxmrs
Contributor Author

alxmrs commented Oct 20, 2022

Thanks Pablo. I think I can easily fix it – I'm having trouble reproducing the issue on my local environment due to my M1 woes.

@pabloem
Member

pabloem commented Oct 21, 2022

ok I've taken a look. The code, in fact, looks so clean that I'm very happy to merge.

@pabloem
Member

pabloem commented Oct 21, 2022

Run Python PreCommit

@pabloem
Member

pabloem commented Oct 21, 2022

Run Python PreCommit

@pabloem
Member

pabloem commented Oct 21, 2022

Run Python PreCommit

@pabloem
Member

pabloem commented Oct 24, 2022

ugggg haha can't get a passing precommit even though the tests are unrelated.

@pabloem
Member

pabloem commented Oct 24, 2022

Run Python PreCommit

@pabloem
Member

pabloem commented Oct 24, 2022

Run Python PreCommit

@pabloem
Member

pabloem commented Oct 24, 2022

sorry about the crazy flakiness. Something is going on recently with our precommits...

@pabloem
Member

pabloem commented Oct 25, 2022

ugggg incredibly enough, this issue reproduces only very occasionally in my environment.

@pabloem
Member

pabloem commented Oct 25, 2022

Run Python PreCommit

@pabloem
Member

pabloem commented Oct 25, 2022

given no changes anywhere close to the current flaky tests, I will merge.

@pabloem
Member

pabloem commented Oct 25, 2022

LGTM

@pabloem pabloem merged commit 76761db into apache:master Oct 25, 2022
@alxmrs
Contributor Author

alxmrs commented Oct 25, 2022

Wohoo!

@Abacn
Contributor

Abacn commented Oct 25, 2022

Thanks. The Python PreCommit is showing the following test failure: https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/6286/

apache_beam.runners.dask.dask_runner_test.DaskOptionsTest.test_parser_destinations__agree_with_dask_client

AssertionError: 'vpt_vp_arg3' not found in ['address', 'loop', 'timeout', 'set_as_default', 'scheduler_file', 'security', 'asynchronous', 'name', 'heartbeat_interval', 'serializers', 'deserializers', 'extensions', 'direct_to_workers', 'connection_limit', 'kwargs']

apache_beam.runners.dask.dask_runner_test.DaskRunnerRunPipelineTest.test_create

TypeError: __init__() got an unexpected keyword argument 'vpt_vp_arg3'
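
For context, test_parser_destinations__agree_with_dask_client appears to compare the DaskOptions parser destinations against the distributed.Client constructor via reflection; a minimal sketch of that kind of check (not the test's exact code):

import inspect

from dask.distributed import Client

# Names of Client.__init__'s parameters (dropping 'self'); this roughly
# mirrors the list shown in the assertion error above.
client_params = set(inspect.signature(Client.__init__).parameters) - {'self'}

# A parser destination that is not a real Client parameter (such as
# 'vpt_vp_arg3' in the error above) fails this membership check.
assert 'timeout' in client_params
assert 'vpt_vp_arg3' not in client_params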

@pabloem
Member

pabloem commented Oct 26, 2022

thanks Yi for pointing this out

ruslan-ikhsan pushed a commit to akvelon/beam that referenced this pull request Nov 11, 2022
* WIP: Created a skeleton dask runner implementation.

* WIP: Idea for a translation evaluator.

* Added overrides and a visitor that translates operations.

* Fixed a dataclass typo.

* Expanded translations.

* Core idea seems to be kinda working...

* First iteration on DaskRunnerResult (keep track of pipeline state).

* Added minimal set of DaskRunner options.

* WIP: Alllmost got asserts to work! The current status is:
- CoGroupByKey is broken due to how tags are used with GroupByKey
- GroupByKey should output `[('0', None), ('1', 1)]`, however it actually outputs: [(None, ('1', 1)), (None, ('0', None))]
- Once that is fixed, we may have test pipelines work on Dask.

* With a great 1-liner from @pabloem, groupby is fixed! Now, all three initial tests pass.

* Self-review: Cleaned up dask runner impl.

* Self-review: Remove TODOs, delete commented out code, other cleanup.

* First pass at linting rules.

* WIP, include dask dependencies + test setup.

* WIP: maybe better dask deps?

* Skip dask tests depending on successful import.

* Fixed setup.py (missing `,`).

* Added an additional comma.

* Moved skipping logic to be above dask import.

* Fix lint issues with dask runner tests.

* Adding destination for client address.

* Changing to async produces a timeout error instead of stuck in infinite loop.

* Close client during `wait_until_finish`; rm async.

* Supporting side-inputs for ParDo.

* Revert "Close client during `wait_until_finish`; rm async."

This reverts commit 09365f6.

* Revert "Changing to async produces a timeout error instead of stuck in infinite loop."

This reverts commit 676d752.

* Adding -dask tox targets onto the gradle build

* wip - added print stmt.

* wip - prove side inputs is set.

* wip - prove side inputs is set in Pardo.

* wip - rm asserts, add print

* wip - adding named inputs...

* Experiments: non-named side inputs + del `None` in named inputs.

* None --> 'None'

* No default side input.

* Pass along args + kwargs.

* Applied yapf to dask sources.

* Dask sources passing pylint.

* Added dask extra to docs gen tox env.

* Applied yapf from tox.

* Include dask in mypy checks.

* Upgrading mypy support to python 3.8 since py37 support is deprecated in dask.

* Manually installing an old version of dask before 3.7 support was dropped.

* fix lint: line too long.

* Fixed type errors with DaskRunnerResult. Disabled mypy type checking in dask.

* Fix pytype errors (in transform_evaluator).

* Ran isort.

* Ran yapf again.

* Fix imports (one per line)

* isort -- alphabetical.

* Added feature to CHANGES.md.

* ran yapf via tox on linux machine

* Change an import to pass CI.

* Skip isort error; needed to get CI to pass.

* Skip test logic may favor better with isort.

* (Maybe) the last isort fix.

* Tested pipeline options (added one fix).

* Improve formatting of test.

* Self-review: removing side inputs.

In addition, adding a more helpful property to the base DaskBagOp (tranform).

* add dask to coverage suite in tox.

* Capture value error in assert.

* Change timeout value to 600 seconds.

* ignoring broken test

* Update CHANGES.md

* Using reflection to test the Dask client constructor.

* Better method of inspecting the constructor parameters (thanks @TomAugspurger!).

Co-authored-by: Pablo E <pabloem@apache.org>
Co-authored-by: Pablo <pabloem@users.noreply.github.com>
Successfully merging this pull request may close these issues: Support for a Dask runner.