
from_delayed generated one task group per Delayed object #1077

Closed
hendrikmakait opened this issue Jun 6, 2024 · 1 comment · Fixed by #1084

Problem

When using from_delayed, we generate a single expression per Delayed object. This then generates a compound key made of the Delayed object's key and a hard-coded 0: (self.obj.key, 0).

Due to this compound key, distributed picks the Delayed's key as the task group name, which results in one task group per Delayed object.
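
As a purely illustrative sketch (not the actual dask-expr code; obj below stands in for self.obj), the per-Delayed key looks roughly like this:

import pandas as pd

from dask import delayed

obj = delayed(lambda x: pd.DataFrame({"x": [x]}))(1)
# from_delayed wraps each Delayed in its own expression whose single
# output key pairs the Delayed's key with a hard-coded partition index 0:
compound_key = (obj.key, 0)
print(compound_key)  # e.g. ('lambda-<uuid>', 0)

The following reproducer shows the resulting task groups on the scheduler: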

import dask.dataframe as dd
import pandas as pd
import pprint

from dask import delayed
from distributed import Client


def foo(x):
    return pd.DataFrame({"x": [x]})

# One Delayed object per partition
delayeds = [delayed(foo)(i) for i in range(10)]
df = dd.from_delayed(delayeds)

with Client(set_as_default=True) as client:
    result = df.compute()
    # Inspect the scheduler's task groups after computing
    pprint.pprint(client.run_on_scheduler(lambda dask_scheduler: dask_scheduler.task_groups))
{'foo-02586e6a-ec9e-42cb-9eb6-46222420147f': <foo-02586e6a-ec9e-42cb-9eb6-46222420147f: released: 1>,
 'foo-0df08452-681d-4958-a8a2-f015b96ef065': <foo-0df08452-681d-4958-a8a2-f015b96ef065: released: 1>,
 'foo-181ac224-3232-468e-93b0-4a61cfbd940e': <foo-181ac224-3232-468e-93b0-4a61cfbd940e: released: 1>,
 'foo-27c00c23-3c74-49cf-8b4c-689677adff35': <foo-27c00c23-3c74-49cf-8b4c-689677adff35: released: 1>,
 'foo-44c676d9-7645-44c9-b43b-027faa4df51e': <foo-44c676d9-7645-44c9-b43b-027faa4df51e: released: 1>,
 'foo-7f820419-d973-452a-be1b-70cffef7c28a': <foo-7f820419-d973-452a-be1b-70cffef7c28a: released: 1>,
 'foo-972e911b-3a7c-4fbf-9d07-af00a0d4dc67': <foo-972e911b-3a7c-4fbf-9d07-af00a0d4dc67: released: 1>,
 'foo-bdaa64d5-28d2-4e48-a910-69207f5d2868': <foo-bdaa64d5-28d2-4e48-a910-69207f5d2868: released: 1>,
 'foo-c74a28ac-a96d-4e89-9009-a72d0bde8f9e': <foo-c74a28ac-a96d-4e89-9009-a72d0bde8f9e: released: 1>,
 'foo-dfb34fc8-f091-4a0f-8ec0-689d5c07ba0e': <foo-dfb34fc8-f091-4a0f-8ec0-689d5c07ba0e: released: 1>,
 'fromdelayed-7fdf8ad58b0d245e0432e9f55c3db378': <fromdelayed-7fdf8ad58b0d245e0432e9f55c3db378: released: 10>,
 'repartitiontofewer-cca5739f4d942ca896245ccc9060e0d3': <repartitiontofewer-cca5739f4d942ca896245ccc9060e0d3: memory: 1>}
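
Why the compound key produces one group per Delayed can be sketched with a simplified stand-in for distributed's grouping logic. group_key below is hypothetical and only approximates what the scheduler does; it assumes key_split is importable from dask.utils:

from dask.utils import key_split  # collapses string keys to their prefix


def group_key(key):
    # Hypothetical simplification: tuple keys are grouped by their first
    # element verbatim, so every Delayed key becomes its own group, while
    # string keys collapse to a shared prefix.
    if isinstance(key, tuple):
        return key[0]
    return key_split(key)


print(group_key(("foo-02586e6a-ec9e-42cb-9eb6-46222420147f", 0)))
# -> 'foo-02586e6a-ec9e-42cb-9eb6-46222420147f'  (one group per Delayed)
print(group_key("fromdelayed-7fdf8ad58b0d245e0432e9f55c3db378"))
# -> 'fromdelayed'  (all partitions share one group)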

Impact

As described in dask/distributed#8677, this can overwhelm the scheduler of a distributed cluster.


fjetter commented Jun 10, 2024

With dask/distributed#8678, I hope that the performance implications of this problem are resolved. The UX is still awkward and we should look into it.
