Skip to content

Use _deprecated_kwarg and test for deprecation warnings

Sign in for the full log view
GitHub Actions / Unit Test Results failed Jan 15, 2024 in 0s

498 fail, 929 skipped, 11 493 pass in 3h 39m 43s

     15 files  ±  0       15 suites  ±0   3h 39m 43s ⏱️ + 5m 27s
 12 920 tests + 34   11 493 ✅ +  1     929 💤 ±0  498 ❌ +33 
160 077 runs  +408  142 694 ✅ +376  16 885 💤  - 1  498 ❌ +33 

Results for commit 965dab5. ± Comparison against earlier commit 8f6b61d.

Annotations

Check warning on line 0 in dask.bag.tests.test_bag

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_to_dataframe (dask.bag.tests.test_bag)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 1s]
Raw output
TypeError: FrameBase.__init__() takes 2 positional arguments but 5 were given
def test_to_dataframe():
        dd = pytest.importorskip("dask.dataframe")
        pd = pytest.importorskip("pandas")
    
        def check_parts(df, sol):
            assert all(
                (p.dtypes == sol.dtypes).all() for p in dask.compute(*df.to_delayed())
            )
    
        dsk = {("test", 0): [(1, 2)], ("test", 1): [], ("test", 2): [(10, 20), (100, 200)]}
        b = Bag(dsk, "test", 3)
        sol = pd.DataFrame(b.compute(), columns=["a", "b"])
    
        # Elements are tuples
>       df = b.to_dataframe()

dask/bag/tests/test_bag.py:858: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = dask.bag<test, npartitions=3>, meta =    0  1
0  1  2, columns = None
optimize_graph = True

    def to_dataframe(self, meta=None, columns=None, optimize_graph=True):
        """Create Dask Dataframe from a Dask Bag.
    
        Bag should contain tuples, dict records, or scalars.
    
        Index will not be particularly meaningful.  Use ``reindex`` afterwards
        if necessary.
    
        Parameters
        ----------
        meta : pd.DataFrame, dict, iterable, optional
            An empty ``pd.DataFrame`` that matches the dtypes and column names
            of the output. This metadata is necessary for many algorithms in
            dask dataframe to work.  For ease of use, some alternative inputs
            are also available. Instead of a ``DataFrame``, a ``dict`` of
            ``{name: dtype}`` or iterable of ``(name, dtype)`` can be provided.
            If not provided or a list, a single element from the first
            partition will be computed, triggering a potentially expensive call
            to ``compute``. This may lead to unexpected results, so providing
            ``meta`` is recommended. For more information, see
            ``dask.dataframe.utils.make_meta``.
        columns : sequence, optional
            Column names to use. If the passed data do not have names
            associated with them, this argument provides names for the columns.
            Otherwise this argument indicates the order of the columns in the
            result (any names not found in the data will become all-NA
            columns).  Note that if ``meta`` is provided, column names will be
            taken from there and this parameter is invalid.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before converting into
            :class:`dask.dataframe.DataFrame`.
    
    
        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
        ...                       {'name': 'Bob',     'balance': 200},
        ...                       {'name': 'Charlie', 'balance': 300}],
        ...                      npartitions=2)
        >>> df = b.to_dataframe()
    
        >>> df.compute()
              name  balance
        0    Alice      100
        1      Bob      200
        0  Charlie      300
        """
        import pandas as pd
    
        import dask.dataframe as dd
    
        if meta is None:
            head = self.take(1, warn=False)
            if len(head) == 0:
                raise ValueError(
                    "`dask.bag.Bag.to_dataframe` failed to "
                    "properly infer metadata, please pass in "
                    "metadata via the `meta` keyword"
                )
            meta = pd.DataFrame(list(head), columns=columns)
        elif columns is not None:
            raise ValueError("Can't specify both `meta` and `columns`")
        else:
            meta = dd.utils.make_meta(meta, parent_meta=pd.DataFrame())
        # Serializing the columns and dtypes is much smaller than serializing
        # the empty frame
        cols = list(meta.columns)
        dtypes = meta.dtypes.to_dict()
    
        dfs = self.map_partitions(to_dataframe, cols, dtypes)
        if optimize_graph:
            dsk = self.__dask_optimize__(dfs.dask, dfs.__dask_keys__())
        else:
            dsk = dfs.dask
    
        divisions = [None] * (self.npartitions + 1)
>       return dd.DataFrame(dsk, dfs.name, meta, divisions)
E       TypeError: FrameBase.__init__() takes 2 positional arguments but 5 were given

dask/bag/core.py:1624: TypeError

Check warning on line 0 in dask.tests.test_backends

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_CreationDispatch_error_informative_message[pandas] (dask.tests.test_backends)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
assert 'error occurred while calling the from_dict method' in "from_dict() got an unexpected keyword argument 'unsupported_kwarg'"
backend = 'pandas'

    @pytest.mark.gpu
    @pytest.mark.parametrize("backend", ["pandas", "cudf"])
    def test_CreationDispatch_error_informative_message(backend):
        # Check that an informative error is emitted when a backend dispatch
        # method fails
        pytest.importorskip(backend)
        dd = pytest.importorskip("dask.dataframe")
        data = {"a": [1, 2, 3, 4], "B": [10, 11, 12, 13]}
        with dask.config.set({"dataframe.backend": backend}):
            with pytest.raises(TypeError) as excinfo:
                dd.from_dict(data, npartitions=2, unsupported_kwarg=True)
    
            msg = str(excinfo.value)
>           assert "error occurred while calling the from_dict method" in msg
E           assert 'error occurred while calling the from_dict method' in "from_dict() got an unexpected keyword argument 'unsupported_kwarg'"

dask/tests/test_backends.py:21: AssertionError

Check warning on line 0 in dask.tests.test_base

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_tokenize_function_cloudpickle (dask.tests.test_base)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
Failed: DID NOT RAISE <class 'RuntimeError'>
def test_tokenize_function_cloudpickle():
        a, b = (lambda x: x, lambda x: x)
        # No error by default
        tokenize(a)
    
        with dask.config.set({"tokenize.ensure-deterministic": True}):
>           with pytest.raises(RuntimeError, match="may not be deterministically hashed"):
E           Failed: DID NOT RAISE <class 'RuntimeError'>

dask/tests/test_base.py:278: Failed

Check warning on line 0 in dask.tests.test_base

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_persist_scalar (dask.tests.test_base)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
AssertionError: assert 6 == 5
 +  where 6 = len({('chunk-022e0ce89358f2223db6e1765d71eff8', 0): (<function apply at 0x7faaf939f880>, <bound method Reduction.chunk of <class 'dask_expr._reductions.Min'>>, [('frompandas-fee8d93c939cf46098d3be5a119a4f48', 0)], {'skipna': True}), ('chunk-022e0ce89358f2223db6e1765d71eff8', 1): (<function apply at 0x7faaf939f880>, <bound method Reduction.chunk of <class 'dask_expr._reductions.Min'>>, [('frompandas-fee8d93c939cf46098d3be5a119a4f48', 1)], {'skipna': True}), ('frompandas-fee8d93c939cf46098d3be5a119a4f48', 0): 0    1\n1    2\ndtype: int64, ('frompandas-fee8d93c939cf46098d3be5a119a4f48', 1): 2    3\n3    4\ndtype: int64, ...})
 +    where {('chunk-022e0ce89358f2223db6e1765d71eff8', 0): (<function apply at 0x7faaf939f880>, <bound method Reduction.chunk of <class 'dask_expr._reductions.Min'>>, [('frompandas-fee8d93c939cf46098d3be5a119a4f48', 0)], {'skipna': True}), ('chunk-022e0ce89358f2223db6e1765d71eff8', 1): (<function apply at 0x7faaf939f880>, <bound method Reduction.chunk of <class 'dask_expr._reductions.Min'>>, [('frompandas-fee8d93c939cf46098d3be5a119a4f48', 1)], {'skipna': True}), ('frompandas-fee8d93c939cf46098d3be5a119a4f48', 0): 0    1\n1    2\ndtype: int64, ('frompandas-fee8d93c939cf46098d3be5a119a4f48', 1): 2    3\n3    4\ndtype: int64, ...} = <bound method FrameBase.__dask_graph__ of <dask_expr.expr.Scalar: expr=df.min()>>()
 +      where <bound method FrameBase.__dask_graph__ of <dask_expr.expr.Scalar: expr=df.min()>> = <dask_expr.expr.Scalar: expr=df.min()>.__dask_graph__
@pytest.mark.skipif("not dd")
    def test_persist_scalar():
        ds = pd.Series([1, 2, 3, 4])
        dds1 = dd.from_pandas(ds, npartitions=2).min()
>       assert len(dds1.__dask_graph__()) == 5
E       AssertionError: assert 6 == 5
E        +  where 6 = len({('chunk-022e0ce89358f2223db6e1765d71eff8', 0): (<function apply at 0x7faaf939f880>, <bound method Reduction.chunk of <class 'dask_expr._reductions.Min'>>, [('frompandas-fee8d93c939cf46098d3be5a119a4f48', 0)], {'skipna': True}), ('chunk-022e0ce89358f2223db6e1765d71eff8', 1): (<function apply at 0x7faaf939f880>, <bound method Reduction.chunk of <class 'dask_expr._reductions.Min'>>, [('frompandas-fee8d93c939cf46098d3be5a119a4f48', 1)], {'skipna': True}), ('frompandas-fee8d93c939cf46098d3be5a119a4f48', 0): 0    1\n1    2\ndtype: int64, ('frompandas-fee8d93c939cf46098d3be5a119a4f48', 1): 2    3\n3    4\ndtype: int64, ...})
E        +    where {('chunk-022e0ce89358f2223db6e1765d71eff8', 0): (<function apply at 0x7faaf939f880>, <bound method Reduction.chunk of <class 'dask_expr._reductions.Min'>>, [('frompandas-fee8d93c939cf46098d3be5a119a4f48', 0)], {'skipna': True}), ('chunk-022e0ce89358f2223db6e1765d71eff8', 1): (<function apply at 0x7faaf939f880>, <bound method Reduction.chunk of <class 'dask_expr._reductions.Min'>>, [('frompandas-fee8d93c939cf46098d3be5a119a4f48', 1)], {'skipna': True}), ('frompandas-fee8d93c939cf46098d3be5a119a4f48', 0): 0    1\n1    2\ndtype: int64, ('frompandas-fee8d93c939cf46098d3be5a119a4f48', 1): 2    3\n3    4\ndtype: int64, ...} = <bound method FrameBase.__dask_graph__ of <dask_expr.expr.Scalar: expr=df.min()>>()
E        +      where <bound method FrameBase.__dask_graph__ of <dask_expr.expr.Scalar: expr=df.min()>> = <dask_expr.expr.Scalar: expr=df.min()>.__dask_graph__

dask/tests/test_base.py:1064: AssertionError

Check warning on line 0 in dask.tests.test_base

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_persist_dataframe_rename (dask.tests.test_base)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
AssertionError: {'rename': {'frompandas-f9bfa577541a8e568c42e0d7472aa35d': 'x'}}
@pytest.mark.skipif("not dd")
    def test_persist_dataframe_rename():
        df1 = pd.DataFrame({"a": [1, 2, 3, 4], "b": [5, 6, 7, 8]})
        df2 = pd.DataFrame({"a": [2, 3, 5, 6], "b": [6, 7, 9, 10]})
        ddf1 = dd.from_pandas(df1, npartitions=2)
        rebuild, args = ddf1.__dask_postpersist__()
        dsk = {("x", 0): df2.iloc[:2], ("x", 1): df2.iloc[2:]}
>       ddf2 = rebuild(dsk, *args, rename={ddf1._name: "x"})

dask/tests/test_base.py:1078: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:2894: in from_graph
    return new_collection(FromGraph(*args, **kwargs))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[RecursionError('maximum recursion depth exceeded') raised in repr()] FromGraph object at 0x7faa44d61eb0>
args = ({('x', 0):    a  b
0  2  6
1  3  7, ('x', 1):    a   b
2  5   9
3  6  10}, Empty DataFrame
Columns: [a, b]
Index: [], (0, 2, 3), 'frompandas-f9bfa577541a8e568c42e0d7472aa35d')
kwargs = {'rename': {'frompandas-f9bfa577541a8e568c42e0d7472aa35d': 'x'}}
operands = [{('x', 0):    a  b
0  2  6
1  3  7, ('x', 1):    a   b
2  5   9
3  6  10}, Empty DataFrame
Columns: [a, b]
Index: [], (0, 2, 3), 'frompandas-f9bfa577541a8e568c42e0d7472aa35d']

    def __init__(self, *args, **kwargs):
        operands = list(args)
        for parameter in type(self)._parameters[len(operands) :]:
            try:
                operands.append(kwargs.pop(parameter))
            except KeyError:
                operands.append(type(self)._defaults[parameter])
>       assert not kwargs, kwargs
E       AssertionError: {'rename': {'frompandas-f9bfa577541a8e568c42e0d7472aa35d': 'x'}}

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_core.py:39: AssertionError

Check warning on line 0 in dask.tests.test_base

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_persist_series_rename (dask.tests.test_base)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
AssertionError: {'rename': {'frompandas-fee8d93c939cf46098d3be5a119a4f48': 'x'}}
@pytest.mark.skipif("not dd")
    def test_persist_series_rename():
        ds1 = pd.Series([1, 2, 3, 4])
        ds2 = pd.Series([5, 6, 7, 8])
        dds1 = dd.from_pandas(ds1, npartitions=2)
        rebuild, args = dds1.__dask_postpersist__()
        dsk = {("x", 0): ds2.iloc[:2], ("x", 1): ds2.iloc[2:]}
>       dds2 = rebuild(dsk, *args, rename={dds1._name: "x"})

dask/tests/test_base.py:1090: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:2894: in from_graph
    return new_collection(FromGraph(*args, **kwargs))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[RecursionError('maximum recursion depth exceeded') raised in repr()] FromGraph object at 0x7faaa41111c0>
args = ({('x', 0): 0    5
1    6
dtype: int64, ('x', 1): 2    7
3    8
dtype: int64}, Series([], dtype: int64), (0, 2, 3), 'frompandas-fee8d93c939cf46098d3be5a119a4f48')
kwargs = {'rename': {'frompandas-fee8d93c939cf46098d3be5a119a4f48': 'x'}}
operands = [{('x', 0): 0    5
1    6
dtype: int64, ('x', 1): 2    7
3    8
dtype: int64}, Series([], dtype: int64), (0, 2, 3), 'frompandas-fee8d93c939cf46098d3be5a119a4f48']

    def __init__(self, *args, **kwargs):
        operands = list(args)
        for parameter in type(self)._parameters[len(operands) :]:
            try:
                operands.append(kwargs.pop(parameter))
            except KeyError:
                operands.append(type(self)._defaults[parameter])
>       assert not kwargs, kwargs
E       AssertionError: {'rename': {'frompandas-fee8d93c939cf46098d3be5a119a4f48': 'x'}}

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_core.py:39: AssertionError

Check warning on line 0 in dask.tests.test_base

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_persist_scalar_rename (dask.tests.test_base)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
AssertionError: {'rename': {'min-03d5dd9e6eb865698fdebdbf92a40138': 'x'}}
@pytest.mark.skipif("not dd")
    def test_persist_scalar_rename():
        ds1 = pd.Series([1, 2, 3, 4])
        dds1 = dd.from_pandas(ds1, npartitions=2).min()
        rebuild, args = dds1.__dask_postpersist__()
>       dds2 = rebuild({("x", 0): 5}, *args, rename={dds1._name: "x"})

dask/tests/test_base.py:1100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:2894: in from_graph
    return new_collection(FromGraph(*args, **kwargs))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[RecursionError('maximum recursion depth exceeded') raised in repr()] FromGraph object at 0x7faaa4112ae0>
args = ({('x', 0): 5}, 1, (None, None), 'min-tree-71dcaa6abe60504439f7ba770df9c82a')
kwargs = {'rename': {'min-03d5dd9e6eb865698fdebdbf92a40138': 'x'}}
operands = [{('x', 0): 5}, 1, (None, None), 'min-tree-71dcaa6abe60504439f7ba770df9c82a']

    def __init__(self, *args, **kwargs):
        operands = list(args)
        for parameter in type(self)._parameters[len(operands) :]:
            try:
                operands.append(kwargs.pop(parameter))
            except KeyError:
                operands.append(type(self)._defaults[parameter])
>       assert not kwargs, kwargs
E       AssertionError: {'rename': {'min-03d5dd9e6eb865698fdebdbf92a40138': 'x'}}

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_core.py:39: AssertionError

Check warning on line 0 in dask.tests.test_base

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_emscripten_default_scheduler['dask.dataframe', '_Frame', 'sync', True] (dask.tests.test_base)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 3s]
Raw output
subprocess.CalledProcessError: Command '['/home/runner/miniconda3/envs/test-environment/bin/python3.12', '-c', 'def check_default_scheduler(module, collection, expected, emscripten):\n    from contextlib import nullcontext\n    from unittest import mock\n\n    from dask.local import get_sync\n\n    if emscripten:\n        ctx = mock.patch("dask.base.named_schedulers", {"sync": get_sync})\n    else:\n        ctx = nullcontext()\n    with ctx:\n        import importlib\n\n        if expected == "sync":\n            from dask.local import get_sync as get\n        elif expected == "threads":\n            from dask.threaded import get\n        elif expected == "processes":\n            from dask.multiprocessing import get\n\n        mod = importlib.import_module(module)\n\n        assert getattr(mod, collection).__dask_scheduler__ == get\ncheck_default_scheduler(\'dask.dataframe\', \'_Frame\', \'sync\', True)\n']' returned non-zero exit status 1.
params = "'dask.dataframe', '_Frame', 'sync', True"

    @pytest.mark.parametrize(
        "params",
        (
            "'dask.dataframe', '_Frame', 'sync', True",
            "'dask.dataframe', '_Frame', 'threads', False",
            "'dask.array', 'Array', 'sync', True",
            "'dask.array', 'Array', 'threads', False",
            "'dask.bag', 'Bag', 'sync', True",
            "'dask.bag', 'Bag', 'processes', False",
        ),
    )
    def test_emscripten_default_scheduler(params):
        pytest.importorskip("dask.array")
        pytest.importorskip("dask.dataframe")
        proc = subprocess.run(
            [
                sys.executable,
                "-c",
                (
                    inspect.getsource(check_default_scheduler)
                    + f"check_default_scheduler({params})\n"
                ),
            ]
        )
>       proc.check_returncode()

dask/tests/test_base.py:1737: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = CompletedProcess(args=['/home/runner/miniconda3/envs/test-environment/bin/python3.12', '-c', 'def check_default_schedu...).__dask_scheduler__ == get\ncheck_default_scheduler(\'dask.dataframe\', \'_Frame\', \'sync\', True)\n'], returncode=1)

    def check_returncode(self):
        """Raise CalledProcessError if the exit code is non-zero."""
        if self.returncode:
>           raise CalledProcessError(self.returncode, self.args, self.stdout,
                                     self.stderr)
E           subprocess.CalledProcessError: Command '['/home/runner/miniconda3/envs/test-environment/bin/python3.12', '-c', 'def check_default_scheduler(module, collection, expected, emscripten):\n    from contextlib import nullcontext\n    from unittest import mock\n\n    from dask.local import get_sync\n\n    if emscripten:\n        ctx = mock.patch("dask.base.named_schedulers", {"sync": get_sync})\n    else:\n        ctx = nullcontext()\n    with ctx:\n        import importlib\n\n        if expected == "sync":\n            from dask.local import get_sync as get\n        elif expected == "threads":\n            from dask.threaded import get\n        elif expected == "processes":\n            from dask.multiprocessing import get\n\n        mod = importlib.import_module(module)\n\n        assert getattr(mod, collection).__dask_scheduler__ == get\ncheck_default_scheduler(\'dask.dataframe\', \'_Frame\', \'sync\', True)\n']' returned non-zero exit status 1.

../../../miniconda3/envs/test-environment/lib/python3.12/subprocess.py:502: CalledProcessError

Check warning on line 0 in dask.tests.test_base

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_emscripten_default_scheduler['dask.dataframe', '_Frame', 'threads', False] (dask.tests.test_base)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 3s]
Raw output
subprocess.CalledProcessError: Command '['/home/runner/miniconda3/envs/test-environment/bin/python3.12', '-c', 'def check_default_scheduler(module, collection, expected, emscripten):\n    from contextlib import nullcontext\n    from unittest import mock\n\n    from dask.local import get_sync\n\n    if emscripten:\n        ctx = mock.patch("dask.base.named_schedulers", {"sync": get_sync})\n    else:\n        ctx = nullcontext()\n    with ctx:\n        import importlib\n\n        if expected == "sync":\n            from dask.local import get_sync as get\n        elif expected == "threads":\n            from dask.threaded import get\n        elif expected == "processes":\n            from dask.multiprocessing import get\n\n        mod = importlib.import_module(module)\n\n        assert getattr(mod, collection).__dask_scheduler__ == get\ncheck_default_scheduler(\'dask.dataframe\', \'_Frame\', \'threads\', False)\n']' returned non-zero exit status 1.
params = "'dask.dataframe', '_Frame', 'threads', False"

    @pytest.mark.parametrize(
        "params",
        (
            "'dask.dataframe', '_Frame', 'sync', True",
            "'dask.dataframe', '_Frame', 'threads', False",
            "'dask.array', 'Array', 'sync', True",
            "'dask.array', 'Array', 'threads', False",
            "'dask.bag', 'Bag', 'sync', True",
            "'dask.bag', 'Bag', 'processes', False",
        ),
    )
    def test_emscripten_default_scheduler(params):
        pytest.importorskip("dask.array")
        pytest.importorskip("dask.dataframe")
        proc = subprocess.run(
            [
                sys.executable,
                "-c",
                (
                    inspect.getsource(check_default_scheduler)
                    + f"check_default_scheduler({params})\n"
                ),
            ]
        )
>       proc.check_returncode()

dask/tests/test_base.py:1737: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = CompletedProcess(args=['/home/runner/miniconda3/envs/test-environment/bin/python3.12', '-c', 'def check_default_schedu...dask_scheduler__ == get\ncheck_default_scheduler(\'dask.dataframe\', \'_Frame\', \'threads\', False)\n'], returncode=1)

    def check_returncode(self):
        """Raise CalledProcessError if the exit code is non-zero."""
        if self.returncode:
>           raise CalledProcessError(self.returncode, self.args, self.stdout,
                                     self.stderr)
E           subprocess.CalledProcessError: Command '['/home/runner/miniconda3/envs/test-environment/bin/python3.12', '-c', 'def check_default_scheduler(module, collection, expected, emscripten):\n    from contextlib import nullcontext\n    from unittest import mock\n\n    from dask.local import get_sync\n\n    if emscripten:\n        ctx = mock.patch("dask.base.named_schedulers", {"sync": get_sync})\n    else:\n        ctx = nullcontext()\n    with ctx:\n        import importlib\n\n        if expected == "sync":\n            from dask.local import get_sync as get\n        elif expected == "threads":\n            from dask.threaded import get\n        elif expected == "processes":\n            from dask.multiprocessing import get\n\n        mod = importlib.import_module(module)\n\n        assert getattr(mod, collection).__dask_scheduler__ == get\ncheck_default_scheduler(\'dask.dataframe\', \'_Frame\', \'threads\', False)\n']' returned non-zero exit status 1.

../../../miniconda3/envs/test-environment/lib/python3.12/subprocess.py:502: CalledProcessError

Check warning on line 0 in dask.bag.tests.test_bag

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_to_dataframe_optimize_graph (dask.bag.tests.test_bag)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 1s]
Raw output
TypeError: FrameBase.__init__() takes 2 positional arguments but 5 were given
def test_to_dataframe_optimize_graph():
        pytest.importorskip("dask.dataframe")
        from dask.dataframe.utils import assert_eq as assert_eq_df
        from dask.dataframe.utils import pyarrow_strings_enabled
    
        x = db.from_sequence(
            [{"name": "test1", "v1": 1}, {"name": "test2", "v1": 2}], npartitions=2
        )
    
        # linear `map` tasks will be fused by graph optimization
        with dask.annotate(foo=True):
            y = x.map(lambda a: dict(**a, v2=a["v1"] + 1))
            y = y.map(lambda a: dict(**a, v3=a["v2"] + 1))
            y = y.map(lambda a: dict(**a, v4=a["v3"] + 1))
    
        # verifying the maps are not fused yet
        assert len(y.dask) == y.npartitions * 4
    
        # with optimizations
>       d = y.to_dataframe()

dask/bag/tests/test_bag.py:1645: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = dask.bag<lambda, npartitions=2>
meta =     name  v1  v2  v3  v4
0  test1   1   2   3   4, columns = None
optimize_graph = True

    def to_dataframe(self, meta=None, columns=None, optimize_graph=True):
        """Create Dask Dataframe from a Dask Bag.
    
        Bag should contain tuples, dict records, or scalars.
    
        Index will not be particularly meaningful.  Use ``reindex`` afterwards
        if necessary.
    
        Parameters
        ----------
        meta : pd.DataFrame, dict, iterable, optional
            An empty ``pd.DataFrame`` that matches the dtypes and column names
            of the output. This metadata is necessary for many algorithms in
            dask dataframe to work.  For ease of use, some alternative inputs
            are also available. Instead of a ``DataFrame``, a ``dict`` of
            ``{name: dtype}`` or iterable of ``(name, dtype)`` can be provided.
            If not provided or a list, a single element from the first
            partition will be computed, triggering a potentially expensive call
            to ``compute``. This may lead to unexpected results, so providing
            ``meta`` is recommended. For more information, see
            ``dask.dataframe.utils.make_meta``.
        columns : sequence, optional
            Column names to use. If the passed data do not have names
            associated with them, this argument provides names for the columns.
            Otherwise this argument indicates the order of the columns in the
            result (any names not found in the data will become all-NA
            columns).  Note that if ``meta`` is provided, column names will be
            taken from there and this parameter is invalid.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before converting into
            :class:`dask.dataframe.DataFrame`.
    
    
        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
        ...                       {'name': 'Bob',     'balance': 200},
        ...                       {'name': 'Charlie', 'balance': 300}],
        ...                      npartitions=2)
        >>> df = b.to_dataframe()
    
        >>> df.compute()
              name  balance
        0    Alice      100
        1      Bob      200
        0  Charlie      300
        """
        import pandas as pd
    
        import dask.dataframe as dd
    
        if meta is None:
            head = self.take(1, warn=False)
            if len(head) == 0:
                raise ValueError(
                    "`dask.bag.Bag.to_dataframe` failed to "
                    "properly infer metadata, please pass in "
                    "metadata via the `meta` keyword"
                )
            meta = pd.DataFrame(list(head), columns=columns)
        elif columns is not None:
            raise ValueError("Can't specify both `meta` and `columns`")
        else:
            meta = dd.utils.make_meta(meta, parent_meta=pd.DataFrame())
        # Serializing the columns and dtypes is much smaller than serializing
        # the empty frame
        cols = list(meta.columns)
        dtypes = meta.dtypes.to_dict()
    
        dfs = self.map_partitions(to_dataframe, cols, dtypes)
        if optimize_graph:
            dsk = self.__dask_optimize__(dfs.dask, dfs.__dask_keys__())
        else:
            dsk = dfs.dask
    
        divisions = [None] * (self.npartitions + 1)
>       return dd.DataFrame(dsk, dfs.name, meta, divisions)
E       TypeError: FrameBase.__init__() takes 2 positional arguments but 5 were given

dask/bag/core.py:1624: TypeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_futures_to_delayed_dataframe (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 7s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: 'tcp://127.0.0.1:41423' processes=2 threads=2, memory=31.21 GiB>

    def test_futures_to_delayed_dataframe(c):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        df = pd.DataFrame({"x": [1, 2, 3]})
    
        futures = c.scatter([df, df])
        ddf = dd.from_delayed(futures)
        dd.utils.assert_eq(ddf.compute(), pd.concat([df, df], axis=0))
    
        # Make sure from_delayed is Blockwise
>       assert isinstance(ddf.dask.layers[ddf._name], Blockwise)
E       AttributeError: 'dict' object has no attribute 'layers'

dask/tests/test_distributed.py:115: AttributeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_fused_blockwise_dataframe_merge[True] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 6s]
Raw output
RuntimeError: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
from __future__ import annotations
    
    import asyncio
    import contextlib
    import dataclasses
    import heapq
    import inspect
    import itertools
    import json
    import logging
    import math
    import operator
    import os
    import pickle
    import random
    import textwrap
    import uuid
    import warnings
    import weakref
    from collections import defaultdict, deque
    from collections.abc import (
        Callable,
        Collection,
        Container,
        Hashable,
        Iterable,
        Iterator,
        Mapping,
        Sequence,
        Set,
    )
    from contextlib import suppress
    from functools import partial
    from typing import TYPE_CHECKING, Any, ClassVar, Literal, NamedTuple, cast, overload
    
    import psutil
    import tornado.web
    from sortedcontainers import SortedDict, SortedSet
    from tlz import (
        concat,
        first,
        groupby,
        merge,
        merge_sorted,
        merge_with,
        partition,
        pluck,
        second,
        take,
        valmap,
    )
    from tornado.ioloop import IOLoop
    
    import dask
    from dask.core import get_deps, validate_key
    from dask.typing import Key, no_default
    from dask.utils import (
        ensure_dict,
        format_bytes,
        format_time,
        key_split,
        parse_bytes,
        parse_timedelta,
        tmpfile,
    )
    from dask.widgets import get_template
    
    from distributed import cluster_dump, preloading, profile
    from distributed import versions as version_module
    from distributed._asyncio import RLock
    from distributed._stories import scheduler_story
    from distributed.active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
    from distributed.batched import BatchedSend
    from distributed.client import SourceCode
    from distributed.collections import HeapSet
    from distributed.comm import (
        Comm,
        CommClosedError,
        get_address_host,
        normalize_address,
        resolve_address,
        unparse_host_port,
    )
    from distributed.comm.addressing import addresses_from_user_args
    from distributed.compatibility import PeriodicCallback
    from distributed.core import (
        ErrorMessage,
        OKMessage,
        Status,
        clean_exception,
        error_message,
        rpc,
        send_recv,
    )
    from distributed.diagnostics.memory_sampler import MemorySamplerExtension
    from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name
    from distributed.event import EventExtension
    from distributed.http import get_handlers
    from distributed.lock import LockExtension
    from distributed.metrics import monotonic, time
    from distributed.multi_lock import MultiLockExtension
    from distributed.node import ServerNode
    from distributed.proctitle import setproctitle
    from distributed.protocol import deserialize
    from distributed.protocol.pickle import dumps, loads
    from distributed.protocol.serialize import Serialized, ToPickle, serialize
    from distributed.publish import PublishExtension
    from distributed.pubsub import PubSubSchedulerExtension
    from distributed.queues import QueueExtension
    from distributed.recreate_tasks import ReplayTaskScheduler
    from distributed.security import Security
    from distributed.semaphore import SemaphoreExtension
    from distributed.shuffle import ShuffleSchedulerPlugin
    from distributed.spans import SpansSchedulerExtension
    from distributed.stealing import WorkStealing
    from distributed.utils import (
        All,
        TimeoutError,
        format_dashboard_link,
        get_fileno_limit,
        is_python_shutting_down,
        key_split_group,
        log_errors,
        offload,
        recursive_to_dict,
        wait_for,
    )
    from distributed.utils_comm import (
        gather_from_workers,
        retry_operation,
        scatter_to_workers,
        unpack_remotedata,
    )
    from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
    from distributed.variable import VariableExtension
    from distributed.worker import _normalize_task
    
    if TYPE_CHECKING:
        # TODO import from typing (requires Python >=3.10)
        from typing_extensions import TypeAlias
    
        from dask.highlevelgraph import HighLevelGraph
    
    # Not to be confused with distributed.worker_state_machine.TaskStateState
    TaskStateState: TypeAlias = Literal[
        "released",
        "waiting",
        "no-worker",
        "queued",
        "processing",
        "memory",
        "erred",
        "forgotten",
    ]
    
    ALL_TASK_STATES: Set[TaskStateState] = set(TaskStateState.__args__)  # type: ignore
    
    # {task key -> finish state}
    # Not to be confused with distributed.worker_state_machine.Recs
    Recs: TypeAlias = dict[Key, TaskStateState]
    # {client or worker address: [{op: <key>, ...}, ...]}
    Msgs: TypeAlias = dict[str, list[dict[str, Any]]]
    # (recommendations, client messages, worker messages)
    RecsMsgs: TypeAlias = tuple[Recs, Msgs, Msgs]
    
    T_runspec: TypeAlias = tuple[Callable, tuple, dict[str, Any]]
    
    logger = logging.getLogger(__name__)
    LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")
    DEFAULT_DATA_SIZE = parse_bytes(
        dask.config.get("distributed.scheduler.default-data-size")
    )
    STIMULUS_ID_UNSET = "<stimulus_id unset>"
    
    DEFAULT_EXTENSIONS = {
        "locks": LockExtension,
        "multi_locks": MultiLockExtension,
        "publish": PublishExtension,
        "replay-tasks": ReplayTaskScheduler,
        "queues": QueueExtension,
        "variables": VariableExtension,
        "pubsub": PubSubSchedulerExtension,
        "semaphores": SemaphoreExtension,
        "events": EventExtension,
        "amm": ActiveMemoryManagerExtension,
        "memory_sampler": MemorySamplerExtension,
        "shuffle": ShuffleSchedulerPlugin,
        "spans": SpansSchedulerExtension,
        "stealing": WorkStealing,
    }
    
    
    class ClientState:
        """A simple object holding information about a client."""
    
        #: A unique identifier for this client. This is generally an opaque
        #: string generated by the client itself.
        client_key: str
    
        #: Cached hash of :attr:`~ClientState.client_key`
        _hash: int
    
        #: A set of tasks this client wants to be kept in memory, so that it can download
        #: its result when desired. This is the reverse mapping of
        #: :class:`TaskState.who_wants`. Tasks are typically removed from this set when the
        #: corresponding object in the client's space (for example a ``Future`` or a Dask
        #: collection) gets garbage-collected.
        wants_what: set[TaskState]
    
        #: The last time we received a heartbeat from this client, in local scheduler time.
        last_seen: float
    
        #: Output of :func:`distributed.versions.get_versions` on the client
        versions: dict[str, Any]
    
        __slots__ = tuple(__annotations__)
    
        def __init__(self, client: str, *, versions: dict[str, Any] | None = None):
            self.client_key = client
            self._hash = hash(client)
            self.wants_what = set()
            self.last_seen = time()
            self.versions = versions or {}
    
        def __hash__(self) -> int:
            return self._hash
    
        def __eq__(self, other: object) -> bool:
            if not isinstance(other, ClientState):
                return False
            return self.client_key == other.client_key
    
        def __repr__(self) -> str:
            return f"<Client {self.client_key!r}>"
    
        def __str__(self) -> str:
            return self.client_key
    
        def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict:
            """Dictionary representation for debugging purposes.
            Not type stable and not intended for roundtrips.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            TaskState._to_dict
            """
            return recursive_to_dict(
                self,
                exclude=set(exclude) | {"versions"},  # type: ignore
                members=True,
            )
    
    
    class MemoryState:
        """Memory readings on a worker or on the whole cluster.
    
        See :doc:`worker-memory`.
    
        Attributes / properties:
    
        managed_total
            Sum of the output of sizeof() for all dask keys held by the worker in memory,
            plus number of bytes spilled to disk
    
        managed
            Sum of the output of sizeof() for the dask keys held in RAM. Note that this may
            be inaccurate, which may cause inaccurate unmanaged memory (see below).
    
        spilled
            Number of bytes  for the dask keys spilled to the hard drive.
            Note that this is the size on disk; size in memory may be different due to
            compression and inaccuracies in sizeof(). In other words, given the same keys,
            'managed' will change depending on the keys being in memory or spilled.
    
        process
            Total RSS memory measured by the OS on the worker process.
            This is always exactly equal to managed + unmanaged.
    
        unmanaged
            process - managed. This is the sum of
    
            - Python interpreter and modules
            - global variables
            - memory temporarily allocated by the dask tasks that are currently running
            - memory fragmentation
            - memory leaks
            - memory not yet garbage collected
            - memory not yet free()'d by the Python memory manager to the OS
    
        unmanaged_old
            Minimum of the 'unmanaged' measures over the last
            ``distributed.memory.recent-to-old-time`` seconds
    
        unmanaged_recent
            unmanaged - unmanaged_old; in other words process memory that has been recently
            allocated but is not accounted for by dask; hopefully it's mostly a temporary
            spike.
    
        optimistic
            managed + unmanaged_old; in other words the memory held long-term by
            the process under the hopeful assumption that all unmanaged_recent memory is a
            temporary spike
        """
    
        process: int
        unmanaged_old: int
        managed: int
        spilled: int
    
        __slots__ = tuple(__annotations__)
    
        def __init__(
            self,
            *,
            process: int,
            unmanaged_old: int,
            managed: int,
            spilled: int,
        ):
            # Some data arrives with the heartbeat, some other arrives in realtime as the
            # tasks progress. Also, sizeof() is not guaranteed to return correct results.
            # This can cause glitches where a partial measure is larger than the whole, so
            # we need to force all numbers to add up exactly by definition.
            self.process = process
            self.managed = min(self.process, managed)
            self.spilled = spilled
            # Subtractions between unsigned ints guaranteed by construction to be >= 0
            self.unmanaged_old = min(unmanaged_old, process - self.managed)
    
        @staticmethod
        def sum(*infos: MemoryState) -> MemoryState:
            process = 0
            unmanaged_old = 0
            managed = 0
            spilled = 0
            for ms in infos:
                process += ms.process
                unmanaged_old += ms.unmanaged_old
                spilled += ms.spilled
                managed += ms.managed
            return MemoryState(
                process=process,
                unmanaged_old=unmanaged_old,
                managed=managed,
                spilled=spilled,
            )
    
        @property
        def managed_total(self) -> int:
            return self.managed + self.spilled
    
        @property
        def unmanaged(self) -> int:
            # This is never negative thanks to __init__
            return self.process - self.managed
    
        @property
        def unmanaged_recent(self) -> int:
            # This is never negative thanks to __init__
            return self.process - self.managed - self.unmanaged_old
    
        @property
        def optimistic(self) -> int:
            return self.managed + self.unmanaged_old
    
        @property
        def managed_in_memory(self) -> int:
            warnings.warn("managed_in_memory has been renamed to managed", FutureWarning)
            return self.managed
    
        @property
        def managed_spilled(self) -> int:
            warnings.warn("managed_spilled has been renamed to spilled", FutureWarning)
            return self.spilled
    
        def __repr__(self) -> str:
            return (
                f"Process memory (RSS)  : {format_bytes(self.process)}\n"
                f"  - managed by Dask   : {format_bytes(self.managed)}\n"
                f"  - unmanaged (old)   : {format_bytes(self.unmanaged_old)}\n"
                f"  - unmanaged (recent): {format_bytes(self.unmanaged_recent)}\n"
                f"Spilled to disk       : {format_bytes(self.spilled)}\n"
            )
    
        def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
            """Dictionary representation for debugging purposes.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            """
            return {
                k: getattr(self, k)
                for k in dir(self)
                if not k.startswith("_")
                and k not in {"sum", "managed_in_memory", "managed_spilled"}
            }
    
    
    class WorkerState:
        """A simple object holding information about a worker.
    
        Not to be confused with :class:`distributed.worker_state_machine.WorkerState`.
        """
    
        #: This worker's unique key. This can be its connected address
        #: (such as ``"tcp://127.0.0.1:8891"``) or an alias (such as ``"alice"``).
        address: str
    
        pid: int
        name: Hashable
    
        #: The number of CPU threads made available on this worker
        nthreads: int
    
        #: Memory available to the worker, in bytes
        memory_limit: int
    
        local_directory: str
        services: dict[str, int]
    
        #: Output of :meth:`distributed.versions.get_versions` on the worker
        versions: dict[str, Any]
    
        #: Address of the associated :class:`~distributed.nanny.Nanny`, if present
        nanny: str
    
        #: Read-only worker status, synced one way from the remote Worker object
        status: Status
    
        #: Cached hash of :attr:`~WorkerState.address`
        _hash: int
    
        #: The total memory size, in bytes, used by the tasks this worker holds in memory
        #: (i.e. the tasks in this worker's :attr:`~WorkerState.has_what`).
        nbytes: int
    
        #: Worker memory unknown to the worker, in bytes, which has been there for more than
        #: 30 seconds. See :class:`MemoryState`.
        _memory_unmanaged_old: int
    
        #: History of the last 30 seconds' worth of unmanaged memory. Used to differentiate
        #: between "old" and "new" unmanaged memory.
        #: Format: ``[(timestamp, bytes), (timestamp, bytes), ...]``
        _memory_unmanaged_history: deque[tuple[float, int]]
    
        metrics: dict[str, Any]
    
        #: The last time we received a heartbeat from this worker, in local scheduler time.
        last_seen: float
    
        time_delay: float
        bandwidth: float
    
        #: A set of all TaskStates on this worker that are actors. This only includes those
        #: actors whose state actually lives on this worker, not actors to which this worker
        #: has a reference.
        actors: set[TaskState]
    
        #: Underlying data of :meth:`WorkerState.has_what`
        _has_what: dict[TaskState, None]
    
        #: A set of tasks that have been submitted to this worker. Multiple tasks may be
        # submitted to a worker in advance and the worker will run them eventually,
        # depending on its execution resources (but see :doc:`work-stealing`).
        #:
        #: All the tasks here are in the "processing" state.
        #: This attribute is kept in sync with :attr:`TaskState.processing_on`.
        processing: set[TaskState]
    
        #: Running tasks that invoked :func:`distributed.secede`
        long_running: set[TaskState]
    
        #: A dictionary of tasks that are currently being run on this worker.
        #: Each task state is associated with the duration in seconds which the task has
        #: been running.
        executing: dict[TaskState, float]
    
        #: The available resources on this worker, e.g. ``{"GPU": 2}``.
        #: These are abstract quantities that constrain certain tasks from running at the
        #: same time on this worker.
        resources: dict[str, float]
    
        #: The sum of each resource used by all tasks allocated to this worker.
        #: The numbers in this dictionary can only be less or equal than those in this
        #: worker's :attr:`~WorkerState.resources`.
        used_resources: dict[str, float]
    
        #: Arbitrary additional metadata to be added to :meth:`~WorkerState.identity`
        extra: dict[str, Any]
    
        # The unique server ID this WorkerState is referencing
        server_id: str
    
        # Reference to scheduler task_groups
        scheduler_ref: weakref.ref[SchedulerState] | None
        task_prefix_count: defaultdict[str, int]
        _network_occ: float
        _occupancy_cache: float | None
    
        #: Keys that may need to be fetched to this worker, and the number of tasks that need them.
        #: All tasks are currently in `memory` on a worker other than this one.
        #: Much like `processing`, this does not exactly reflect worker state:
        #: keys here may be queued to fetch, in flight, or already in memory
        #: on the worker.
        needs_what: dict[TaskState, int]
    
        __slots__ = tuple(__annotations__)
    
        def __init__(
            self,
            *,
            address: str,
            status: Status,
            pid: int,
            name: object,
            nthreads: int = 0,
            memory_limit: int,
            local_directory: str,
            nanny: str,
            server_id: str,
            services: dict[str, int] | None = None,
            versions: dict[str, Any] | None = None,
            extra: dict[str, Any] | None = None,
            scheduler: SchedulerState | None = None,
        ):
            self.server_id = server_id
            self.address = address
            self.pid = pid
            self.name = name
            self.nthreads = nthreads
            self.memory_limit = memory_limit
            self.local_directory = local_directory
            self.services = services or {}
            self.versions = versions or {}
            self.nanny = nanny
            self.status = status
            self._hash = hash(self.server_id)
            self.nbytes = 0
            self._memory_unmanaged_old = 0
            self._memory_unmanaged_history = deque()
            self.metrics = {}
            self.last_seen = 0
            self.time_delay = 0
            self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
            self.actors = set()
            self._has_what = {}
            self.processing = set()
            self.long_running = set()
            self.executing = {}
            self.resources = {}
            self.used_resources = {}
            self.extra = extra or {}
            self.scheduler_ref = weakref.ref(scheduler) if scheduler else None
            self.task_prefix_count = defaultdict(int)
            self.needs_what = {}
            self._network_occ = 0
            self._occupancy_cache = None
    
        def __hash__(self) -> int:
            return self._hash
    
        def __eq__(self, other: object) -> bool:
            return isinstance(other, WorkerState) and other.server_id == self.server_id
    
        @property
        def has_what(self) -> Set[TaskState]:
            """An insertion-sorted set-like of tasks which currently reside on this worker.
            All the tasks here are in the "memory" state.
            This is the reverse mapping of :attr:`TaskState.who_has`.
    
            This is a read-only public accessor. The data is implemented as a dict without
            values, because rebalance() relies on dicts being insertion-sorted.
            """
            return self._has_what.keys()
    
        @property
        def host(self) -> str:
            return get_address_host(self.address)
    
        @property
        def memory(self) -> MemoryState:
            """Polished memory metrics for the worker.
    
            **Design note on managed memory**
    
            There are two measures available for managed memory:
    
            - ``self.nbytes``
            - ``self.metrics["managed_bytes"]``
    
            At rest, the two numbers must be identical. However, ``self.nbytes`` is
            immediately updated through the batched comms as soon as each task lands in
            memory on the worker; ``self.metrics["managed_bytes"]`` instead is updated by
            the heartbeat, which can lag several seconds behind.
    
            Below we are mixing likely newer managed memory info from ``self.nbytes`` with
            process and spilled memory from the heartbeat. This is deliberate, so that
            managed memory total is updated more frequently.
    
            Managed memory directly and immediately contributes to optimistic memory, which
            is in turn used in Active Memory Manager heuristics (at the moment of writing;
            more uses will likely be added in the future). So it's important to have it
            up to date; much more than it is for process memory.
    
            Having up-to-date managed memory info as soon as the scheduler learns about
            task completion also substantially simplifies unit tests.
    
            The flip side of this design is that it may cause some noise in the
            unmanaged_recent measure. e.g.:
    
            1. Delete 100MB of managed data
            2. The updated managed memory reaches the scheduler faster than the
               updated process memory
            3. There's a blip where the scheduler thinks that there's a sudden 100MB
               increase in unmanaged_recent, since process memory hasn't changed but managed
               memory has decreased by 100MB
            4. When the heartbeat arrives, process memory goes down and so does the
               unmanaged_recent.
    
            This is OK - one of the main reasons for the unmanaged_recent / unmanaged_old
            split is exactly to concentrate all the noise in unmanaged_recent and exclude it
            from optimistic memory, which is used for heuristics.
    
            Something that is less OK, but also less frequent, is that the sudden deletion
            of spilled keys will cause a negative blip in managed memory:
    
            1. Delete 100MB of spilled data
            2. The updated managed memory *total* reaches the scheduler faster than the
               updated spilled portion
            3. This causes the managed memory to temporarily plummet and be replaced by
               unmanaged_recent, while spilled memory remains unaltered
            4. When the heartbeat arrives, managed goes back up, unmanaged_recent
               goes back down, and spilled goes down by 100MB as it should have to
               begin with.
    
            :issue:`6002` will let us solve this.
            """
            return MemoryState(
                process=self.metrics["memory"],
                managed=max(0, self.nbytes - self.metrics["spilled_bytes"]["memory"]),
                spilled=self.metrics["spilled_bytes"]["disk"],
                unmanaged_old=self._memory_unmanaged_old,
            )
    
        def clean(self) -> WorkerState:
            """Return a version of this object that is appropriate for serialization"""
            ws = WorkerState(
                address=self.address,
                status=self.status,
                pid=self.pid,
                name=self.name,
                nthreads=self.nthreads,
                memory_limit=self.memory_limit,
                local_directory=self.local_directory,
                services=self.services,
                nanny=self.nanny,
                extra=self.extra,
                server_id=self.server_id,
            )
            ws._occupancy_cache = self.occupancy
    
            ws.executing = {
                ts.key: duration for ts, duration in self.executing.items()  # type: ignore
            }
            return ws
    
        def __repr__(self) -> str:
            name = f", name: {self.name}" if self.name != self.address else ""
            return (
                f"<WorkerState {self.address!r}{name}, "
                f"status: {self.status.name}, "
                f"memory: {len(self.has_what)}, "
                f"processing: {len(self.processing)}>"
            )
    
        def _repr_html_(self) -> str:
            return get_template("worker_state.html.j2").render(
                address=self.address,
                name=self.name,
                status=self.status.name,
                has_what=self.has_what,
                processing=self.processing,
            )
    
        def identity(self) -> dict[str, Any]:
            return {
                "type": "Worker",
                "id": self.name,
                "host": self.host,
                "resources": self.resources,
                "local_directory": self.local_directory,
                "name": self.name,
                "nthreads": self.nthreads,
                "memory_limit": self.memory_limit,
                "last_seen": self.last_seen,
                "services": self.services,
                "metrics": self.metrics,
                "status": self.status.name,
                "nanny": self.nanny,
                **self.extra,
            }
    
        def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict[str, Any]:
            """Dictionary representation for debugging purposes.
            Not type stable and not intended for roundtrips.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            TaskState._to_dict
            """
            return recursive_to_dict(
                self,
                exclude=set(exclude) | {"versions"},  # type: ignore
                members=True,
            )
    
        @property
        def scheduler(self) -> SchedulerState:
            assert self.scheduler_ref
            s = self.scheduler_ref()
            assert s
            return s
    
        def add_to_processing(self, ts: TaskState) -> None:
            """Assign a task to this worker for compute."""
            if self.scheduler.validate:
                assert ts not in self.processing
    
            tp = ts.prefix
            self.task_prefix_count[tp.name] += 1
            self.scheduler._task_prefix_count_global[tp.name] += 1
            self.processing.add(ts)
            for dts in ts.dependencies:
                assert dts.who_has
                if self not in dts.who_has:
                    self._inc_needs_replica(dts)
    
        def add_to_long_running(self, ts: TaskState) -> None:
            if self.scheduler.validate:
                assert ts in self.processing
                assert ts not in self.long_running
    
            self._remove_from_task_prefix_count(ts)
            # Cannot remove from processing since we're using this for things like
            # idleness detection. Idle workers are typically targeted for
            # downscaling but we should not downscale workers with long running
            # tasks
            self.long_running.add(ts)
    
        def remove_from_processing(self, ts: TaskState) -> None:
            """Remove a task from a workers processing"""
            if self.scheduler.validate:
                assert ts in self.processing
    
            if ts in self.long_running:
                self.long_running.discard(ts)
            else:
                self._remove_from_task_prefix_count(ts)
            self.processing.remove(ts)
            for dts in ts.dependencies:
                if dts in self.needs_what:
                    self._dec_needs_replica(dts)
    
        def _remove_from_task_prefix_count(self, ts: TaskState) -> None:
            count = self.task_prefix_count[ts.prefix.name] - 1
            if count:
                self.task_prefix_count[ts.prefix.name] = count
            else:
                del self.task_prefix_count[ts.prefix.name]
    
            count = self.scheduler._task_prefix_count_global[ts.prefix.name] - 1
            if count:
                self.scheduler._task_prefix_count_global[ts.prefix.name] = count
            else:
                del self.scheduler._task_prefix_count_global[ts.prefix.name]
    
        def remove_replica(self, ts: TaskState) -> None:
            """The worker no longer has a task in memory"""
            if self.scheduler.validate:
                assert ts.who_has
                assert self in ts.who_has
                assert ts in self.has_what
                assert ts not in self.needs_what
    
            self.nbytes -= ts.get_nbytes()
            del self._has_what[ts]
            ts.who_has.remove(self)  # type: ignore
            if not ts.who_has:
                ts.who_has = None
    
        def _inc_needs_replica(self, ts: TaskState) -> None:
            """Assign a task fetch to this worker and update network occupancies"""
            if self.scheduler.validate:
                assert ts.who_has
                assert self not in ts.who_has
                assert ts not in self.has_what
            if ts not in self.needs_what:
                self.needs_what[ts] = 1
                nbytes = ts.get_nbytes()
                self._network_occ += nbytes
                self.scheduler._network_occ_global += nbytes
            else:
                self.needs_what[ts] += 1
    
        def _dec_needs_replica(self, ts: TaskState) -> None:
            if self.scheduler.validate:
                assert ts in self.needs_what
    
            self.needs_what[ts] -= 1
            if self.needs_what[ts] == 0:
                del self.needs_what[ts]
                nbytes = ts.get_nbytes()
                self._network_occ -= nbytes
                self.scheduler._network_occ_global -= nbytes
    
        def add_replica(self, ts: TaskState) -> None:
            """The worker acquired a replica of task"""
            if ts.who_has is None:
                ts.who_has = set()
            if ts in self._has_what:
                return
            nbytes = ts.get_nbytes()
            if ts in self.needs_what:
                del self.needs_what[ts]
                self._network_occ -= nbytes
                self.scheduler._network_occ_global -= nbytes
            ts.who_has.add(self)
            self.nbytes += nbytes
            self._has_what[ts] = None
    …                      link = format_dashboard_link(addr, server.port)
                    # formatting dashboard link can fail if distributed.dashboard.link
                    # refers to non-existant env vars.
                    except KeyError as e:
                        logger.warning(
                            f"Failed to format dashboard link, unknown value: {e}"
                        )
                        link = f":{server.port}"
                else:
                    link = f"{listen_ip}:{server.port}"
                logger.info("%11s at:  %25s", name, link)
    
            if self.scheduler_file:
                with open(self.scheduler_file, "w") as f:
                    json.dump(self.identity(), f, indent=2)
    
                fn = self.scheduler_file  # remove file when we close the process
    
                def del_scheduler_file():
                    if os.path.exists(fn):
                        os.remove(fn)
    
                weakref.finalize(self, del_scheduler_file)
    
            await self.preloads.start()
    
            if self.jupyter:
                # Allow insecure communications from local users
                if self.address.startswith("tls://"):
                    await self.listen("tcp://localhost:0")
                os.environ["DASK_SCHEDULER_ADDRESS"] = self.listeners[-1].contact_address
    
            await asyncio.gather(
                *[plugin.start(self) for plugin in list(self.plugins.values())]
            )
    
            self.start_periodic_callbacks()
    
            setproctitle(f"dask scheduler [{self.address}]")
            return self
    
        async def close(self, fast=None, close_workers=None, reason=""):
            """Send cleanup signal to all coroutines then wait until finished
    
            See Also
            --------
            Scheduler.cleanup
            """
            if fast is not None or close_workers is not None:
                warnings.warn(
                    "The 'fast' and 'close_workers' parameters in Scheduler.close have no "
                    "effect and will be removed in a future version of distributed.",
                    FutureWarning,
                )
            if self.status in (Status.closing, Status.closed):
                await self.finished()
                return
    
            async def log_errors(func):
                try:
                    await func()
                except Exception:
                    logger.exception("Plugin call failed during scheduler.close")
    
            await asyncio.gather(
                *[log_errors(plugin.before_close) for plugin in list(self.plugins.values())]
            )
    
            self.status = Status.closing
            logger.info("Scheduler closing due to %s...", reason or "unknown reason")
            setproctitle("dask scheduler [closing]")
    
            await self.preloads.teardown()
    
            await asyncio.gather(
                *[log_errors(plugin.close) for plugin in list(self.plugins.values())]
            )
    
            for pc in self.periodic_callbacks.values():
                pc.stop()
            self.periodic_callbacks.clear()
    
            self.stop_services()
    
            for ext in self.extensions.values():
                with suppress(AttributeError):
                    ext.teardown()
            logger.info("Scheduler closing all comms")
    
            futures = []
            for _, comm in list(self.stream_comms.items()):
                # FIXME use `self.remove_worker()` instead after https://github.com/dask/distributed/issues/6390
                if not comm.closed():
                    # This closes the Worker and ensures that if a Nanny is around,
                    # it is closed as well
                    comm.send({"op": "close", "reason": "scheduler-close"})
                    comm.send({"op": "close-stream"})
                    # ^ TODO remove? `Worker.close` will close the stream anyway.
                with suppress(AttributeError):
                    futures.append(comm.close())
    
            await asyncio.gather(*futures)
    
            if self.jupyter:
                await self._jupyter_server_application._cleanup()
    
            for comm in self.client_comms.values():
                comm.abort()
    
            await self.rpc.close()
    
            self.status = Status.closed
            self.stop()
            await super().close()
    
            setproctitle("dask scheduler [closed]")
            disable_gc_diagnosis()
    
        ###########
        # Stimuli #
        ###########
    
        def heartbeat_worker(
            self,
            *,
            address: str,
            resolve_address: bool = True,
            now: float | None = None,
            resources: dict[str, float] | None = None,
            host_info: dict | None = None,
            metrics: dict,
            executing: dict[Key, float] | None = None,
            extensions: dict | None = None,
        ) -> dict[str, Any]:
            address = self.coerce_address(address, resolve_address)
            address = normalize_address(address)
            ws = self.workers.get(address)
            if ws is None:
                logger.warning(f"Received heartbeat from unregistered worker {address!r}.")
                return {"status": "missing"}
    
            host = get_address_host(address)
            local_now = time()
            host_info = host_info or {}
    
            dh: dict = self.host_info.setdefault(host, {})
            dh["last-seen"] = local_now
    
            frac = 1 / len(self.workers)
            self.bandwidth = (
                self.bandwidth * (1 - frac) + metrics["bandwidth"]["total"] * frac
            )
            for other, (bw, count) in metrics["bandwidth"]["workers"].items():
                if (address, other) not in self.bandwidth_workers:
                    self.bandwidth_workers[address, other] = bw / count
                else:
                    alpha = (1 - frac) ** count
                    self.bandwidth_workers[address, other] = self.bandwidth_workers[
                        address, other
                    ] * alpha + bw * (1 - alpha)
            for typ, (bw, count) in metrics["bandwidth"]["types"].items():
                if typ not in self.bandwidth_types:
                    self.bandwidth_types[typ] = bw / count
                else:
                    alpha = (1 - frac) ** count
                    self.bandwidth_types[typ] = self.bandwidth_types[typ] * alpha + bw * (
                        1 - alpha
                    )
    
            ws.last_seen = local_now
            if executing is not None:
                # NOTE: the executing dict is unused
                ws.executing = {}
                for key, duration in executing.items():
                    if key in self.tasks:
                        ts = self.tasks[key]
                        ws.executing[ts] = duration
                        ts.prefix.add_exec_time(duration)
    
            for name, value in metrics["digests_total_since_heartbeat"].items():
                self.cumulative_worker_metrics[name] += value
    
            ws.metrics = metrics
    
            # Calculate RSS - dask keys, separating "old" and "new" usage
            # See MemoryState for details
            max_memory_unmanaged_old_hist_age = local_now - self.MEMORY_RECENT_TO_OLD_TIME
            memory_unmanaged_old = ws._memory_unmanaged_old
            while ws._memory_unmanaged_history:
                timestamp, size = ws._memory_unmanaged_history[0]
                if timestamp >= max_memory_unmanaged_old_hist_age:
                    break
                ws._memory_unmanaged_history.popleft()
                if size == memory_unmanaged_old:
                    memory_unmanaged_old = 0  # recalculate min()
    
            # ws._nbytes is updated at a different time and sizeof() may not be accurate,
            # so size may be (temporarily) negative; floor it to zero.
            size = max(
                0, metrics["memory"] - ws.nbytes + metrics["spilled_bytes"]["memory"]
            )
    
            ws._memory_unmanaged_history.append((local_now, size))
            if not memory_unmanaged_old:
                # The worker has just been started or the previous minimum has been expunged
                # because too old.
                # Note: this algorithm is capped to 200 * MEMORY_RECENT_TO_OLD_TIME elements
                # cluster-wide by heartbeat_interval(), regardless of the number of workers
                ws._memory_unmanaged_old = min(map(second, ws._memory_unmanaged_history))
            elif size < memory_unmanaged_old:
                ws._memory_unmanaged_old = size
    
            if host_info:
                dh = self.host_info.setdefault(host, {})
                dh.update(host_info)
    
            if now:
                ws.time_delay = local_now - now
    
            if resources:
                self.add_resources(worker=address, resources=resources)
    
            if extensions:
                for name, data in extensions.items():
                    self.extensions[name].heartbeat(ws, data)
    
            return {
                "status": "OK",
                "time": local_now,
                "heartbeat-interval": heartbeat_interval(len(self.workers)),
            }
    
        @log_errors
        async def add_worker(
            self,
            comm: Comm,
            *,
            address: str,
            status: str,
            server_id: str,
            nthreads: int,
            name: str,
            resolve_address: bool = True,
            now: float,
            resources: dict[str, float],
            # FIXME: This is never submitted by the worker
            host_info: None = None,
            memory_limit: int | None,
            metrics: dict[str, Any],
            pid: int = 0,
            services: dict[str, int],
            local_directory: str,
            versions: dict[str, Any],
            nanny: str,
            extra: dict,
            stimulus_id: str,
        ) -> None:
            """Add a new worker to the cluster"""
            address = self.coerce_address(address, resolve_address)
            address = normalize_address(address)
            host = get_address_host(address)
    
            if address in self.workers:
                raise ValueError("Worker already exists %s" % address)
    
            if name in self.aliases:
                logger.warning("Worker tried to connect with a duplicate name: %s", name)
                msg = {
                    "status": "error",
                    "message": "name taken, %s" % name,
                    "time": time(),
                }
                await comm.write(msg)
                return
    
            self.log_event(address, {"action": "add-worker"})
            self.log_event("all", {"action": "add-worker", "worker": address})
    
            self.workers[address] = ws = WorkerState(
                address=address,
                status=Status.lookup[status],  # type: ignore
                pid=pid,
                nthreads=nthreads,
                memory_limit=memory_limit or 0,
                name=name,
                local_directory=local_directory,
                services=services,
                versions=versions,
                nanny=nanny,
                extra=extra,
                server_id=server_id,
                scheduler=self,
            )
            if ws.status == Status.running:
                self.running.add(ws)
    
            dh = self.host_info.get(host)
            if dh is None:
                self.host_info[host] = dh = {}
    
            dh_addresses = dh.get("addresses")
            if dh_addresses is None:
                dh["addresses"] = dh_addresses = set()
                dh["nthreads"] = 0
    
            dh_addresses.add(address)
            dh["nthreads"] += nthreads
    
            self.total_nthreads += nthreads
            self.total_nthreads_history.append((time(), self.total_nthreads))
            self.aliases[name] = address
    
            self.heartbeat_worker(
                address=address,
                resolve_address=resolve_address,
                now=now,
                resources=resources,
                host_info=host_info,
                metrics=metrics,
            )
    
            # Do not need to adjust self.total_occupancy as self.occupancy[ws] cannot
            # exist before this.
            self.check_idle_saturated(ws)
    
            self.stream_comms[address] = BatchedSend(interval="5ms", loop=self.loop)
    
            awaitables = []
            for plugin in list(self.plugins.values()):
                try:
                    result = plugin.add_worker(scheduler=self, worker=address)
                    if result is not None and inspect.isawaitable(result):
                        awaitables.append(result)
                except Exception as e:
                    logger.exception(e)
    
            plugin_msgs = await asyncio.gather(*awaitables, return_exceptions=True)
            plugins_exceptions = [msg for msg in plugin_msgs if isinstance(msg, Exception)]
            for exc in plugins_exceptions:
                logger.exception(exc, exc_info=exc)
    
            if ws.status == Status.running:
                self.transitions(
                    self.bulk_schedule_unrunnable_after_adding_worker(ws), stimulus_id
                )
                self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
    
            logger.info("Register worker %s", ws)
    
            msg = {
                "status": "OK",
                "time": time(),
                "heartbeat-interval": heartbeat_interval(len(self.workers)),
                "worker-plugins": self.worker_plugins,
            }
    
            version_warning = version_module.error_message(
                version_module.get_versions(),
                {w: ws.versions for w, ws in self.workers.items()},
                versions,
                source_name=str(ws.server_id),
            )
            msg.update(version_warning)
    
            await comm.write(msg)
            # This will keep running until the worker is removed
            await self.handle_worker(comm, address)
    
        async def add_nanny(self, comm: Comm, address: str) -> None:
            async with self._starting_nannies_cond:
                self._starting_nannies.add(address)
            try:
                msg = {
                    "status": "OK",
                    "nanny-plugins": self.nanny_plugins,
                }
                await comm.write(msg)
                await comm.read()
            finally:
                async with self._starting_nannies_cond:
                    self._starting_nannies.discard(address)
                    self._starting_nannies_cond.notify_all()
    
        def _match_graph_with_tasks(
            self,
            dsk: dict[Key, T_runspec],
            dependencies: dict[Key, set[Key]],
            keys: set[Key],
        ) -> set[Key]:
            n = 0
            lost_keys = set()
            while len(dsk) != n:  # walk through new tasks, cancel any bad deps
                n = len(dsk)
                for k, deps in list(dependencies.items()):
                    if any(
                        dep not in self.tasks and dep not in dsk for dep in deps
                    ):  # bad key
                        lost_keys.add(k)
                        logger.info("User asked for computation on lost data, %s", k)
                        del dsk[k]
                        del dependencies[k]
                        if k in keys:
                            keys.remove(k)
                    del deps
            # Avoid computation that is already finished
            done = set()  # tasks that are already done
            for k, v in dependencies.items():
                if v and k in self.tasks:
                    ts = self.tasks[k]
                    if ts.state in ("memory", "erred"):
                        done.add(k)
    
            if done:
                dependents = dask.core.reverse_dict(dependencies)
                stack = list(done)
                while stack:  # remove unnecessary dependencies
                    key = stack.pop()
                    try:
                        deps = dependencies[key]
                    except KeyError:
                        deps = {ts.key for ts in self.tasks[key].dependencies}
                    for dep in deps:
                        if dep in dependents:
                            child_deps = dependents[dep]
                        elif dep in self.tasks:
                            child_deps = {ts.key for ts in self.tasks[key].dependencies}
                        else:
                            child_deps = set()
                        if all(d in done for d in child_deps):
                            if dep in self.tasks and dep not in done:
                                done.add(dep)
                                stack.append(dep)
            for anc in done:
                dsk.pop(anc, None)
                dependencies.pop(anc, None)
            return lost_keys
    
        def _create_taskstate_from_graph(
            self,
            *,
            start: float,
            dsk: dict[Key, T_runspec],
            dependencies: dict,
            keys: set[Key],
            ordered: dict[Key, int],
            client: str,
            annotations_by_type: dict,
            global_annotations: dict | None,
            stimulus_id: str,
            submitting_task: Key | None,
            user_priority: int | dict[Key, int] = 0,
            actors: bool | list[Key] | None = None,
            fifo_timeout: float = 0.0,
            code: tuple[SourceCode, ...] = (),
        ) -> None:
            """
            Take a low level graph and create the necessary scheduler state to
            compute it.
    
            WARNING
            -------
            This method must not be made async since nothing here is concurrency
            safe. All interactions with TaskState objects here should be happening
            in the same event loop tick.
            """
    
            lost_keys = self._match_graph_with_tasks(dsk, dependencies, keys)
    
            if len(dsk) > 1:
                self.log_event(
                    ["all", client], {"action": "update_graph", "count": len(dsk)}
                )
    
            if lost_keys:
                self.report({"op": "cancelled-keys", "keys": lost_keys}, client=client)
                self.client_releases_keys(
                    keys=lost_keys, client=client, stimulus_id=stimulus_id
                )
    
            if not self.is_idle and self.computations:
                # Still working on something. Assign new tasks to same computation
                computation = self.computations[-1]
            else:
                computation = Computation()
                self.computations.append(computation)
    
            if code:  # add new code blocks
                computation.code.add(code)
            if global_annotations:
                # FIXME: This is kind of inconsistent since it only includes global
                # annotations.
                computation.annotations.update(global_annotations)
            del global_annotations
    
            runnable, touched_tasks, new_tasks = self._generate_taskstates(
                keys=keys,
                dsk=dsk,
                dependencies=dependencies,
                computation=computation,
            )
    
            keys_with_annotations = self._apply_annotations(
                tasks=new_tasks,
                annotations_by_type=annotations_by_type,
            )
    
            self._set_priorities(
                internal_priority=ordered,
                submitting_task=submitting_task,
                user_priority=user_priority,
                fifo_timeout=fifo_timeout,
                start=start,
                tasks=runnable,
            )
    
            self.client_desires_keys(keys=keys, client=client)
    
            # Add actors
            if actors is True:
                actors = list(keys)
            for actor in actors or []:
                ts = self.tasks[actor]
                ts.actor = True
    
            # Compute recommendations
            recommendations: Recs = {}
            priority = dict()
            for ts in sorted(
                runnable,
                key=operator.attrgetter("priority"),
                reverse=True,
            ):
                assert ts.priority  # mypy
                priority[ts.key] = ts.priority
                assert ts.run_spec
                if ts.state == "released":
                    recommendations[ts.key] = "waiting"
    
            for ts in runnable:
                for dts in ts.dependencies:
                    if dts.exception_blame:
                        ts.exception_blame = dts.exception_blame
                        recommendations[ts.key] = "erred"
                        break
    
            annotations_for_plugin: defaultdict[str, dict[Key, Any]] = defaultdict(dict)
            for key in keys_with_annotations:
                ts = self.tasks[key]
                if ts.annotations:
                    for annot, value in ts.annotations.items():
                        annotations_for_plugin[annot][key] = value
    
            spans_ext: SpansSchedulerExtension | None = self.extensions.get("spans")
            if spans_ext:
                # new_tasks does not necessarily contain all runnable tasks;
                # _generate_taskstates is not the only thing that calls new_task(). A
                # TaskState may have also been created by client_desires_keys or scatter,
                # and only later gained a run_spec.
                span_annotations = spans_ext.observe_tasks(runnable, code=code)
                # In case of TaskGroup collision, spans may have changed
                # FIXME: Is this used anywhere besides tests?
                if span_annotations:
                    annotations_for_plugin["span"] = span_annotations
                else:
                    annotations_for_plugin.pop("span", None)
    
            for plugin in list(self.plugins.values()):
                try:
                    plugin.update_graph(
                        self,
                        client=client,
                        tasks=[ts.key for ts in touched_tasks],
                        keys=keys,
                        dependencies=dependencies,
                        annotations=dict(annotations_for_plugin),
                        priority=priority,
                    )
                except Exception as e:
                    logger.exception(e)
    
            self.transitions(recommendations, stimulus_id)
    
            for ts in touched_tasks:
                if ts.state in ("memory", "erred"):
                    self.report_on_key(ts=ts, client=client)
    
        @log_errors
        async def update_graph(
            self,
            client: str,
            graph_header: dict,
            graph_frames: list[bytes],
            keys: set[Key],
            internal_priority: dict[Key, int] | None,
            submitting_task: Key | None,
            user_priority: int | dict[Key, int] = 0,
            actors: bool | list[Key] | None = None,
            fifo_timeout: float = 0.0,
            code: tuple[SourceCode, ...] = (),
            annotations: dict | None = None,
            stimulus_id: str | None = None,
        ) -> None:
            # FIXME: Apparently empty dicts arrive as a ToPickle object
            if isinstance(annotations, ToPickle):
                annotations = annotations.data  # type: ignore[unreachable]
            start = time()
            try:
                try:
>                   graph = deserialize(graph_header, graph_frames).data

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/scheduler.py:4671: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/serialize.py:439: in deserialize
    return loads(header, frames)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/serialize.py:101: in pickle_loads
    return pickle.loads(x, buffers=buffers)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/pickle.py:94: in loads
    return pickle.loads(x, buffers=buffers)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/__init__.py:1: in <module>
    from dask_expr import _version, datasets
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/datasets.py:7: in <module>
    from dask_expr._collection import new_collection
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:11: in <module>
    import dask.dataframe.methods as methods
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    from __future__ import annotations
    
    
    def _dask_expr_enabled() -> bool:
        import dask
    
        use_dask_expr = dask.config.get("dataframe.query-planning")
        if use_dask_expr:
            try:
                import dask_expr  # noqa: F401
            except ImportError:
                raise ValueError("Must install dask-expr to activate query planning.")
        return use_dask_expr
    
    
    if _dask_expr_enabled():
>       from dask_expr import (
            DataFrame,
            Index,
            Series,
            concat,
            from_array,
            from_dask_array,
            from_dask_dataframe,
            from_delayed,
            from_dict,
            from_graph,
            from_map,
            from_pandas,
            get_dummies,
            isna,
            map_overlap,
            map_partitions,
            merge,
            pivot_table,
            read_csv,
            read_hdf,
            read_json,
            read_orc,
            read_parquet,
            read_sql,
            read_sql_query,
            read_sql_table,
            read_table,
            repartition,
            to_bag,
            to_csv,
            to_datetime,
            to_hdf,
            to_json,
            to_numeric,
            to_orc,
            to_parquet,
            to_records,
            to_sql,
            to_timedelta,
        )
E       ImportError: cannot import name 'DataFrame' from partially initialized module 'dask_expr' (most likely due to a circular import) (/home/runner/miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/__init__.py)

dask/dataframe/__init__.py:17: ImportError

The above exception was the direct cause of the following exception:

c = <Client: 'tcp://127.0.0.1:38441' processes=2 threads=2, memory=31.21 GiB>
fuse = True

    @pytest.mark.parametrize("fuse", [True, False])
    def test_fused_blockwise_dataframe_merge(c, fuse):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        # Generate two DataFrames with more partitions than
        # the `max_branch` default used for shuffling (32).
        # We need a multi-stage shuffle to cover #7178 fix.
        size = 35
        df1 = pd.DataFrame({"x": range(size), "y": range(size)})
        df2 = pd.DataFrame({"x": range(size), "z": range(size)})
        ddf1 = dd.from_pandas(df1, npartitions=size) + 10
        ddf2 = dd.from_pandas(df2, npartitions=5) + 10
        df1 += 10
        df2 += 10
    
        with dask.config.set({"optimization.fuse.active": fuse}):
            ddfm = ddf1.merge(ddf2, on=["x"], how="left", shuffle_method="tasks")
>           ddfm.head()  # https://github.com/dask/dask/issues/7178

dask/tests/test_distributed.py:151: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:393: in head
    out = out.compute()
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:307: in compute
    return DaskMethodsMixin.compute(out, **kwargs)
dask/base.py:379: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:665: in compute
    results = schedule(dsk, keys, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Client: 'tcp://127.0.0.1:38441' processes=2 threads=2, memory=31.21 GiB>
futures = [[<Future: cancelled, key: ('getitem-getitem-blockwisemerge-blockwisehead-dc3915d8e1ece5b878ba49a97d469835', 0)>]]
errors = 'raise', direct = False, local_worker = None

    async def _gather(self, futures, errors="raise", direct=None, local_worker=None):
        unpacked, future_set = unpack_remotedata(futures, byte_keys=True)
        mismatched_futures = [f for f in future_set if f.client is not self]
        if mismatched_futures:
            raise ValueError(
                "Cannot gather Futures created by another client. "
                f"These are the {len(mismatched_futures)} (out of {len(futures)}) "
                f"mismatched Futures and their client IDs (this client is {self.id}): "
                f"{ {f: f.client.id for f in mismatched_futures} }"  # noqa: E201, E202
            )
        keys = [future.key for future in future_set]
        bad_data = dict()
        data = {}
    
        if direct is None:
            direct = self.direct_to_workers
        if direct is None:
            try:
                w = get_worker()
            except Exception:
                direct = False
            else:
                if w.scheduler.address == self.scheduler.address:
                    direct = True
    
        async def wait(k):
            """Want to stop the All(...) early if we find an error"""
            try:
                st = self.futures[k]
            except KeyError:
                raise AllExit()
            else:
                await st.wait()
            if st.status != "finished" and errors == "raise":
                raise AllExit()
    
        while True:
            logger.debug("Waiting on futures to clear before gather")
    
            with suppress(AllExit):
                await distributed.utils.All(
                    [wait(key) for key in keys if key in self.futures],
                    quiet_exceptions=AllExit,
                )
    
            failed = ("error", "cancelled")
    
            exceptions = set()
            bad_keys = set()
            for key in keys:
                if key not in self.futures or self.futures[key].status in failed:
                    exceptions.add(key)
                    if errors == "raise":
                        try:
                            st = self.futures[key]
                            exception = st.exception
                            traceback = st.traceback
                        except (KeyError, AttributeError):
                            exc = CancelledError(key)
                        else:
>                           raise exception.with_traceback(traceback)
E                           RuntimeError: Error during deserialization of the task graph. This frequently
E                           occurs if the Scheduler and Client have different environments.
E                           For more information, see
E                           https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/client.py:2243: RuntimeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_fused_blockwise_dataframe_merge[False] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 5s]
Raw output
RuntimeError: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
from __future__ import annotations
    
    import asyncio
    import contextlib
    import dataclasses
    import heapq
    import inspect
    import itertools
    import json
    import logging
    import math
    import operator
    import os
    import pickle
    import random
    import textwrap
    import uuid
    import warnings
    import weakref
    from collections import defaultdict, deque
    from collections.abc import (
        Callable,
        Collection,
        Container,
        Hashable,
        Iterable,
        Iterator,
        Mapping,
        Sequence,
        Set,
    )
    from contextlib import suppress
    from functools import partial
    from typing import TYPE_CHECKING, Any, ClassVar, Literal, NamedTuple, cast, overload
    
    import psutil
    import tornado.web
    from sortedcontainers import SortedDict, SortedSet
    from tlz import (
        concat,
        first,
        groupby,
        merge,
        merge_sorted,
        merge_with,
        partition,
        pluck,
        second,
        take,
        valmap,
    )
    from tornado.ioloop import IOLoop
    
    import dask
    from dask.core import get_deps, validate_key
    from dask.typing import Key, no_default
    from dask.utils import (
        ensure_dict,
        format_bytes,
        format_time,
        key_split,
        parse_bytes,
        parse_timedelta,
        tmpfile,
    )
    from dask.widgets import get_template
    
    from distributed import cluster_dump, preloading, profile
    from distributed import versions as version_module
    from distributed._asyncio import RLock
    from distributed._stories import scheduler_story
    from distributed.active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
    from distributed.batched import BatchedSend
    from distributed.client import SourceCode
    from distributed.collections import HeapSet
    from distributed.comm import (
        Comm,
        CommClosedError,
        get_address_host,
        normalize_address,
        resolve_address,
        unparse_host_port,
    )
    from distributed.comm.addressing import addresses_from_user_args
    from distributed.compatibility import PeriodicCallback
    from distributed.core import (
        ErrorMessage,
        OKMessage,
        Status,
        clean_exception,
        error_message,
        rpc,
        send_recv,
    )
    from distributed.diagnostics.memory_sampler import MemorySamplerExtension
    from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name
    from distributed.event import EventExtension
    from distributed.http import get_handlers
    from distributed.lock import LockExtension
    from distributed.metrics import monotonic, time
    from distributed.multi_lock import MultiLockExtension
    from distributed.node import ServerNode
    from distributed.proctitle import setproctitle
    from distributed.protocol import deserialize
    from distributed.protocol.pickle import dumps, loads
    from distributed.protocol.serialize import Serialized, ToPickle, serialize
    from distributed.publish import PublishExtension
    from distributed.pubsub import PubSubSchedulerExtension
    from distributed.queues import QueueExtension
    from distributed.recreate_tasks import ReplayTaskScheduler
    from distributed.security import Security
    from distributed.semaphore import SemaphoreExtension
    from distributed.shuffle import ShuffleSchedulerPlugin
    from distributed.spans import SpansSchedulerExtension
    from distributed.stealing import WorkStealing
    from distributed.utils import (
        All,
        TimeoutError,
        format_dashboard_link,
        get_fileno_limit,
        is_python_shutting_down,
        key_split_group,
        log_errors,
        offload,
        recursive_to_dict,
        wait_for,
    )
    from distributed.utils_comm import (
        gather_from_workers,
        retry_operation,
        scatter_to_workers,
        unpack_remotedata,
    )
    from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
    from distributed.variable import VariableExtension
    from distributed.worker import _normalize_task
    
    if TYPE_CHECKING:
        # TODO import from typing (requires Python >=3.10)
        from typing_extensions import TypeAlias
    
        from dask.highlevelgraph import HighLevelGraph
    
    # Not to be confused with distributed.worker_state_machine.TaskStateState
    TaskStateState: TypeAlias = Literal[
        "released",
        "waiting",
        "no-worker",
        "queued",
        "processing",
        "memory",
        "erred",
        "forgotten",
    ]
    
    ALL_TASK_STATES: Set[TaskStateState] = set(TaskStateState.__args__)  # type: ignore
    
    # {task key -> finish state}
    # Not to be confused with distributed.worker_state_machine.Recs
    Recs: TypeAlias = dict[Key, TaskStateState]
    # {client or worker address: [{op: <key>, ...}, ...]}
    Msgs: TypeAlias = dict[str, list[dict[str, Any]]]
    # (recommendations, client messages, worker messages)
    RecsMsgs: TypeAlias = tuple[Recs, Msgs, Msgs]
    
    T_runspec: TypeAlias = tuple[Callable, tuple, dict[str, Any]]
    
    logger = logging.getLogger(__name__)
    LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")
    DEFAULT_DATA_SIZE = parse_bytes(
        dask.config.get("distributed.scheduler.default-data-size")
    )
    STIMULUS_ID_UNSET = "<stimulus_id unset>"
    
    DEFAULT_EXTENSIONS = {
        "locks": LockExtension,
        "multi_locks": MultiLockExtension,
        "publish": PublishExtension,
        "replay-tasks": ReplayTaskScheduler,
        "queues": QueueExtension,
        "variables": VariableExtension,
        "pubsub": PubSubSchedulerExtension,
        "semaphores": SemaphoreExtension,
        "events": EventExtension,
        "amm": ActiveMemoryManagerExtension,
        "memory_sampler": MemorySamplerExtension,
        "shuffle": ShuffleSchedulerPlugin,
        "spans": SpansSchedulerExtension,
        "stealing": WorkStealing,
    }
    
    
    class ClientState:
        """A simple object holding information about a client."""
    
        #: A unique identifier for this client. This is generally an opaque
        #: string generated by the client itself.
        client_key: str
    
        #: Cached hash of :attr:`~ClientState.client_key`
        _hash: int
    
        #: A set of tasks this client wants to be kept in memory, so that it can download
        #: its result when desired. This is the reverse mapping of
        #: :class:`TaskState.who_wants`. Tasks are typically removed from this set when the
        #: corresponding object in the client's space (for example a ``Future`` or a Dask
        #: collection) gets garbage-collected.
        wants_what: set[TaskState]
    
        #: The last time we received a heartbeat from this client, in local scheduler time.
        last_seen: float
    
        #: Output of :func:`distributed.versions.get_versions` on the client
        versions: dict[str, Any]
    
        __slots__ = tuple(__annotations__)
    
        def __init__(self, client: str, *, versions: dict[str, Any] | None = None):
            self.client_key = client
            self._hash = hash(client)
            self.wants_what = set()
            self.last_seen = time()
            self.versions = versions or {}
    
        def __hash__(self) -> int:
            return self._hash
    
        def __eq__(self, other: object) -> bool:
            if not isinstance(other, ClientState):
                return False
            return self.client_key == other.client_key
    
        def __repr__(self) -> str:
            return f"<Client {self.client_key!r}>"
    
        def __str__(self) -> str:
            return self.client_key
    
        def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict:
            """Dictionary representation for debugging purposes.
            Not type stable and not intended for roundtrips.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            TaskState._to_dict
            """
            return recursive_to_dict(
                self,
                exclude=set(exclude) | {"versions"},  # type: ignore
                members=True,
            )
    
    
    class MemoryState:
        """Memory readings on a worker or on the whole cluster.
    
        See :doc:`worker-memory`.
    
        Attributes / properties:
    
        managed_total
            Sum of the output of sizeof() for all dask keys held by the worker in memory,
            plus number of bytes spilled to disk
    
        managed
            Sum of the output of sizeof() for the dask keys held in RAM. Note that this may
            be inaccurate, which may cause inaccurate unmanaged memory (see below).
    
        spilled
            Number of bytes  for the dask keys spilled to the hard drive.
            Note that this is the size on disk; size in memory may be different due to
            compression and inaccuracies in sizeof(). In other words, given the same keys,
            'managed' will change depending on the keys being in memory or spilled.
    
        process
            Total RSS memory measured by the OS on the worker process.
            This is always exactly equal to managed + unmanaged.
    
        unmanaged
            process - managed. This is the sum of
    
            - Python interpreter and modules
            - global variables
            - memory temporarily allocated by the dask tasks that are currently running
            - memory fragmentation
            - memory leaks
            - memory not yet garbage collected
            - memory not yet free()'d by the Python memory manager to the OS
    
        unmanaged_old
            Minimum of the 'unmanaged' measures over the last
            ``distributed.memory.recent-to-old-time`` seconds
    
        unmanaged_recent
            unmanaged - unmanaged_old; in other words process memory that has been recently
            allocated but is not accounted for by dask; hopefully it's mostly a temporary
            spike.
    
        optimistic
            managed + unmanaged_old; in other words the memory held long-term by
            the process under the hopeful assumption that all unmanaged_recent memory is a
            temporary spike
        """
    
        process: int
        unmanaged_old: int
        managed: int
        spilled: int
    
        __slots__ = tuple(__annotations__)
    
        def __init__(
            self,
            *,
            process: int,
            unmanaged_old: int,
            managed: int,
            spilled: int,
        ):
            # Some data arrives with the heartbeat, some other arrives in realtime as the
            # tasks progress. Also, sizeof() is not guaranteed to return correct results.
            # This can cause glitches where a partial measure is larger than the whole, so
            # we need to force all numbers to add up exactly by definition.
            self.process = process
            self.managed = min(self.process, managed)
            self.spilled = spilled
            # Subtractions between unsigned ints guaranteed by construction to be >= 0
            self.unmanaged_old = min(unmanaged_old, process - self.managed)
    
        @staticmethod
        def sum(*infos: MemoryState) -> MemoryState:
            process = 0
            unmanaged_old = 0
            managed = 0
            spilled = 0
            for ms in infos:
                process += ms.process
                unmanaged_old += ms.unmanaged_old
                spilled += ms.spilled
                managed += ms.managed
            return MemoryState(
                process=process,
                unmanaged_old=unmanaged_old,
                managed=managed,
                spilled=spilled,
            )
    
        @property
        def managed_total(self) -> int:
            return self.managed + self.spilled
    
        @property
        def unmanaged(self) -> int:
            # This is never negative thanks to __init__
            return self.process - self.managed
    
        @property
        def unmanaged_recent(self) -> int:
            # This is never negative thanks to __init__
            return self.process - self.managed - self.unmanaged_old
    
        @property
        def optimistic(self) -> int:
            return self.managed + self.unmanaged_old
    
        @property
        def managed_in_memory(self) -> int:
            warnings.warn("managed_in_memory has been renamed to managed", FutureWarning)
            return self.managed
    
        @property
        def managed_spilled(self) -> int:
            warnings.warn("managed_spilled has been renamed to spilled", FutureWarning)
            return self.spilled
    
        def __repr__(self) -> str:
            return (
                f"Process memory (RSS)  : {format_bytes(self.process)}\n"
                f"  - managed by Dask   : {format_bytes(self.managed)}\n"
                f"  - unmanaged (old)   : {format_bytes(self.unmanaged_old)}\n"
                f"  - unmanaged (recent): {format_bytes(self.unmanaged_recent)}\n"
                f"Spilled to disk       : {format_bytes(self.spilled)}\n"
            )
    
        def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
            """Dictionary representation for debugging purposes.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            """
            return {
                k: getattr(self, k)
                for k in dir(self)
                if not k.startswith("_")
                and k not in {"sum", "managed_in_memory", "managed_spilled"}
            }
    
    
    class WorkerState:
        """A simple object holding information about a worker.
    
        Not to be confused with :class:`distributed.worker_state_machine.WorkerState`.
        """
    
        #: This worker's unique key. This can be its connected address
        #: (such as ``"tcp://127.0.0.1:8891"``) or an alias (such as ``"alice"``).
        address: str
    
        pid: int
        name: Hashable
    
        #: The number of CPU threads made available on this worker
        nthreads: int
    
        #: Memory available to the worker, in bytes
        memory_limit: int
    
        local_directory: str
        services: dict[str, int]
    
        #: Output of :meth:`distributed.versions.get_versions` on the worker
        versions: dict[str, Any]
    
        #: Address of the associated :class:`~distributed.nanny.Nanny`, if present
        nanny: str
    
        #: Read-only worker status, synced one way from the remote Worker object
        status: Status
    
        #: Cached hash of :attr:`~WorkerState.address`
        _hash: int
    
        #: The total memory size, in bytes, used by the tasks this worker holds in memory
        #: (i.e. the tasks in this worker's :attr:`~WorkerState.has_what`).
        nbytes: int
    
        #: Worker memory unknown to the worker, in bytes, which has been there for more than
        #: 30 seconds. See :class:`MemoryState`.
        _memory_unmanaged_old: int
    
        #: History of the last 30 seconds' worth of unmanaged memory. Used to differentiate
        #: between "old" and "new" unmanaged memory.
        #: Format: ``[(timestamp, bytes), (timestamp, bytes), ...]``
        _memory_unmanaged_history: deque[tuple[float, int]]
    
        metrics: dict[str, Any]
    
        #: The last time we received a heartbeat from this worker, in local scheduler time.
        last_seen: float
    
        time_delay: float
        bandwidth: float
    
        #: A set of all TaskStates on this worker that are actors. This only includes those
        #: actors whose state actually lives on this worker, not actors to which this worker
        #: has a reference.
        actors: set[TaskState]
    
        #: Underlying data of :meth:`WorkerState.has_what`
        _has_what: dict[TaskState, None]
    
        #: A set of tasks that have been submitted to this worker. Multiple tasks may be
        # submitted to a worker in advance and the worker will run them eventually,
        # depending on its execution resources (but see :doc:`work-stealing`).
        #:
        #: All the tasks here are in the "processing" state.
        #: This attribute is kept in sync with :attr:`TaskState.processing_on`.
        processing: set[TaskState]
    
        #: Running tasks that invoked :func:`distributed.secede`
        long_running: set[TaskState]
    
        #: A dictionary of tasks that are currently being run on this worker.
        #: Each task state is associated with the duration in seconds which the task has
        #: been running.
        executing: dict[TaskState, float]
    
        #: The available resources on this worker, e.g. ``{"GPU": 2}``.
        #: These are abstract quantities that constrain certain tasks from running at the
        #: same time on this worker.
        resources: dict[str, float]
    
        #: The sum of each resource used by all tasks allocated to this worker.
        #: The numbers in this dictionary can only be less or equal than those in this
        #: worker's :attr:`~WorkerState.resources`.
        used_resources: dict[str, float]
    
        #: Arbitrary additional metadata to be added to :meth:`~WorkerState.identity`
        extra: dict[str, Any]
    
        # The unique server ID this WorkerState is referencing
        server_id: str
    
        # Reference to scheduler task_groups
        scheduler_ref: weakref.ref[SchedulerState] | None
        task_prefix_count: defaultdict[str, int]
        _network_occ: float
        _occupancy_cache: float | None
    
        #: Keys that may need to be fetched to this worker, and the number of tasks that need them.
        #: All tasks are currently in `memory` on a worker other than this one.
        #: Much like `processing`, this does not exactly reflect worker state:
        #: keys here may be queued to fetch, in flight, or already in memory
        #: on the worker.
        needs_what: dict[TaskState, int]
    
        __slots__ = tuple(__annotations__)
    
        def __init__(
            self,
            *,
            address: str,
            status: Status,
            pid: int,
            name: object,
            nthreads: int = 0,
            memory_limit: int,
            local_directory: str,
            nanny: str,
            server_id: str,
            services: dict[str, int] | None = None,
            versions: dict[str, Any] | None = None,
            extra: dict[str, Any] | None = None,
            scheduler: SchedulerState | None = None,
        ):
            self.server_id = server_id
            self.address = address
            self.pid = pid
            self.name = name
            self.nthreads = nthreads
            self.memory_limit = memory_limit
            self.local_directory = local_directory
            self.services = services or {}
            self.versions = versions or {}
            self.nanny = nanny
            self.status = status
            self._hash = hash(self.server_id)
            self.nbytes = 0
            self._memory_unmanaged_old = 0
            self._memory_unmanaged_history = deque()
            self.metrics = {}
            self.last_seen = 0
            self.time_delay = 0
            self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
            self.actors = set()
            self._has_what = {}
            self.processing = set()
            self.long_running = set()
            self.executing = {}
            self.resources = {}
            self.used_resources = {}
            self.extra = extra or {}
            self.scheduler_ref = weakref.ref(scheduler) if scheduler else None
            self.task_prefix_count = defaultdict(int)
            self.needs_what = {}
            self._network_occ = 0
            self._occupancy_cache = None
    
        def __hash__(self) -> int:
            return self._hash
    
        def __eq__(self, other: object) -> bool:
            return isinstance(other, WorkerState) and other.server_id == self.server_id
    
        @property
        def has_what(self) -> Set[TaskState]:
            """An insertion-sorted set-like of tasks which currently reside on this worker.
            All the tasks here are in the "memory" state.
            This is the reverse mapping of :attr:`TaskState.who_has`.
    
            This is a read-only public accessor. The data is implemented as a dict without
            values, because rebalance() relies on dicts being insertion-sorted.
            """
            return self._has_what.keys()
    
        @property
        def host(self) -> str:
            return get_address_host(self.address)
    
        @property
        def memory(self) -> MemoryState:
            """Polished memory metrics for the worker.
    
            **Design note on managed memory**
    
            There are two measures available for managed memory:
    
            - ``self.nbytes``
            - ``self.metrics["managed_bytes"]``
    
            At rest, the two numbers must be identical. However, ``self.nbytes`` is
            immediately updated through the batched comms as soon as each task lands in
            memory on the worker; ``self.metrics["managed_bytes"]`` instead is updated by
            the heartbeat, which can lag several seconds behind.
    
            Below we are mixing likely newer managed memory info from ``self.nbytes`` with
            process and spilled memory from the heartbeat. This is deliberate, so that
            managed memory total is updated more frequently.
    
            Managed memory directly and immediately contributes to optimistic memory, which
            is in turn used in Active Memory Manager heuristics (at the moment of writing;
            more uses will likely be added in the future). So it's important to have it
            up to date; much more than it is for process memory.
    
            Having up-to-date managed memory info as soon as the scheduler learns about
            task completion also substantially simplifies unit tests.
    
            The flip side of this design is that it may cause some noise in the
            unmanaged_recent measure. e.g.:
    
            1. Delete 100MB of managed data
            2. The updated managed memory reaches the scheduler faster than the
               updated process memory
            3. There's a blip where the scheduler thinks that there's a sudden 100MB
               increase in unmanaged_recent, since process memory hasn't changed but managed
               memory has decreased by 100MB
            4. When the heartbeat arrives, process memory goes down and so does the
               unmanaged_recent.
    
            This is OK - one of the main reasons for the unmanaged_recent / unmanaged_old
            split is exactly to concentrate all the noise in unmanaged_recent and exclude it
            from optimistic memory, which is used for heuristics.
    
            Something that is less OK, but also less frequent, is that the sudden deletion
            of spilled keys will cause a negative blip in managed memory:
    
            1. Delete 100MB of spilled data
            2. The updated managed memory *total* reaches the scheduler faster than the
               updated spilled portion
            3. This causes the managed memory to temporarily plummet and be replaced by
               unmanaged_recent, while spilled memory remains unaltered
            4. When the heartbeat arrives, managed goes back up, unmanaged_recent
               goes back down, and spilled goes down by 100MB as it should have to
               begin with.
    
            :issue:`6002` will let us solve this.
            """
            return MemoryState(
                process=self.metrics["memory"],
                managed=max(0, self.nbytes - self.metrics["spilled_bytes"]["memory"]),
                spilled=self.metrics["spilled_bytes"]["disk"],
                unmanaged_old=self._memory_unmanaged_old,
            )
    
        def clean(self) -> WorkerState:
            """Return a version of this object that is appropriate for serialization"""
            ws = WorkerState(
                address=self.address,
                status=self.status,
                pid=self.pid,
                name=self.name,
                nthreads=self.nthreads,
                memory_limit=self.memory_limit,
                local_directory=self.local_directory,
                services=self.services,
                nanny=self.nanny,
                extra=self.extra,
                server_id=self.server_id,
            )
            ws._occupancy_cache = self.occupancy
    
            ws.executing = {
                ts.key: duration for ts, duration in self.executing.items()  # type: ignore
            }
            return ws
    
        def __repr__(self) -> str:
            name = f", name: {self.name}" if self.name != self.address else ""
            return (
                f"<WorkerState {self.address!r}{name}, "
                f"status: {self.status.name}, "
                f"memory: {len(self.has_what)}, "
                f"processing: {len(self.processing)}>"
            )
    
        def _repr_html_(self) -> str:
            return get_template("worker_state.html.j2").render(
                address=self.address,
                name=self.name,
                status=self.status.name,
                has_what=self.has_what,
                processing=self.processing,
            )
    
        def identity(self) -> dict[str, Any]:
            return {
                "type": "Worker",
                "id": self.name,
                "host": self.host,
                "resources": self.resources,
                "local_directory": self.local_directory,
                "name": self.name,
                "nthreads": self.nthreads,
                "memory_limit": self.memory_limit,
                "last_seen": self.last_seen,
                "services": self.services,
                "metrics": self.metrics,
                "status": self.status.name,
                "nanny": self.nanny,
                **self.extra,
            }
    
        def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict[str, Any]:
            """Dictionary representation for debugging purposes.
            Not type stable and not intended for roundtrips.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            TaskState._to_dict
            """
            return recursive_to_dict(
                self,
                exclude=set(exclude) | {"versions"},  # type: ignore
                members=True,
            )
    
        @property
        def scheduler(self) -> SchedulerState:
            assert self.scheduler_ref
            s = self.scheduler_ref()
            assert s
            return s
    
        def add_to_processing(self, ts: TaskState) -> None:
            """Assign a task to this worker for compute."""
            if self.scheduler.validate:
                assert ts not in self.processing
    
            tp = ts.prefix
            self.task_prefix_count[tp.name] += 1
            self.scheduler._task_prefix_count_global[tp.name] += 1
            self.processing.add(ts)
            for dts in ts.dependencies:
                assert dts.who_has
                if self not in dts.who_has:
                    self._inc_needs_replica(dts)
    
        def add_to_long_running(self, ts: TaskState) -> None:
            if self.scheduler.validate:
                assert ts in self.processing
                assert ts not in self.long_running
    
            self._remove_from_task_prefix_count(ts)
            # Cannot remove from processing since we're using this for things like
            # idleness detection. Idle workers are typically targeted for
            # downscaling but we should not downscale workers with long running
            # tasks
            self.long_running.add(ts)
    
        def remove_from_processing(self, ts: TaskState) -> None:
            """Remove a task from a workers processing"""
            if self.scheduler.validate:
                assert ts in self.processing
    
            if ts in self.long_running:
                self.long_running.discard(ts)
            else:
                self._remove_from_task_prefix_count(ts)
            self.processing.remove(ts)
            for dts in ts.dependencies:
                if dts in self.needs_what:
                    self._dec_needs_replica(dts)
    
        def _remove_from_task_prefix_count(self, ts: TaskState) -> None:
            count = self.task_prefix_count[ts.prefix.name] - 1
            if count:
                self.task_prefix_count[ts.prefix.name] = count
            else:
                del self.task_prefix_count[ts.prefix.name]
    
            count = self.scheduler._task_prefix_count_global[ts.prefix.name] - 1
            if count:
                self.scheduler._task_prefix_count_global[ts.prefix.name] = count
            else:
                del self.scheduler._task_prefix_count_global[ts.prefix.name]
    
        def remove_replica(self, ts: TaskState) -> None:
            """The worker no longer has a task in memory"""
            if self.scheduler.validate:
                assert ts.who_has
                assert self in ts.who_has
                assert ts in self.has_what
                assert ts not in self.needs_what
    
            self.nbytes -= ts.get_nbytes()
            del self._has_what[ts]
            ts.who_has.remove(self)  # type: ignore
            if not ts.who_has:
                ts.who_has = None
    
        def _inc_needs_replica(self, ts: TaskState) -> None:
            """Assign a task fetch to this worker and update network occupancies"""
            if self.scheduler.validate:
                assert ts.who_has
                assert self not in ts.who_has
                assert ts not in self.has_what
            if ts not in self.needs_what:
                self.needs_what[ts] = 1
                nbytes = ts.get_nbytes()
                self._network_occ += nbytes
                self.scheduler._network_occ_global += nbytes
            else:
                self.needs_what[ts] += 1
    
        def _dec_needs_replica(self, ts: TaskState) -> None:
            if self.scheduler.validate:
                assert ts in self.needs_what
    
            self.needs_what[ts] -= 1
            if self.needs_what[ts] == 0:
                del self.needs_what[ts]
                nbytes = ts.get_nbytes()
                self._network_occ -= nbytes
                self.scheduler._network_occ_global -= nbytes
    
        def add_replica(self, ts: TaskState) -> None:
            """The worker acquired a replica of task"""
            if ts.who_has is None:
                ts.who_has = set()
            if ts in self._has_what:
                return
            nbytes = ts.get_nbytes()
            if ts in self.needs_what:
                del self.needs_what[ts]
                self._network_occ -= nbytes
                self.scheduler._network_occ_global -= nbytes
            ts.who_has.add(self)
            self.nbytes += nbytes
            self._has_what[ts] = None
    …                     link = format_dashboard_link(addr, server.port)
                    # formatting dashboard link can fail if distributed.dashboard.link
                    # refers to non-existant env vars.
                    except KeyError as e:
                        logger.warning(
                            f"Failed to format dashboard link, unknown value: {e}"
                        )
                        link = f":{server.port}"
                else:
                    link = f"{listen_ip}:{server.port}"
                logger.info("%11s at:  %25s", name, link)
    
            if self.scheduler_file:
                with open(self.scheduler_file, "w") as f:
                    json.dump(self.identity(), f, indent=2)
    
                fn = self.scheduler_file  # remove file when we close the process
    
                def del_scheduler_file():
                    if os.path.exists(fn):
                        os.remove(fn)
    
                weakref.finalize(self, del_scheduler_file)
    
            await self.preloads.start()
    
            if self.jupyter:
                # Allow insecure communications from local users
                if self.address.startswith("tls://"):
                    await self.listen("tcp://localhost:0")
                os.environ["DASK_SCHEDULER_ADDRESS"] = self.listeners[-1].contact_address
    
            await asyncio.gather(
                *[plugin.start(self) for plugin in list(self.plugins.values())]
            )
    
            self.start_periodic_callbacks()
    
            setproctitle(f"dask scheduler [{self.address}]")
            return self
    
        async def close(self, fast=None, close_workers=None, reason=""):
            """Send cleanup signal to all coroutines then wait until finished
    
            See Also
            --------
            Scheduler.cleanup
            """
            if fast is not None or close_workers is not None:
                warnings.warn(
                    "The 'fast' and 'close_workers' parameters in Scheduler.close have no "
                    "effect and will be removed in a future version of distributed.",
                    FutureWarning,
                )
            if self.status in (Status.closing, Status.closed):
                await self.finished()
                return
    
            async def log_errors(func):
                try:
                    await func()
                except Exception:
                    logger.exception("Plugin call failed during scheduler.close")
    
            await asyncio.gather(
                *[log_errors(plugin.before_close) for plugin in list(self.plugins.values())]
            )
    
            self.status = Status.closing
            logger.info("Scheduler closing due to %s...", reason or "unknown reason")
            setproctitle("dask scheduler [closing]")
    
            await self.preloads.teardown()
    
            await asyncio.gather(
                *[log_errors(plugin.close) for plugin in list(self.plugins.values())]
            )
    
            for pc in self.periodic_callbacks.values():
                pc.stop()
            self.periodic_callbacks.clear()
    
            self.stop_services()
    
            for ext in self.extensions.values():
                with suppress(AttributeError):
                    ext.teardown()
            logger.info("Scheduler closing all comms")
    
            futures = []
            for _, comm in list(self.stream_comms.items()):
                # FIXME use `self.remove_worker()` instead after https://github.com/dask/distributed/issues/6390
                if not comm.closed():
                    # This closes the Worker and ensures that if a Nanny is around,
                    # it is closed as well
                    comm.send({"op": "close", "reason": "scheduler-close"})
                    comm.send({"op": "close-stream"})
                    # ^ TODO remove? `Worker.close` will close the stream anyway.
                with suppress(AttributeError):
                    futures.append(comm.close())
    
            await asyncio.gather(*futures)
    
            if self.jupyter:
                await self._jupyter_server_application._cleanup()
    
            for comm in self.client_comms.values():
                comm.abort()
    
            await self.rpc.close()
    
            self.status = Status.closed
            self.stop()
            await super().close()
    
            setproctitle("dask scheduler [closed]")
            disable_gc_diagnosis()
    
        ###########
        # Stimuli #
        ###########
    
        def heartbeat_worker(
            self,
            *,
            address: str,
            resolve_address: bool = True,
            now: float | None = None,
            resources: dict[str, float] | None = None,
            host_info: dict | None = None,
            metrics: dict,
            executing: dict[Key, float] | None = None,
            extensions: dict | None = None,
        ) -> dict[str, Any]:
            address = self.coerce_address(address, resolve_address)
            address = normalize_address(address)
            ws = self.workers.get(address)
            if ws is None:
                logger.warning(f"Received heartbeat from unregistered worker {address!r}.")
                return {"status": "missing"}
    
            host = get_address_host(address)
            local_now = time()
            host_info = host_info or {}
    
            dh: dict = self.host_info.setdefault(host, {})
            dh["last-seen"] = local_now
    
            frac = 1 / len(self.workers)
            self.bandwidth = (
                self.bandwidth * (1 - frac) + metrics["bandwidth"]["total"] * frac
            )
            for other, (bw, count) in metrics["bandwidth"]["workers"].items():
                if (address, other) not in self.bandwidth_workers:
                    self.bandwidth_workers[address, other] = bw / count
                else:
                    alpha = (1 - frac) ** count
                    self.bandwidth_workers[address, other] = self.bandwidth_workers[
                        address, other
                    ] * alpha + bw * (1 - alpha)
            for typ, (bw, count) in metrics["bandwidth"]["types"].items():
                if typ not in self.bandwidth_types:
                    self.bandwidth_types[typ] = bw / count
                else:
                    alpha = (1 - frac) ** count
                    self.bandwidth_types[typ] = self.bandwidth_types[typ] * alpha + bw * (
                        1 - alpha
                    )
    
            ws.last_seen = local_now
            if executing is not None:
                # NOTE: the executing dict is unused
                ws.executing = {}
                for key, duration in executing.items():
                    if key in self.tasks:
                        ts = self.tasks[key]
                        ws.executing[ts] = duration
                        ts.prefix.add_exec_time(duration)
    
            for name, value in metrics["digests_total_since_heartbeat"].items():
                self.cumulative_worker_metrics[name] += value
    
            ws.metrics = metrics
    
            # Calculate RSS - dask keys, separating "old" and "new" usage
            # See MemoryState for details
            max_memory_unmanaged_old_hist_age = local_now - self.MEMORY_RECENT_TO_OLD_TIME
            memory_unmanaged_old = ws._memory_unmanaged_old
            while ws._memory_unmanaged_history:
                timestamp, size = ws._memory_unmanaged_history[0]
                if timestamp >= max_memory_unmanaged_old_hist_age:
                    break
                ws._memory_unmanaged_history.popleft()
                if size == memory_unmanaged_old:
                    memory_unmanaged_old = 0  # recalculate min()
    
            # ws._nbytes is updated at a different time and sizeof() may not be accurate,
            # so size may be (temporarily) negative; floor it to zero.
            size = max(
                0, metrics["memory"] - ws.nbytes + metrics["spilled_bytes"]["memory"]
            )
    
            ws._memory_unmanaged_history.append((local_now, size))
            if not memory_unmanaged_old:
                # The worker has just been started or the previous minimum has been expunged
                # because too old.
                # Note: this algorithm is capped to 200 * MEMORY_RECENT_TO_OLD_TIME elements
                # cluster-wide by heartbeat_interval(), regardless of the number of workers
                ws._memory_unmanaged_old = min(map(second, ws._memory_unmanaged_history))
            elif size < memory_unmanaged_old:
                ws._memory_unmanaged_old = size
    
            if host_info:
                dh = self.host_info.setdefault(host, {})
                dh.update(host_info)
    
            if now:
                ws.time_delay = local_now - now
    
            if resources:
                self.add_resources(worker=address, resources=resources)
    
            if extensions:
                for name, data in extensions.items():
                    self.extensions[name].heartbeat(ws, data)
    
            return {
                "status": "OK",
                "time": local_now,
                "heartbeat-interval": heartbeat_interval(len(self.workers)),
            }
    
        @log_errors
        async def add_worker(
            self,
            comm: Comm,
            *,
            address: str,
            status: str,
            server_id: str,
            nthreads: int,
            name: str,
            resolve_address: bool = True,
            now: float,
            resources: dict[str, float],
            # FIXME: This is never submitted by the worker
            host_info: None = None,
            memory_limit: int | None,
            metrics: dict[str, Any],
            pid: int = 0,
            services: dict[str, int],
            local_directory: str,
            versions: dict[str, Any],
            nanny: str,
            extra: dict,
            stimulus_id: str,
        ) -> None:
            """Add a new worker to the cluster"""
            address = self.coerce_address(address, resolve_address)
            address = normalize_address(address)
            host = get_address_host(address)
    
            if address in self.workers:
                raise ValueError("Worker already exists %s" % address)
    
            if name in self.aliases:
                logger.warning("Worker tried to connect with a duplicate name: %s", name)
                msg = {
                    "status": "error",
                    "message": "name taken, %s" % name,
                    "time": time(),
                }
                await comm.write(msg)
                return
    
            self.log_event(address, {"action": "add-worker"})
            self.log_event("all", {"action": "add-worker", "worker": address})
    
            self.workers[address] = ws = WorkerState(
                address=address,
                status=Status.lookup[status],  # type: ignore
                pid=pid,
                nthreads=nthreads,
                memory_limit=memory_limit or 0,
                name=name,
                local_directory=local_directory,
                services=services,
                versions=versions,
                nanny=nanny,
                extra=extra,
                server_id=server_id,
                scheduler=self,
            )
            if ws.status == Status.running:
                self.running.add(ws)
    
            dh = self.host_info.get(host)
            if dh is None:
                self.host_info[host] = dh = {}
    
            dh_addresses = dh.get("addresses")
            if dh_addresses is None:
                dh["addresses"] = dh_addresses = set()
                dh["nthreads"] = 0
    
            dh_addresses.add(address)
            dh["nthreads"] += nthreads
    
            self.total_nthreads += nthreads
            self.total_nthreads_history.append((time(), self.total_nthreads))
            self.aliases[name] = address
    
            self.heartbeat_worker(
                address=address,
                resolve_address=resolve_address,
                now=now,
                resources=resources,
                host_info=host_info,
                metrics=metrics,
            )
    
            # Do not need to adjust self.total_occupancy as self.occupancy[ws] cannot
            # exist before this.
            self.check_idle_saturated(ws)
    
            self.stream_comms[address] = BatchedSend(interval="5ms", loop=self.loop)
    
            awaitables = []
            for plugin in list(self.plugins.values()):
                try:
                    result = plugin.add_worker(scheduler=self, worker=address)
                    if result is not None and inspect.isawaitable(result):
                        awaitables.append(result)
                except Exception as e:
                    logger.exception(e)
    
            plugin_msgs = await asyncio.gather(*awaitables, return_exceptions=True)
            plugins_exceptions = [msg for msg in plugin_msgs if isinstance(msg, Exception)]
            for exc in plugins_exceptions:
                logger.exception(exc, exc_info=exc)
    
            if ws.status == Status.running:
                self.transitions(
                    self.bulk_schedule_unrunnable_after_adding_worker(ws), stimulus_id
                )
                self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
    
            logger.info("Register worker %s", ws)
    
            msg = {
                "status": "OK",
                "time": time(),
                "heartbeat-interval": heartbeat_interval(len(self.workers)),
                "worker-plugins": self.worker_plugins,
            }
    
            version_warning = version_module.error_message(
                version_module.get_versions(),
                {w: ws.versions for w, ws in self.workers.items()},
                versions,
                source_name=str(ws.server_id),
            )
            msg.update(version_warning)
    
            await comm.write(msg)
            # This will keep running until the worker is removed
            await self.handle_worker(comm, address)
    
        async def add_nanny(self, comm: Comm, address: str) -> None:
            async with self._starting_nannies_cond:
                self._starting_nannies.add(address)
            try:
                msg = {
                    "status": "OK",
                    "nanny-plugins": self.nanny_plugins,
                }
                await comm.write(msg)
                await comm.read()
            finally:
                async with self._starting_nannies_cond:
                    self._starting_nannies.discard(address)
                    self._starting_nannies_cond.notify_all()
    
        def _match_graph_with_tasks(
            self,
            dsk: dict[Key, T_runspec],
            dependencies: dict[Key, set[Key]],
            keys: set[Key],
        ) -> set[Key]:
            n = 0
            lost_keys = set()
            while len(dsk) != n:  # walk through new tasks, cancel any bad deps
                n = len(dsk)
                for k, deps in list(dependencies.items()):
                    if any(
                        dep not in self.tasks and dep not in dsk for dep in deps
                    ):  # bad key
                        lost_keys.add(k)
                        logger.info("User asked for computation on lost data, %s", k)
                        del dsk[k]
                        del dependencies[k]
                        if k in keys:
                            keys.remove(k)
                    del deps
            # Avoid computation that is already finished
            done = set()  # tasks that are already done
            for k, v in dependencies.items():
                if v and k in self.tasks:
                    ts = self.tasks[k]
                    if ts.state in ("memory", "erred"):
                        done.add(k)
    
            if done:
                dependents = dask.core.reverse_dict(dependencies)
                stack = list(done)
                while stack:  # remove unnecessary dependencies
                    key = stack.pop()
                    try:
                        deps = dependencies[key]
                    except KeyError:
                        deps = {ts.key for ts in self.tasks[key].dependencies}
                    for dep in deps:
                        if dep in dependents:
                            child_deps = dependents[dep]
                        elif dep in self.tasks:
                            child_deps = {ts.key for ts in self.tasks[key].dependencies}
                        else:
                            child_deps = set()
                        if all(d in done for d in child_deps):
                            if dep in self.tasks and dep not in done:
                                done.add(dep)
                                stack.append(dep)
            for anc in done:
                dsk.pop(anc, None)
                dependencies.pop(anc, None)
            return lost_keys
    
        def _create_taskstate_from_graph(
            self,
            *,
            start: float,
            dsk: dict[Key, T_runspec],
            dependencies: dict,
            keys: set[Key],
            ordered: dict[Key, int],
            client: str,
            annotations_by_type: dict,
            global_annotations: dict | None,
            stimulus_id: str,
            submitting_task: Key | None,
            user_priority: int | dict[Key, int] = 0,
            actors: bool | list[Key] | None = None,
            fifo_timeout: float = 0.0,
            code: tuple[SourceCode, ...] = (),
        ) -> None:
            """
            Take a low level graph and create the necessary scheduler state to
            compute it.
    
            WARNING
            -------
            This method must not be made async since nothing here is concurrency
            safe. All interactions with TaskState objects here should be happening
            in the same event loop tick.
            """
    
            lost_keys = self._match_graph_with_tasks(dsk, dependencies, keys)
    
            if len(dsk) > 1:
                self.log_event(
                    ["all", client], {"action": "update_graph", "count": len(dsk)}
                )
    
            if lost_keys:
                self.report({"op": "cancelled-keys", "keys": lost_keys}, client=client)
                self.client_releases_keys(
                    keys=lost_keys, client=client, stimulus_id=stimulus_id
                )
    
            if not self.is_idle and self.computations:
                # Still working on something. Assign new tasks to same computation
                computation = self.computations[-1]
            else:
                computation = Computation()
                self.computations.append(computation)
    
            if code:  # add new code blocks
                computation.code.add(code)
            if global_annotations:
                # FIXME: This is kind of inconsistent since it only includes global
                # annotations.
                computation.annotations.update(global_annotations)
            del global_annotations
    
            runnable, touched_tasks, new_tasks = self._generate_taskstates(
                keys=keys,
                dsk=dsk,
                dependencies=dependencies,
                computation=computation,
            )
    
            keys_with_annotations = self._apply_annotations(
                tasks=new_tasks,
                annotations_by_type=annotations_by_type,
            )
    
            self._set_priorities(
                internal_priority=ordered,
                submitting_task=submitting_task,
                user_priority=user_priority,
                fifo_timeout=fifo_timeout,
                start=start,
                tasks=runnable,
            )
    
            self.client_desires_keys(keys=keys, client=client)
    
            # Add actors
            if actors is True:
                actors = list(keys)
            for actor in actors or []:
                ts = self.tasks[actor]
                ts.actor = True
    
            # Compute recommendations
            recommendations: Recs = {}
            priority = dict()
            for ts in sorted(
                runnable,
                key=operator.attrgetter("priority"),
                reverse=True,
            ):
                assert ts.priority  # mypy
                priority[ts.key] = ts.priority
                assert ts.run_spec
                if ts.state == "released":
                    recommendations[ts.key] = "waiting"
    
            for ts in runnable:
                for dts in ts.dependencies:
                    if dts.exception_blame:
                        ts.exception_blame = dts.exception_blame
                        recommendations[ts.key] = "erred"
                        break
    
            annotations_for_plugin: defaultdict[str, dict[Key, Any]] = defaultdict(dict)
            for key in keys_with_annotations:
                ts = self.tasks[key]
                if ts.annotations:
                    for annot, value in ts.annotations.items():
                        annotations_for_plugin[annot][key] = value
    
            spans_ext: SpansSchedulerExtension | None = self.extensions.get("spans")
            if spans_ext:
                # new_tasks does not necessarily contain all runnable tasks;
                # _generate_taskstates is not the only thing that calls new_task(). A
                # TaskState may have also been created by client_desires_keys or scatter,
                # and only later gained a run_spec.
                span_annotations = spans_ext.observe_tasks(runnable, code=code)
                # In case of TaskGroup collision, spans may have changed
                # FIXME: Is this used anywhere besides tests?
                if span_annotations:
                    annotations_for_plugin["span"] = span_annotations
                else:
                    annotations_for_plugin.pop("span", None)
    
            for plugin in list(self.plugins.values()):
                try:
                    plugin.update_graph(
                        self,
                        client=client,
                        tasks=[ts.key for ts in touched_tasks],
                        keys=keys,
                        dependencies=dependencies,
                        annotations=dict(annotations_for_plugin),
                        priority=priority,
                    )
                except Exception as e:
                    logger.exception(e)
    
            self.transitions(recommendations, stimulus_id)
    
            for ts in touched_tasks:
                if ts.state in ("memory", "erred"):
                    self.report_on_key(ts=ts, client=client)
    
        @log_errors
        async def update_graph(
            self,
            client: str,
            graph_header: dict,
            graph_frames: list[bytes],
            keys: set[Key],
            internal_priority: dict[Key, int] | None,
            submitting_task: Key | None,
            user_priority: int | dict[Key, int] = 0,
            actors: bool | list[Key] | None = None,
            fifo_timeout: float = 0.0,
            code: tuple[SourceCode, ...] = (),
            annotations: dict | None = None,
            stimulus_id: str | None = None,
        ) -> None:
            # FIXME: Apparently empty dicts arrive as a ToPickle object
            if isinstance(annotations, ToPickle):
                annotations = annotations.data  # type: ignore[unreachable]
            start = time()
            try:
                try:
>                   graph = deserialize(graph_header, graph_frames).data

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/scheduler.py:4671: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/serialize.py:439: in deserialize
    return loads(header, frames)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/serialize.py:101: in pickle_loads
    return pickle.loads(x, buffers=buffers)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/pickle.py:94: in loads
    return pickle.loads(x, buffers=buffers)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/__init__.py:1: in <module>
    from dask_expr import _version, datasets
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/datasets.py:7: in <module>
    from dask_expr._collection import new_collection
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:11: in <module>
    import dask.dataframe.methods as methods
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    from __future__ import annotations
    
    
    def _dask_expr_enabled() -> bool:
        import dask
    
        use_dask_expr = dask.config.get("dataframe.query-planning")
        if use_dask_expr:
            try:
                import dask_expr  # noqa: F401
            except ImportError:
                raise ValueError("Must install dask-expr to activate query planning.")
        return use_dask_expr
    
    
    if _dask_expr_enabled():
>       from dask_expr import (
            DataFrame,
            Index,
            Series,
            concat,
            from_array,
            from_dask_array,
            from_dask_dataframe,
            from_delayed,
            from_dict,
            from_graph,
            from_map,
            from_pandas,
            get_dummies,
            isna,
            map_overlap,
            map_partitions,
            merge,
            pivot_table,
            read_csv,
            read_hdf,
            read_json,
            read_orc,
            read_parquet,
            read_sql,
            read_sql_query,
            read_sql_table,
            read_table,
            repartition,
            to_bag,
            to_csv,
            to_datetime,
            to_hdf,
            to_json,
            to_numeric,
            to_orc,
            to_parquet,
            to_records,
            to_sql,
            to_timedelta,
        )
E       ImportError: cannot import name 'DataFrame' from partially initialized module 'dask_expr' (most likely due to a circular import) (/home/runner/miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/__init__.py)

dask/dataframe/__init__.py:17: ImportError

The above exception was the direct cause of the following exception:

c = <Client: 'tcp://127.0.0.1:43189' processes=2 threads=2, memory=31.21 GiB>
fuse = False

    @pytest.mark.parametrize("fuse", [True, False])
    def test_fused_blockwise_dataframe_merge(c, fuse):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        # Generate two DataFrames with more partitions than
        # the `max_branch` default used for shuffling (32).
        # We need a multi-stage shuffle to cover #7178 fix.
        size = 35
        df1 = pd.DataFrame({"x": range(size), "y": range(size)})
        df2 = pd.DataFrame({"x": range(size), "z": range(size)})
        ddf1 = dd.from_pandas(df1, npartitions=size) + 10
        ddf2 = dd.from_pandas(df2, npartitions=5) + 10
        df1 += 10
        df2 += 10
    
        with dask.config.set({"optimization.fuse.active": fuse}):
            ddfm = ddf1.merge(ddf2, on=["x"], how="left", shuffle_method="tasks")
>           ddfm.head()  # https://github.com/dask/dask/issues/7178

dask/tests/test_distributed.py:151: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:393: in head
    out = out.compute()
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:307: in compute
    return DaskMethodsMixin.compute(out, **kwargs)
dask/base.py:379: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:665: in compute
    results = schedule(dsk, keys, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Client: 'tcp://127.0.0.1:43189' processes=2 threads=2, memory=31.21 GiB>
futures = [[<Future: cancelled, key: ('getitem-getitem-blockwisemerge-blockwisehead-dc3915d8e1ece5b878ba49a97d469835', 0)>]]
errors = 'raise', direct = False, local_worker = None

    async def _gather(self, futures, errors="raise", direct=None, local_worker=None):
        unpacked, future_set = unpack_remotedata(futures, byte_keys=True)
        mismatched_futures = [f for f in future_set if f.client is not self]
        if mismatched_futures:
            raise ValueError(
                "Cannot gather Futures created by another client. "
                f"These are the {len(mismatched_futures)} (out of {len(futures)}) "
                f"mismatched Futures and their client IDs (this client is {self.id}): "
                f"{ {f: f.client.id for f in mismatched_futures} }"  # noqa: E201, E202
            )
        keys = [future.key for future in future_set]
        bad_data = dict()
        data = {}
    
        if direct is None:
            direct = self.direct_to_workers
        if direct is None:
            try:
                w = get_worker()
            except Exception:
                direct = False
            else:
                if w.scheduler.address == self.scheduler.address:
                    direct = True
    
        async def wait(k):
            """Want to stop the All(...) early if we find an error"""
            try:
                st = self.futures[k]
            except KeyError:
                raise AllExit()
            else:
                await st.wait()
            if st.status != "finished" and errors == "raise":
                raise AllExit()
    
        while True:
            logger.debug("Waiting on futures to clear before gather")
    
            with suppress(AllExit):
                await distributed.utils.All(
                    [wait(key) for key in keys if key in self.futures],
                    quiet_exceptions=AllExit,
                )
    
            failed = ("error", "cancelled")
    
            exceptions = set()
            bad_keys = set()
            for key in keys:
                if key not in self.futures or self.futures[key].status in failed:
                    exceptions.add(key)
                    if errors == "raise":
                        try:
                            st = self.futures[key]
                            exception = st.exception
                            traceback = st.traceback
                        except (KeyError, AttributeError):
                            exc = CancelledError(key)
                        else:
>                           raise exception.with_traceback(traceback)
E                           RuntimeError: Error during deserialization of the task graph. This frequently
E                           occurs if the Scheduler and Client have different environments.
E                           For more information, see
E                           https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/client.py:2243: RuntimeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_dataframe_broadcast_merge[False-a] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 5m 0s]
Raw output
Failed: Timeout >300.0s
c = <Client: 'tcp://127.0.0.1:39973' processes=0 threads=0, memory=0 B>
on = 'a', broadcast = False

    @pytest.mark.parametrize("on", ["a", ["a"]])
    @pytest.mark.parametrize("broadcast", [True, False])
    def test_dataframe_broadcast_merge(c, on, broadcast):
        # See: https://github.com/dask/dask/issues/9870
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        pdfl = pd.DataFrame({"a": [1, 2] * 2, "b_left": range(4)})
        pdfr = pd.DataFrame({"a": [2, 1], "b_right": range(2)})
        dfl = dd.from_pandas(pdfl, npartitions=4)
        dfr = dd.from_pandas(pdfr, npartitions=2)
    
        ddfm = dd.merge(dfl, dfr, on=on, broadcast=broadcast, shuffle_method="tasks")
>       dfm = ddfm.compute()

dask/tests/test_distributed.py:173: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:307: in compute
    return DaskMethodsMixin.compute(out, **kwargs)
dask/base.py:379: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:665: in compute
    results = schedule(dsk, keys, **kwargs)
../../../miniconda3/envs/test-environment/lib/python3.12/threading.py:655: in wait
    signaled = self._cond.wait(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.lock object at 0x7f8c90c16600>, 0)>
timeout = 10

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
E                   Failed: Timeout >300.0s

../../../miniconda3/envs/test-environment/lib/python3.12/threading.py:359: Failed

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_dataframe_broadcast_merge[False-on1] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 5m 0s]
Raw output
Failed: Timeout >300.0s
c = <Client: 'tcp://127.0.0.1:34813' processes=0 threads=0, memory=0 B>
on = ['a'], broadcast = False

    @pytest.mark.parametrize("on", ["a", ["a"]])
    @pytest.mark.parametrize("broadcast", [True, False])
    def test_dataframe_broadcast_merge(c, on, broadcast):
        # See: https://github.com/dask/dask/issues/9870
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        pdfl = pd.DataFrame({"a": [1, 2] * 2, "b_left": range(4)})
        pdfr = pd.DataFrame({"a": [2, 1], "b_right": range(2)})
        dfl = dd.from_pandas(pdfl, npartitions=4)
        dfr = dd.from_pandas(pdfr, npartitions=2)
    
        ddfm = dd.merge(dfl, dfr, on=on, broadcast=broadcast, shuffle_method="tasks")
>       dfm = ddfm.compute()

dask/tests/test_distributed.py:173: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:307: in compute
    return DaskMethodsMixin.compute(out, **kwargs)
dask/base.py:379: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:665: in compute
    results = schedule(dsk, keys, **kwargs)
../../../miniconda3/envs/test-environment/lib/python3.12/threading.py:655: in wait
    signaled = self._cond.wait(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.lock object at 0x7f9a2687f540>, 0)>
timeout = 10

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
E                   Failed: Timeout >300.0s

../../../miniconda3/envs/test-environment/lib/python3.12/threading.py:359: Failed

Check warning on line 0 in dask.tests.test_graph_manipulation

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_wait_on_collections (dask.tests.test_graph_manipulation)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
AssertionError: {'rename': {'mappartitions-ae5e23eefd1870527f0ff5edabda994c': 'wait_on-75f78187012e15b975ebd0b60b273b27'}}
@pytest.mark.skipif("not da or not dd")
    def test_wait_on_collections():
        colls, cnt = collections_with_node_counters()
    
        # Create a delayed that depends on a single one among all collections
        @delayed
        def f(x):
            pass
    
>       colls2 = wait_on(*colls)

dask/tests/test_graph_manipulation.py:168: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/graph_manipulation.py:533: in wait_on
    out = repack([block_one(coll) for coll in unpacked])
dask/graph_manipulation.py:530: in block_one
    return rebuild(dsk, *args, rename=rename)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:2894: in from_graph
    return new_collection(FromGraph(*args, **kwargs))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[RecursionError('maximum recursion depth exceeded') raised in repr()] FromGraph object at 0x7faaa4eebc20>
args = (HighLevelGraph with 30 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7faaa4eeb380>
 0. 140369266590112
 1. ...75ebd0b60b273b27
, Empty DataFrame
Columns: [x]
Index: [], (0, 5, 9), 'mappartitions-ae5e23eefd1870527f0ff5edabda994c')
kwargs = {'rename': {'mappartitions-ae5e23eefd1870527f0ff5edabda994c': 'wait_on-75f78187012e15b975ebd0b60b273b27'}}
operands = [HighLevelGraph with 30 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7faaa4eeb380>
 0. 140369266590112
 1. ...75ebd0b60b273b27
, Empty DataFrame
Columns: [x]
Index: [], (0, 5, 9), 'mappartitions-ae5e23eefd1870527f0ff5edabda994c']

    def __init__(self, *args, **kwargs):
        operands = list(args)
        for parameter in type(self)._parameters[len(operands) :]:
            try:
                operands.append(kwargs.pop(parameter))
            except KeyError:
                operands.append(type(self)._defaults[parameter])
>       assert not kwargs, kwargs
E       AssertionError: {'rename': {'mappartitions-ae5e23eefd1870527f0ff5edabda994c': 'wait_on-75f78187012e15b975ebd0b60b273b27'}}

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_core.py:39: AssertionError

Check warning on line 0 in dask.tests.test_graph_manipulation

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_bind_clone_collections[bind] (dask.tests.test_graph_manipulation)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
AttributeError: 'DataFrame' object has no attribute '__dask_layers__'. Did you mean: '__dask_keys__'?
self = MapPartitions(lambda), key = '__dask_layers__'

    def __getattr__(self, key):
        try:
>           return object.__getattribute__(self, key)
E           AttributeError: 'MapPartitions' object has no attribute '__dask_layers__'. Did you mean: '__dask_keys__'?

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_expr.py:80: AttributeError

During handling of the above exception, another exception occurred:

self = <dask_expr.expr.DataFrame: expr=MapPartitions(lambda)>
key = '__dask_layers__'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
            return object.__getattribute__(self, key)
        except AttributeError as err:
            try:
                # Fall back to `expr` API
                # (Making sure to convert to/from Expr)
>               val = getattr(self.expr, key)

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:354: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = MapPartitions(lambda), key = '__dask_layers__'

    def __getattr__(self, key):
        try:
            return object.__getattribute__(self, key)
        except AttributeError as err:
            if key == "_meta":
                # Avoid a recursive loop if/when `self._meta`
                # produces an `AttributeError`
                raise RuntimeError(
                    f"Failed to generate metadata for {self}. "
                    "This operation may not be supported by the current backend."
                )
    
            # Allow operands to be accessed as attributes
            # as long as the keys are not already reserved
            # by existing methods/properties
            _parameters = type(self)._parameters
            if key in _parameters:
                idx = _parameters.index(key)
                return self.operands[idx]
            if is_dataframe_like(self._meta) and key in self._meta.columns:
                return self[key]
    
            link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
>           raise AttributeError(
                f"{err}\n\n"
                "This often means that you are attempting to use an unsupported "
                f"API function. Current API coverage is documented here: {link}."
            )
E           AttributeError: 'MapPartitions' object has no attribute '__dask_layers__'
E           
E           This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.. Did you mean: '__dask_keys__'?

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_expr.py:101: AttributeError

During handling of the above exception, another exception occurred:

func = <function bind at 0x7faaae4fa480>

    @pytest.mark.skipif("not da or not dd")
    @pytest.mark.parametrize("func", [bind, clone])
    def test_bind_clone_collections(func):
        @delayed
        def double(x):
            return x * 2
    
        # dask.delayed
        d1 = double(2)
        d2 = double(d1)
        # dask.array
        a1 = da.ones((10, 10), chunks=5)
        a2 = a1 + 1
        a3 = a2.T
        # dask.bag
        b1 = db.from_sequence([1, 2], npartitions=2)
        # b1's tasks are not callable, so we need an extra step to properly test bind
        b2 = b1.map(lambda x: x * 2)
        b3 = b2.map(lambda x: x + 1)
        b4 = b3.min()
        # dask.dataframe
        df = pd.DataFrame({"x": list(range(10))})
        ddf1 = dd.from_pandas(df, npartitions=2)
        # ddf1's tasks are not callable, so we need an extra step to properly test bind
        ddf2 = ddf1.map_partitions(lambda x: x * 2)
        ddf3 = ddf2.map_partitions(lambda x: x + 1)
        ddf4 = ddf3["x"]  # dd.Series
        ddf5 = ddf4.min()  # dd.Scalar
    
        cnt = NodeCounter()
        if func is bind:
            parent = da.ones((10, 10), chunks=5).map_blocks(cnt.f)
            cnt.n = 0
>           d2c, a3c, b3c, b4c, ddf3c, ddf4c, ddf5c = bind(
                children=(d2, a3, b3, b4, ddf3, ddf4, ddf5),
                parents=parent,
                omit=(d1, a1, b2, ddf2),
                seed=0,
            )

dask/tests/test_graph_manipulation.py:364: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/graph_manipulation.py:293: in bind
    omit_layers = {layer for coll in omit for layer in coll.__dask_layers__()}
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:1770: in __getattr__
    return super().__getattr__(key)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:360: in __getattr__
    raise err
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dask_expr.expr.DataFrame: expr=MapPartitions(lambda)>
key = '__dask_layers__'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
>           return object.__getattribute__(self, key)
E           AttributeError: 'DataFrame' object has no attribute '__dask_layers__'. Did you mean: '__dask_keys__'?

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:349: AttributeError

Check warning on line 0 in dask.tests.test_graph_manipulation

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_bind_clone_collections[clone] (dask.tests.test_graph_manipulation)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 0s]
Raw output
AttributeError: 'DataFrame' object has no attribute '__dask_layers__'. Did you mean: '__dask_keys__'?
self = MapPartitions(lambda), key = '__dask_layers__'

    def __getattr__(self, key):
        try:
>           return object.__getattribute__(self, key)
E           AttributeError: 'MapPartitions' object has no attribute '__dask_layers__'. Did you mean: '__dask_keys__'?

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_expr.py:80: AttributeError

During handling of the above exception, another exception occurred:

self = <dask_expr.expr.DataFrame: expr=MapPartitions(lambda)>
key = '__dask_layers__'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
            return object.__getattribute__(self, key)
        except AttributeError as err:
            try:
                # Fall back to `expr` API
                # (Making sure to convert to/from Expr)
>               val = getattr(self.expr, key)

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:354: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = MapPartitions(lambda), key = '__dask_layers__'

    def __getattr__(self, key):
        try:
            return object.__getattribute__(self, key)
        except AttributeError as err:
            if key == "_meta":
                # Avoid a recursive loop if/when `self._meta`
                # produces an `AttributeError`
                raise RuntimeError(
                    f"Failed to generate metadata for {self}. "
                    "This operation may not be supported by the current backend."
                )
    
            # Allow operands to be accessed as attributes
            # as long as the keys are not already reserved
            # by existing methods/properties
            _parameters = type(self)._parameters
            if key in _parameters:
                idx = _parameters.index(key)
                return self.operands[idx]
            if is_dataframe_like(self._meta) and key in self._meta.columns:
                return self[key]
    
            link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
>           raise AttributeError(
                f"{err}\n\n"
                "This often means that you are attempting to use an unsupported "
                f"API function. Current API coverage is documented here: {link}."
            )
E           AttributeError: 'MapPartitions' object has no attribute '__dask_layers__'
E           
E           This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.. Did you mean: '__dask_keys__'?

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_expr.py:101: AttributeError

During handling of the above exception, another exception occurred:

func = <function clone at 0x7faaae4fa5c0>

    @pytest.mark.skipif("not da or not dd")
    @pytest.mark.parametrize("func", [bind, clone])
    def test_bind_clone_collections(func):
        @delayed
        def double(x):
            return x * 2
    
        # dask.delayed
        d1 = double(2)
        d2 = double(d1)
        # dask.array
        a1 = da.ones((10, 10), chunks=5)
        a2 = a1 + 1
        a3 = a2.T
        # dask.bag
        b1 = db.from_sequence([1, 2], npartitions=2)
        # b1's tasks are not callable, so we need an extra step to properly test bind
        b2 = b1.map(lambda x: x * 2)
        b3 = b2.map(lambda x: x + 1)
        b4 = b3.min()
        # dask.dataframe
        df = pd.DataFrame({"x": list(range(10))})
        ddf1 = dd.from_pandas(df, npartitions=2)
        # ddf1's tasks are not callable, so we need an extra step to properly test bind
        ddf2 = ddf1.map_partitions(lambda x: x * 2)
        ddf3 = ddf2.map_partitions(lambda x: x + 1)
        ddf4 = ddf3["x"]  # dd.Series
        ddf5 = ddf4.min()  # dd.Scalar
    
        cnt = NodeCounter()
        if func is bind:
            parent = da.ones((10, 10), chunks=5).map_blocks(cnt.f)
            cnt.n = 0
            d2c, a3c, b3c, b4c, ddf3c, ddf4c, ddf5c = bind(
                children=(d2, a3, b3, b4, ddf3, ddf4, ddf5),
                parents=parent,
                omit=(d1, a1, b2, ddf2),
                seed=0,
            )
        else:
>           d2c, a3c, b3c, b4c, ddf3c, ddf4c, ddf5c = clone(
                d2,
                a3,
                b3,
                b4,
                ddf3,
                ddf4,
                ddf5,
                omit=(d1, a1, b2, ddf2),
                seed=0,
            )

dask/tests/test_graph_manipulation.py:371: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/graph_manipulation.py:466: in clone
    out = bind(
dask/graph_manipulation.py:293: in bind
    omit_layers = {layer for coll in omit for layer in coll.__dask_layers__()}
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:1770: in __getattr__
    return super().__getattr__(key)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:360: in __getattr__
    raise err
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dask_expr.expr.DataFrame: expr=MapPartitions(lambda)>
key = '__dask_layers__'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
>           return object.__getattribute__(self, key)
E           AttributeError: 'DataFrame' object has no attribute '__dask_layers__'. Did you mean: '__dask_keys__'?

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:349: AttributeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_blockwise_dataframe_io[True-True-csv] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 5s]
Raw output
TypeError: 'module' object is not callable
c = <Client: 'tcp://127.0.0.1:41127' processes=2 threads=2, memory=31.21 GiB>
tmpdir = local('/tmp/pytest-of-runner/pytest-0/popen-gw0/test_blockwise_dataframe_io_Tr1')
io = 'csv', fuse = True, from_futures = True

    @ignore_sync_scheduler_warning
    @pytest.mark.parametrize(
        "io",
        [
            "parquet-pyarrow",
            pytest.param(
                "parquet-fastparquet", marks=pytest.mark.skip_with_pyarrow_strings
            ),
            "csv",
            # See https://github.com/dask/dask/issues/9793
            pytest.param("hdf", marks=pytest.mark.flaky(reruns=5)),
        ],
    )
    @pytest.mark.parametrize("fuse", [True, False, None])
    @pytest.mark.parametrize("from_futures", [True, False])
    def test_blockwise_dataframe_io(c, tmpdir, io, fuse, from_futures):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        df = pd.DataFrame({"x": [1, 2, 3] * 5, "y": range(15)})
    
        if from_futures:
            parts = [df.iloc[:5], df.iloc[5:10], df.iloc[10:15]]
            futs = c.scatter(parts)
            ddf0 = dd.from_delayed(futs, meta=parts[0])
        else:
            ddf0 = dd.from_pandas(df, npartitions=3)
    
        if io == "parquet-pyarrow":
            pytest.importorskip("pyarrow")
            ddf0.to_parquet(str(tmpdir))
            ddf = dd.read_parquet(str(tmpdir))
        elif io == "parquet-fastparquet":
            pytest.importorskip("fastparquet")
            with pytest.warns(FutureWarning):
                ddf0.to_parquet(str(tmpdir), engine="fastparquet")
                ddf = dd.read_parquet(str(tmpdir), engine="fastparquet")
        elif io == "csv":
            ddf0.to_csv(str(tmpdir), index=False)
            ddf = dd.read_csv(os.path.join(str(tmpdir), "*"))
        elif io == "hdf":
            pytest.importorskip("tables")
            fn = str(tmpdir.join("h5"))
            ddf0.to_hdf(fn, "/data*")
            ddf = dd.read_hdf(fn, "/data*")
        else:
            raise AssertionError("unreachable")
    
        df = df[["x"]] + 10
        ddf = ddf[["x"]] + 10
        with dask.config.set({"optimization.fuse.active": fuse}):
            ddf.compute()
>           dsk = dask.dataframe.optimize(ddf.dask, ddf.__dask_keys__())
E           TypeError: 'module' object is not callable

dask/tests/test_distributed.py:509: TypeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_blockwise_dataframe_io[True-True-hdf] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 54s]
Raw output
TypeError: 'module' object is not callable
c = <Client: 'tcp://127.0.0.1:40655' processes=2 threads=2, memory=31.21 GiB>
tmpdir = local('/tmp/pytest-of-runner/pytest-0/popen-gw2/test_blockwise_dataframe_io_Tr5')
io = 'hdf', fuse = True, from_futures = True

    @ignore_sync_scheduler_warning
    @pytest.mark.parametrize(
        "io",
        [
            "parquet-pyarrow",
            pytest.param(
                "parquet-fastparquet", marks=pytest.mark.skip_with_pyarrow_strings
            ),
            "csv",
            # See https://github.com/dask/dask/issues/9793
            pytest.param("hdf", marks=pytest.mark.flaky(reruns=5)),
        ],
    )
    @pytest.mark.parametrize("fuse", [True, False, None])
    @pytest.mark.parametrize("from_futures", [True, False])
    def test_blockwise_dataframe_io(c, tmpdir, io, fuse, from_futures):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        df = pd.DataFrame({"x": [1, 2, 3] * 5, "y": range(15)})
    
        if from_futures:
            parts = [df.iloc[:5], df.iloc[5:10], df.iloc[10:15]]
            futs = c.scatter(parts)
            ddf0 = dd.from_delayed(futs, meta=parts[0])
        else:
            ddf0 = dd.from_pandas(df, npartitions=3)
    
        if io == "parquet-pyarrow":
            pytest.importorskip("pyarrow")
            ddf0.to_parquet(str(tmpdir))
            ddf = dd.read_parquet(str(tmpdir))
        elif io == "parquet-fastparquet":
            pytest.importorskip("fastparquet")
            with pytest.warns(FutureWarning):
                ddf0.to_parquet(str(tmpdir), engine="fastparquet")
                ddf = dd.read_parquet(str(tmpdir), engine="fastparquet")
        elif io == "csv":
            ddf0.to_csv(str(tmpdir), index=False)
            ddf = dd.read_csv(os.path.join(str(tmpdir), "*"))
        elif io == "hdf":
            pytest.importorskip("tables")
            fn = str(tmpdir.join("h5"))
            ddf0.to_hdf(fn, "/data*")
            ddf = dd.read_hdf(fn, "/data*")
        else:
            raise AssertionError("unreachable")
    
        df = df[["x"]] + 10
        ddf = ddf[["x"]] + 10
        with dask.config.set({"optimization.fuse.active": fuse}):
            ddf.compute()
>           dsk = dask.dataframe.optimize(ddf.dask, ddf.__dask_keys__())
E           TypeError: 'module' object is not callable

dask/tests/test_distributed.py:509: TypeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_blockwise_dataframe_io[True-False-parquet-pyarrow] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 5s]
Raw output
RuntimeError: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
from __future__ import annotations
    
    import asyncio
    import contextlib
    import dataclasses
    import heapq
    import inspect
    import itertools
    import json
    import logging
    import math
    import operator
    import os
    import pickle
    import random
    import textwrap
    import uuid
    import warnings
    import weakref
    from collections import defaultdict, deque
    from collections.abc import (
        Callable,
        Collection,
        Container,
        Hashable,
        Iterable,
        Iterator,
        Mapping,
        Sequence,
        Set,
    )
    from contextlib import suppress
    from functools import partial
    from typing import TYPE_CHECKING, Any, ClassVar, Literal, NamedTuple, cast, overload
    
    import psutil
    import tornado.web
    from sortedcontainers import SortedDict, SortedSet
    from tlz import (
        concat,
        first,
        groupby,
        merge,
        merge_sorted,
        merge_with,
        partition,
        pluck,
        second,
        take,
        valmap,
    )
    from tornado.ioloop import IOLoop
    
    import dask
    from dask.core import get_deps, validate_key
    from dask.typing import Key, no_default
    from dask.utils import (
        ensure_dict,
        format_bytes,
        format_time,
        key_split,
        parse_bytes,
        parse_timedelta,
        tmpfile,
    )
    from dask.widgets import get_template
    
    from distributed import cluster_dump, preloading, profile
    from distributed import versions as version_module
    from distributed._asyncio import RLock
    from distributed._stories import scheduler_story
    from distributed.active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
    from distributed.batched import BatchedSend
    from distributed.client import SourceCode
    from distributed.collections import HeapSet
    from distributed.comm import (
        Comm,
        CommClosedError,
        get_address_host,
        normalize_address,
        resolve_address,
        unparse_host_port,
    )
    from distributed.comm.addressing import addresses_from_user_args
    from distributed.compatibility import PeriodicCallback
    from distributed.core import (
        ErrorMessage,
        OKMessage,
        Status,
        clean_exception,
        error_message,
        rpc,
        send_recv,
    )
    from distributed.diagnostics.memory_sampler import MemorySamplerExtension
    from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name
    from distributed.event import EventExtension
    from distributed.http import get_handlers
    from distributed.lock import LockExtension
    from distributed.metrics import monotonic, time
    from distributed.multi_lock import MultiLockExtension
    from distributed.node import ServerNode
    from distributed.proctitle import setproctitle
    from distributed.protocol import deserialize
    from distributed.protocol.pickle import dumps, loads
    from distributed.protocol.serialize import Serialized, ToPickle, serialize
    from distributed.publish import PublishExtension
    from distributed.pubsub import PubSubSchedulerExtension
    from distributed.queues import QueueExtension
    from distributed.recreate_tasks import ReplayTaskScheduler
    from distributed.security import Security
    from distributed.semaphore import SemaphoreExtension
    from distributed.shuffle import ShuffleSchedulerPlugin
    from distributed.spans import SpansSchedulerExtension
    from distributed.stealing import WorkStealing
    from distributed.utils import (
        All,
        TimeoutError,
        format_dashboard_link,
        get_fileno_limit,
        is_python_shutting_down,
        key_split_group,
        log_errors,
        offload,
        recursive_to_dict,
        wait_for,
    )
    from distributed.utils_comm import (
        gather_from_workers,
        retry_operation,
        scatter_to_workers,
        unpack_remotedata,
    )
    from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
    from distributed.variable import VariableExtension
    from distributed.worker import _normalize_task
    
    if TYPE_CHECKING:
        # TODO import from typing (requires Python >=3.10)
        from typing_extensions import TypeAlias
    
        from dask.highlevelgraph import HighLevelGraph
    
    # Not to be confused with distributed.worker_state_machine.TaskStateState
    TaskStateState: TypeAlias = Literal[
        "released",
        "waiting",
        "no-worker",
        "queued",
        "processing",
        "memory",
        "erred",
        "forgotten",
    ]
    
    ALL_TASK_STATES: Set[TaskStateState] = set(TaskStateState.__args__)  # type: ignore
    
    # {task key -> finish state}
    # Not to be confused with distributed.worker_state_machine.Recs
    Recs: TypeAlias = dict[Key, TaskStateState]
    # {client or worker address: [{op: <key>, ...}, ...]}
    Msgs: TypeAlias = dict[str, list[dict[str, Any]]]
    # (recommendations, client messages, worker messages)
    RecsMsgs: TypeAlias = tuple[Recs, Msgs, Msgs]
    
    T_runspec: TypeAlias = tuple[Callable, tuple, dict[str, Any]]
    
    logger = logging.getLogger(__name__)
    LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")
    DEFAULT_DATA_SIZE = parse_bytes(
        dask.config.get("distributed.scheduler.default-data-size")
    )
    STIMULUS_ID_UNSET = "<stimulus_id unset>"
    
    DEFAULT_EXTENSIONS = {
        "locks": LockExtension,
        "multi_locks": MultiLockExtension,
        "publish": PublishExtension,
        "replay-tasks": ReplayTaskScheduler,
        "queues": QueueExtension,
        "variables": VariableExtension,
        "pubsub": PubSubSchedulerExtension,
        "semaphores": SemaphoreExtension,
        "events": EventExtension,
        "amm": ActiveMemoryManagerExtension,
        "memory_sampler": MemorySamplerExtension,
        "shuffle": ShuffleSchedulerPlugin,
        "spans": SpansSchedulerExtension,
        "stealing": WorkStealing,
    }
    
    
    class ClientState:
        """A simple object holding information about a client."""
    
        #: A unique identifier for this client. This is generally an opaque
        #: string generated by the client itself.
        client_key: str
    
        #: Cached hash of :attr:`~ClientState.client_key`
        _hash: int
    
        #: A set of tasks this client wants to be kept in memory, so that it can download
        #: its result when desired. This is the reverse mapping of
        #: :class:`TaskState.who_wants`. Tasks are typically removed from this set when the
        #: corresponding object in the client's space (for example a ``Future`` or a Dask
        #: collection) gets garbage-collected.
        wants_what: set[TaskState]
    
        #: The last time we received a heartbeat from this client, in local scheduler time.
        last_seen: float
    
        #: Output of :func:`distributed.versions.get_versions` on the client
        versions: dict[str, Any]
    
        __slots__ = tuple(__annotations__)
    
        def __init__(self, client: str, *, versions: dict[str, Any] | None = None):
            self.client_key = client
            self._hash = hash(client)
            self.wants_what = set()
            self.last_seen = time()
            self.versions = versions or {}
    
        def __hash__(self) -> int:
            return self._hash
    
        def __eq__(self, other: object) -> bool:
            if not isinstance(other, ClientState):
                return False
            return self.client_key == other.client_key
    
        def __repr__(self) -> str:
            return f"<Client {self.client_key!r}>"
    
        def __str__(self) -> str:
            return self.client_key
    
        def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict:
            """Dictionary representation for debugging purposes.
            Not type stable and not intended for roundtrips.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            TaskState._to_dict
            """
            return recursive_to_dict(
                self,
                exclude=set(exclude) | {"versions"},  # type: ignore
                members=True,
            )
    
    
    class MemoryState:
        """Memory readings on a worker or on the whole cluster.
    
        See :doc:`worker-memory`.
    
        Attributes / properties:
    
        managed_total
            Sum of the output of sizeof() for all dask keys held by the worker in memory,
            plus number of bytes spilled to disk
    
        managed
            Sum of the output of sizeof() for the dask keys held in RAM. Note that this may
            be inaccurate, which may cause inaccurate unmanaged memory (see below).
    
        spilled
            Number of bytes  for the dask keys spilled to the hard drive.
            Note that this is the size on disk; size in memory may be different due to
            compression and inaccuracies in sizeof(). In other words, given the same keys,
            'managed' will change depending on the keys being in memory or spilled.
    
        process
            Total RSS memory measured by the OS on the worker process.
            This is always exactly equal to managed + unmanaged.
    
        unmanaged
            process - managed. This is the sum of
    
            - Python interpreter and modules
            - global variables
            - memory temporarily allocated by the dask tasks that are currently running
            - memory fragmentation
            - memory leaks
            - memory not yet garbage collected
            - memory not yet free()'d by the Python memory manager to the OS
    
        unmanaged_old
            Minimum of the 'unmanaged' measures over the last
            ``distributed.memory.recent-to-old-time`` seconds
    
        unmanaged_recent
            unmanaged - unmanaged_old; in other words process memory that has been recently
            allocated but is not accounted for by dask; hopefully it's mostly a temporary
            spike.
    
        optimistic
            managed + unmanaged_old; in other words the memory held long-term by
            the process under the hopeful assumption that all unmanaged_recent memory is a
            temporary spike
        """
    
        process: int
        unmanaged_old: int
        managed: int
        spilled: int
    
        __slots__ = tuple(__annotations__)
    
        def __init__(
            self,
            *,
            process: int,
            unmanaged_old: int,
            managed: int,
            spilled: int,
        ):
            # Some data arrives with the heartbeat, some other arrives in realtime as the
            # tasks progress. Also, sizeof() is not guaranteed to return correct results.
            # This can cause glitches where a partial measure is larger than the whole, so
            # we need to force all numbers to add up exactly by definition.
            self.process = process
            self.managed = min(self.process, managed)
            self.spilled = spilled
            # Subtractions between unsigned ints guaranteed by construction to be >= 0
            self.unmanaged_old = min(unmanaged_old, process - self.managed)
    
        @staticmethod
        def sum(*infos: MemoryState) -> MemoryState:
            process = 0
            unmanaged_old = 0
            managed = 0
            spilled = 0
            for ms in infos:
                process += ms.process
                unmanaged_old += ms.unmanaged_old
                spilled += ms.spilled
                managed += ms.managed
            return MemoryState(
                process=process,
                unmanaged_old=unmanaged_old,
                managed=managed,
                spilled=spilled,
            )
    
        @property
        def managed_total(self) -> int:
            return self.managed + self.spilled
    
        @property
        def unmanaged(self) -> int:
            # This is never negative thanks to __init__
            return self.process - self.managed
    
        @property
        def unmanaged_recent(self) -> int:
            # This is never negative thanks to __init__
            return self.process - self.managed - self.unmanaged_old
    
        @property
        def optimistic(self) -> int:
            return self.managed + self.unmanaged_old
    
        @property
        def managed_in_memory(self) -> int:
            warnings.warn("managed_in_memory has been renamed to managed", FutureWarning)
            return self.managed
    
        @property
        def managed_spilled(self) -> int:
            warnings.warn("managed_spilled has been renamed to spilled", FutureWarning)
            return self.spilled
    
        def __repr__(self) -> str:
            return (
                f"Process memory (RSS)  : {format_bytes(self.process)}\n"
                f"  - managed by Dask   : {format_bytes(self.managed)}\n"
                f"  - unmanaged (old)   : {format_bytes(self.unmanaged_old)}\n"
                f"  - unmanaged (recent): {format_bytes(self.unmanaged_recent)}\n"
                f"Spilled to disk       : {format_bytes(self.spilled)}\n"
            )
    
        def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
            """Dictionary representation for debugging purposes.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            """
            return {
                k: getattr(self, k)
                for k in dir(self)
                if not k.startswith("_")
                and k not in {"sum", "managed_in_memory", "managed_spilled"}
            }
    
    
    class WorkerState:
        """A simple object holding information about a worker.
    
        Not to be confused with :class:`distributed.worker_state_machine.WorkerState`.
        """
    
        #: This worker's unique key. This can be its connected address
        #: (such as ``"tcp://127.0.0.1:8891"``) or an alias (such as ``"alice"``).
        address: str
    
        pid: int
        name: Hashable
    
        #: The number of CPU threads made available on this worker
        nthreads: int
    
        #: Memory available to the worker, in bytes
        memory_limit: int
    
        local_directory: str
        services: dict[str, int]
    
        #: Output of :meth:`distributed.versions.get_versions` on the worker
        versions: dict[str, Any]
    
        #: Address of the associated :class:`~distributed.nanny.Nanny`, if present
        nanny: str
    
        #: Read-only worker status, synced one way from the remote Worker object
        status: Status
    
        #: Cached hash of :attr:`~WorkerState.address`
        _hash: int
    
        #: The total memory size, in bytes, used by the tasks this worker holds in memory
        #: (i.e. the tasks in this worker's :attr:`~WorkerState.has_what`).
        nbytes: int
    
        #: Worker memory unknown to the worker, in bytes, which has been there for more than
        #: 30 seconds. See :class:`MemoryState`.
        _memory_unmanaged_old: int
    
        #: History of the last 30 seconds' worth of unmanaged memory. Used to differentiate
        #: between "old" and "new" unmanaged memory.
        #: Format: ``[(timestamp, bytes), (timestamp, bytes), ...]``
        _memory_unmanaged_history: deque[tuple[float, int]]
    
        metrics: dict[str, Any]
    
        #: The last time we received a heartbeat from this worker, in local scheduler time.
        last_seen: float
    
        time_delay: float
        bandwidth: float
    
        #: A set of all TaskStates on this worker that are actors. This only includes those
        #: actors whose state actually lives on this worker, not actors to which this worker
        #: has a reference.
        actors: set[TaskState]
    
        #: Underlying data of :meth:`WorkerState.has_what`
        _has_what: dict[TaskState, None]
    
        #: A set of tasks that have been submitted to this worker. Multiple tasks may be
        # submitted to a worker in advance and the worker will run them eventually,
        # depending on its execution resources (but see :doc:`work-stealing`).
        #:
        #: All the tasks here are in the "processing" state.
        #: This attribute is kept in sync with :attr:`TaskState.processing_on`.
        processing: set[TaskState]
    
        #: Running tasks that invoked :func:`distributed.secede`
        long_running: set[TaskState]
    
        #: A dictionary of tasks that are currently being run on this worker.
        #: Each task state is associated with the duration in seconds which the task has
        #: been running.
        executing: dict[TaskState, float]
    
        #: The available resources on this worker, e.g. ``{"GPU": 2}``.
        #: These are abstract quantities that constrain certain tasks from running at the
        #: same time on this worker.
        resources: dict[str, float]
    
        #: The sum of each resource used by all tasks allocated to this worker.
        #: The numbers in this dictionary can only be less or equal than those in this
        #: worker's :attr:`~WorkerState.resources`.
        used_resources: dict[str, float]
    
        #: Arbitrary additional metadata to be added to :meth:`~WorkerState.identity`
        extra: dict[str, Any]
    
        # The unique server ID this WorkerState is referencing
        server_id: str
    
        # Reference to scheduler task_groups
        scheduler_ref: weakref.ref[SchedulerState] | None
        task_prefix_count: defaultdict[str, int]
        _network_occ: float
        _occupancy_cache: float | None
    
        #: Keys that may need to be fetched to this worker, and the number of tasks that need them.
        #: All tasks are currently in `memory` on a worker other than this one.
        #: Much like `processing`, this does not exactly reflect worker state:
        #: keys here may be queued to fetch, in flight, or already in memory
        #: on the worker.
        needs_what: dict[TaskState, int]
    
        __slots__ = tuple(__annotations__)
    
        def __init__(
            self,
            *,
            address: str,
            status: Status,
            pid: int,
            name: object,
            nthreads: int = 0,
            memory_limit: int,
            local_directory: str,
            nanny: str,
            server_id: str,
            services: dict[str, int] | None = None,
            versions: dict[str, Any] | None = None,
            extra: dict[str, Any] | None = None,
            scheduler: SchedulerState | None = None,
        ):
            self.server_id = server_id
            self.address = address
            self.pid = pid
            self.name = name
            self.nthreads = nthreads
            self.memory_limit = memory_limit
            self.local_directory = local_directory
            self.services = services or {}
            self.versions = versions or {}
            self.nanny = nanny
            self.status = status
            self._hash = hash(self.server_id)
            self.nbytes = 0
            self._memory_unmanaged_old = 0
            self._memory_unmanaged_history = deque()
            self.metrics = {}
            self.last_seen = 0
            self.time_delay = 0
            self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
            self.actors = set()
            self._has_what = {}
            self.processing = set()
            self.long_running = set()
            self.executing = {}
            self.resources = {}
            self.used_resources = {}
            self.extra = extra or {}
            self.scheduler_ref = weakref.ref(scheduler) if scheduler else None
            self.task_prefix_count = defaultdict(int)
            self.needs_what = {}
            self._network_occ = 0
            self._occupancy_cache = None
    
        def __hash__(self) -> int:
            return self._hash
    
        def __eq__(self, other: object) -> bool:
            return isinstance(other, WorkerState) and other.server_id == self.server_id
    
        @property
        def has_what(self) -> Set[TaskState]:
            """An insertion-sorted set-like of tasks which currently reside on this worker.
            All the tasks here are in the "memory" state.
            This is the reverse mapping of :attr:`TaskState.who_has`.
    
            This is a read-only public accessor. The data is implemented as a dict without
            values, because rebalance() relies on dicts being insertion-sorted.
            """
            return self._has_what.keys()
    
        @property
        def host(self) -> str:
            return get_address_host(self.address)
    
        @property
        def memory(self) -> MemoryState:
            """Polished memory metrics for the worker.
    
            **Design note on managed memory**
    
            There are two measures available for managed memory:
    
            - ``self.nbytes``
            - ``self.metrics["managed_bytes"]``
    
            At rest, the two numbers must be identical. However, ``self.nbytes`` is
            immediately updated through the batched comms as soon as each task lands in
            memory on the worker; ``self.metrics["managed_bytes"]`` instead is updated by
            the heartbeat, which can lag several seconds behind.
    
            Below we are mixing likely newer managed memory info from ``self.nbytes`` with
            process and spilled memory from the heartbeat. This is deliberate, so that
            managed memory total is updated more frequently.
    
            Managed memory directly and immediately contributes to optimistic memory, which
            is in turn used in Active Memory Manager heuristics (at the moment of writing;
            more uses will likely be added in the future). So it's important to have it
            up to date; much more than it is for process memory.
    
            Having up-to-date managed memory info as soon as the scheduler learns about
            task completion also substantially simplifies unit tests.
    
            The flip side of this design is that it may cause some noise in the
            unmanaged_recent measure. e.g.:
    
            1. Delete 100MB of managed data
            2. The updated managed memory reaches the scheduler faster than the
               updated process memory
            3. There's a blip where the scheduler thinks that there's a sudden 100MB
               increase in unmanaged_recent, since process memory hasn't changed but managed
               memory has decreased by 100MB
            4. When the heartbeat arrives, process memory goes down and so does the
               unmanaged_recent.
    
            This is OK - one of the main reasons for the unmanaged_recent / unmanaged_old
            split is exactly to concentrate all the noise in unmanaged_recent and exclude it
            from optimistic memory, which is used for heuristics.
    
            Something that is less OK, but also less frequent, is that the sudden deletion
            of spilled keys will cause a negative blip in managed memory:
    
            1. Delete 100MB of spilled data
            2. The updated managed memory *total* reaches the scheduler faster than the
               updated spilled portion
            3. This causes the managed memory to temporarily plummet and be replaced by
               unmanaged_recent, while spilled memory remains unaltered
            4. When the heartbeat arrives, managed goes back up, unmanaged_recent
               goes back down, and spilled goes down by 100MB as it should have to
               begin with.
    
            :issue:`6002` will let us solve this.
            """
            return MemoryState(
                process=self.metrics["memory"],
                managed=max(0, self.nbytes - self.metrics["spilled_bytes"]["memory"]),
                spilled=self.metrics["spilled_bytes"]["disk"],
                unmanaged_old=self._memory_unmanaged_old,
            )
    
        def clean(self) -> WorkerState:
            """Return a version of this object that is appropriate for serialization"""
            ws = WorkerState(
                address=self.address,
                status=self.status,
                pid=self.pid,
                name=self.name,
                nthreads=self.nthreads,
                memory_limit=self.memory_limit,
                local_directory=self.local_directory,
                services=self.services,
                nanny=self.nanny,
                extra=self.extra,
                server_id=self.server_id,
            )
            ws._occupancy_cache = self.occupancy
    
            ws.executing = {
                ts.key: duration for ts, duration in self.executing.items()  # type: ignore
            }
            return ws
    
        def __repr__(self) -> str:
            name = f", name: {self.name}" if self.name != self.address else ""
            return (
                f"<WorkerState {self.address!r}{name}, "
                f"status: {self.status.name}, "
                f"memory: {len(self.has_what)}, "
                f"processing: {len(self.processing)}>"
            )
    
        def _repr_html_(self) -> str:
            return get_template("worker_state.html.j2").render(
                address=self.address,
                name=self.name,
                status=self.status.name,
                has_what=self.has_what,
                processing=self.processing,
            )
    
        def identity(self) -> dict[str, Any]:
            return {
                "type": "Worker",
                "id": self.name,
                "host": self.host,
                "resources": self.resources,
                "local_directory": self.local_directory,
                "name": self.name,
                "nthreads": self.nthreads,
                "memory_limit": self.memory_limit,
                "last_seen": self.last_seen,
                "services": self.services,
                "metrics": self.metrics,
                "status": self.status.name,
                "nanny": self.nanny,
                **self.extra,
            }
    
        def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict[str, Any]:
            """Dictionary representation for debugging purposes.
            Not type stable and not intended for roundtrips.
    
            See also
            --------
            Client.dump_cluster_state
            distributed.utils.recursive_to_dict
            TaskState._to_dict
            """
            return recursive_to_dict(
                self,
                exclude=set(exclude) | {"versions"},  # type: ignore
                members=True,
            )
    
        @property
        def scheduler(self) -> SchedulerState:
            assert self.scheduler_ref
            s = self.scheduler_ref()
            assert s
            return s
    
        def add_to_processing(self, ts: TaskState) -> None:
            """Assign a task to this worker for compute."""
            if self.scheduler.validate:
                assert ts not in self.processing
    
            tp = ts.prefix
            self.task_prefix_count[tp.name] += 1
            self.scheduler._task_prefix_count_global[tp.name] += 1
            self.processing.add(ts)
            for dts in ts.dependencies:
                assert dts.who_has
                if self not in dts.who_has:
                    self._inc_needs_replica(dts)
    
        def add_to_long_running(self, ts: TaskState) -> None:
            if self.scheduler.validate:
                assert ts in self.processing
                assert ts not in self.long_running
    
            self._remove_from_task_prefix_count(ts)
            # Cannot remove from processing since we're using this for things like
            # idleness detection. Idle workers are typically targeted for
            # downscaling but we should not downscale workers with long running
            # tasks
            self.long_running.add(ts)
    
        def remove_from_processing(self, ts: TaskState) -> None:
            """Remove a task from a workers processing"""
            if self.scheduler.validate:
                assert ts in self.processing
    
            if ts in self.long_running:
                self.long_running.discard(ts)
            else:
                self._remove_from_task_prefix_count(ts)
            self.processing.remove(ts)
            for dts in ts.dependencies:
                if dts in self.needs_what:
                    self._dec_needs_replica(dts)
    
        def _remove_from_task_prefix_count(self, ts: TaskState) -> None:
            count = self.task_prefix_count[ts.prefix.name] - 1
            if count:
                self.task_prefix_count[ts.prefix.name] = count
            else:
                del self.task_prefix_count[ts.prefix.name]
    
            count = self.scheduler._task_prefix_count_global[ts.prefix.name] - 1
            if count:
                self.scheduler._task_prefix_count_global[ts.prefix.name] = count
            else:
                del self.scheduler._task_prefix_count_global[ts.prefix.name]
    
        def remove_replica(self, ts: TaskState) -> None:
            """The worker no longer has a task in memory"""
            if self.scheduler.validate:
                assert ts.who_has
                assert self in ts.who_has
                assert ts in self.has_what
                assert ts not in self.needs_what
    
            self.nbytes -= ts.get_nbytes()
            del self._has_what[ts]
            ts.who_has.remove(self)  # type: ignore
            if not ts.who_has:
                ts.who_has = None
    
        def _inc_needs_replica(self, ts: TaskState) -> None:
            """Assign a task fetch to this worker and update network occupancies"""
            if self.scheduler.validate:
                assert ts.who_has
                assert self not in ts.who_has
                assert ts not in self.has_what
            if ts not in self.needs_what:
                self.needs_what[ts] = 1
                nbytes = ts.get_nbytes()
                self._network_occ += nbytes
                self.scheduler._network_occ_global += nbytes
            else:
                self.needs_what[ts] += 1
    
        def _dec_needs_replica(self, ts: TaskState) -> None:
            if self.scheduler.validate:
                assert ts in self.needs_what
    
            self.needs_what[ts] -= 1
            if self.needs_what[ts] == 0:
                del self.needs_what[ts]
                nbytes = ts.get_nbytes()
                self._network_occ -= nbytes
                self.scheduler._network_occ_global -= nbytes
    
        def add_replica(self, ts: TaskState) -> None:
            """The worker acquired a replica of task"""
            if ts.who_has is None:
                ts.who_has = set()
            if ts in self._has_what:
                return
            nbytes = ts.get_nbytes()
            if ts in self.needs_what:
                del self.needs_what[ts]
                self._network_occ -= nbytes
                self.scheduler._network_occ_global -= nbytes
            ts.who_has.add(self)
            self.nbytes += nbytes
            self._has_what[ts] = None
    …rt}"
                logger.info("%11s at:  %25s", name, link)
    
            if self.scheduler_file:
                with open(self.scheduler_file, "w") as f:
                    json.dump(self.identity(), f, indent=2)
    
                fn = self.scheduler_file  # remove file when we close the process
    
                def del_scheduler_file():
                    if os.path.exists(fn):
                        os.remove(fn)
    
                weakref.finalize(self, del_scheduler_file)
    
            await self.preloads.start()
    
            if self.jupyter:
                # Allow insecure communications from local users
                if self.address.startswith("tls://"):
                    await self.listen("tcp://localhost:0")
                os.environ["DASK_SCHEDULER_ADDRESS"] = self.listeners[-1].contact_address
    
            await asyncio.gather(
                *[plugin.start(self) for plugin in list(self.plugins.values())]
            )
    
            self.start_periodic_callbacks()
    
            setproctitle(f"dask scheduler [{self.address}]")
            return self
    
        async def close(self, fast=None, close_workers=None, reason=""):
            """Send cleanup signal to all coroutines then wait until finished
    
            See Also
            --------
            Scheduler.cleanup
            """
            if fast is not None or close_workers is not None:
                warnings.warn(
                    "The 'fast' and 'close_workers' parameters in Scheduler.close have no "
                    "effect and will be removed in a future version of distributed.",
                    FutureWarning,
                )
            if self.status in (Status.closing, Status.closed):
                await self.finished()
                return
    
            async def log_errors(func):
                try:
                    await func()
                except Exception:
                    logger.exception("Plugin call failed during scheduler.close")
    
            await asyncio.gather(
                *[log_errors(plugin.before_close) for plugin in list(self.plugins.values())]
            )
    
            self.status = Status.closing
            logger.info("Scheduler closing due to %s...", reason or "unknown reason")
            setproctitle("dask scheduler [closing]")
    
            await self.preloads.teardown()
    
            await asyncio.gather(
                *[log_errors(plugin.close) for plugin in list(self.plugins.values())]
            )
    
            for pc in self.periodic_callbacks.values():
                pc.stop()
            self.periodic_callbacks.clear()
    
            self.stop_services()
    
            for ext in self.extensions.values():
                with suppress(AttributeError):
                    ext.teardown()
            logger.info("Scheduler closing all comms")
    
            futures = []
            for _, comm in list(self.stream_comms.items()):
                # FIXME use `self.remove_worker()` instead after https://github.com/dask/distributed/issues/6390
                if not comm.closed():
                    # This closes the Worker and ensures that if a Nanny is around,
                    # it is closed as well
                    comm.send({"op": "close", "reason": "scheduler-close"})
                    comm.send({"op": "close-stream"})
                    # ^ TODO remove? `Worker.close` will close the stream anyway.
                with suppress(AttributeError):
                    futures.append(comm.close())
    
            await asyncio.gather(*futures)
    
            if self.jupyter:
                await self._jupyter_server_application._cleanup()
    
            for comm in self.client_comms.values():
                comm.abort()
    
            await self.rpc.close()
    
            self.status = Status.closed
            self.stop()
            await super().close()
    
            setproctitle("dask scheduler [closed]")
            disable_gc_diagnosis()
    
        ###########
        # Stimuli #
        ###########
    
        def heartbeat_worker(
            self,
            *,
            address: str,
            resolve_address: bool = True,
            now: float | None = None,
            resources: dict[str, float] | None = None,
            host_info: dict | None = None,
            metrics: dict,
            executing: dict[Key, float] | None = None,
            extensions: dict | None = None,
        ) -> dict[str, Any]:
            address = self.coerce_address(address, resolve_address)
            address = normalize_address(address)
            ws = self.workers.get(address)
            if ws is None:
                logger.warning(f"Received heartbeat from unregistered worker {address!r}.")
                return {"status": "missing"}
    
            host = get_address_host(address)
            local_now = time()
            host_info = host_info or {}
    
            dh: dict = self.host_info.setdefault(host, {})
            dh["last-seen"] = local_now
    
            frac = 1 / len(self.workers)
            self.bandwidth = (
                self.bandwidth * (1 - frac) + metrics["bandwidth"]["total"] * frac
            )
            for other, (bw, count) in metrics["bandwidth"]["workers"].items():
                if (address, other) not in self.bandwidth_workers:
                    self.bandwidth_workers[address, other] = bw / count
                else:
                    alpha = (1 - frac) ** count
                    self.bandwidth_workers[address, other] = self.bandwidth_workers[
                        address, other
                    ] * alpha + bw * (1 - alpha)
            for typ, (bw, count) in metrics["bandwidth"]["types"].items():
                if typ not in self.bandwidth_types:
                    self.bandwidth_types[typ] = bw / count
                else:
                    alpha = (1 - frac) ** count
                    self.bandwidth_types[typ] = self.bandwidth_types[typ] * alpha + bw * (
                        1 - alpha
                    )
    
            ws.last_seen = local_now
            if executing is not None:
                # NOTE: the executing dict is unused
                ws.executing = {}
                for key, duration in executing.items():
                    if key in self.tasks:
                        ts = self.tasks[key]
                        ws.executing[ts] = duration
                        ts.prefix.add_exec_time(duration)
    
            for name, value in metrics["digests_total_since_heartbeat"].items():
                self.cumulative_worker_metrics[name] += value
    
            ws.metrics = metrics
    
            # Calculate RSS - dask keys, separating "old" and "new" usage
            # See MemoryState for details
            max_memory_unmanaged_old_hist_age = local_now - self.MEMORY_RECENT_TO_OLD_TIME
            memory_unmanaged_old = ws._memory_unmanaged_old
            while ws._memory_unmanaged_history:
                timestamp, size = ws._memory_unmanaged_history[0]
                if timestamp >= max_memory_unmanaged_old_hist_age:
                    break
                ws._memory_unmanaged_history.popleft()
                if size == memory_unmanaged_old:
                    memory_unmanaged_old = 0  # recalculate min()
    
            # ws._nbytes is updated at a different time and sizeof() may not be accurate,
            # so size may be (temporarily) negative; floor it to zero.
            size = max(
                0, metrics["memory"] - ws.nbytes + metrics["spilled_bytes"]["memory"]
            )
    
            ws._memory_unmanaged_history.append((local_now, size))
            if not memory_unmanaged_old:
                # The worker has just been started or the previous minimum has been expunged
                # because too old.
                # Note: this algorithm is capped to 200 * MEMORY_RECENT_TO_OLD_TIME elements
                # cluster-wide by heartbeat_interval(), regardless of the number of workers
                ws._memory_unmanaged_old = min(map(second, ws._memory_unmanaged_history))
            elif size < memory_unmanaged_old:
                ws._memory_unmanaged_old = size
    
            if host_info:
                dh = self.host_info.setdefault(host, {})
                dh.update(host_info)
    
            if now:
                ws.time_delay = local_now - now
    
            if resources:
                self.add_resources(worker=address, resources=resources)
    
            if extensions:
                for name, data in extensions.items():
                    self.extensions[name].heartbeat(ws, data)
    
            return {
                "status": "OK",
                "time": local_now,
                "heartbeat-interval": heartbeat_interval(len(self.workers)),
            }
    
        @log_errors
        async def add_worker(
            self,
            comm: Comm,
            *,
            address: str,
            status: str,
            server_id: str,
            nthreads: int,
            name: str,
            resolve_address: bool = True,
            now: float,
            resources: dict[str, float],
            # FIXME: This is never submitted by the worker
            host_info: None = None,
            memory_limit: int | None,
            metrics: dict[str, Any],
            pid: int = 0,
            services: dict[str, int],
            local_directory: str,
            versions: dict[str, Any],
            nanny: str,
            extra: dict,
            stimulus_id: str,
        ) -> None:
            """Add a new worker to the cluster"""
            address = self.coerce_address(address, resolve_address)
            address = normalize_address(address)
            host = get_address_host(address)
    
            if address in self.workers:
                raise ValueError("Worker already exists %s" % address)
    
            if name in self.aliases:
                logger.warning("Worker tried to connect with a duplicate name: %s", name)
                msg = {
                    "status": "error",
                    "message": "name taken, %s" % name,
                    "time": time(),
                }
                await comm.write(msg)
                return
    
            self.log_event(address, {"action": "add-worker"})
            self.log_event("all", {"action": "add-worker", "worker": address})
    
            self.workers[address] = ws = WorkerState(
                address=address,
                status=Status.lookup[status],  # type: ignore
                pid=pid,
                nthreads=nthreads,
                memory_limit=memory_limit or 0,
                name=name,
                local_directory=local_directory,
                services=services,
                versions=versions,
                nanny=nanny,
                extra=extra,
                server_id=server_id,
                scheduler=self,
            )
            if ws.status == Status.running:
                self.running.add(ws)
    
            dh = self.host_info.get(host)
            if dh is None:
                self.host_info[host] = dh = {}
    
            dh_addresses = dh.get("addresses")
            if dh_addresses is None:
                dh["addresses"] = dh_addresses = set()
                dh["nthreads"] = 0
    
            dh_addresses.add(address)
            dh["nthreads"] += nthreads
    
            self.total_nthreads += nthreads
            self.total_nthreads_history.append((time(), self.total_nthreads))
            self.aliases[name] = address
    
            self.heartbeat_worker(
                address=address,
                resolve_address=resolve_address,
                now=now,
                resources=resources,
                host_info=host_info,
                metrics=metrics,
            )
    
            # Do not need to adjust self.total_occupancy as self.occupancy[ws] cannot
            # exist before this.
            self.check_idle_saturated(ws)
    
            self.stream_comms[address] = BatchedSend(interval="5ms", loop=self.loop)
    
            awaitables = []
            for plugin in list(self.plugins.values()):
                try:
                    result = plugin.add_worker(scheduler=self, worker=address)
                    if result is not None and inspect.isawaitable(result):
                        awaitables.append(result)
                except Exception as e:
                    logger.exception(e)
    
            plugin_msgs = await asyncio.gather(*awaitables, return_exceptions=True)
            plugins_exceptions = [msg for msg in plugin_msgs if isinstance(msg, Exception)]
            for exc in plugins_exceptions:
                logger.exception(exc, exc_info=exc)
    
            if ws.status == Status.running:
                self.transitions(
                    self.bulk_schedule_unrunnable_after_adding_worker(ws), stimulus_id
                )
                self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
    
            logger.info("Register worker %s", ws)
    
            msg = {
                "status": "OK",
                "time": time(),
                "heartbeat-interval": heartbeat_interval(len(self.workers)),
                "worker-plugins": self.worker_plugins,
            }
    
            version_warning = version_module.error_message(
                version_module.get_versions(),
                {w: ws.versions for w, ws in self.workers.items()},
                versions,
                source_name=str(ws.server_id),
            )
            msg.update(version_warning)
    
            await comm.write(msg)
            # This will keep running until the worker is removed
            await self.handle_worker(comm, address)
    
        async def add_nanny(self, comm: Comm, address: str) -> None:
            async with self._starting_nannies_cond:
                self._starting_nannies.add(address)
            try:
                msg = {
                    "status": "OK",
                    "nanny-plugins": self.nanny_plugins,
                }
                await comm.write(msg)
                await comm.read()
            finally:
                async with self._starting_nannies_cond:
                    self._starting_nannies.discard(address)
                    self._starting_nannies_cond.notify_all()
    
        def _match_graph_with_tasks(
            self,
            dsk: dict[Key, T_runspec],
            dependencies: dict[Key, set[Key]],
            keys: set[Key],
        ) -> set[Key]:
            n = 0
            lost_keys = set()
            while len(dsk) != n:  # walk through new tasks, cancel any bad deps
                n = len(dsk)
                for k, deps in list(dependencies.items()):
                    if any(
                        dep not in self.tasks and dep not in dsk for dep in deps
                    ):  # bad key
                        lost_keys.add(k)
                        logger.info("User asked for computation on lost data, %s", k)
                        del dsk[k]
                        del dependencies[k]
                        if k in keys:
                            keys.remove(k)
                    del deps
            # Avoid computation that is already finished
            done = set()  # tasks that are already done
            for k, v in dependencies.items():
                if v and k in self.tasks:
                    ts = self.tasks[k]
                    if ts.state in ("memory", "erred"):
                        done.add(k)
    
            if done:
                dependents = dask.core.reverse_dict(dependencies)
                stack = list(done)
                while stack:  # remove unnecessary dependencies
                    key = stack.pop()
                    try:
                        deps = dependencies[key]
                    except KeyError:
                        deps = {ts.key for ts in self.tasks[key].dependencies}
                    for dep in deps:
                        if dep in dependents:
                            child_deps = dependents[dep]
                        elif dep in self.tasks:
                            child_deps = {ts.key for ts in self.tasks[key].dependencies}
                        else:
                            child_deps = set()
                        if all(d in done for d in child_deps):
                            if dep in self.tasks and dep not in done:
                                done.add(dep)
                                stack.append(dep)
            for anc in done:
                dsk.pop(anc, None)
                dependencies.pop(anc, None)
            return lost_keys
    
        def _create_taskstate_from_graph(
            self,
            *,
            start: float,
            dsk: dict[Key, T_runspec],
            dependencies: dict,
            keys: set[Key],
            ordered: dict[Key, int],
            client: str,
            annotations_by_type: dict,
            global_annotations: dict | None,
            stimulus_id: str,
            submitting_task: Key | None,
            user_priority: int | dict[Key, int] = 0,
            actors: bool | list[Key] | None = None,
            fifo_timeout: float = 0.0,
            code: tuple[SourceCode, ...] = (),
        ) -> None:
            """
            Take a low level graph and create the necessary scheduler state to
            compute it.
    
            WARNING
            -------
            This method must not be made async since nothing here is concurrency
            safe. All interactions with TaskState objects here should be happening
            in the same event loop tick.
            """
    
            lost_keys = self._match_graph_with_tasks(dsk, dependencies, keys)
    
            if len(dsk) > 1:
                self.log_event(
                    ["all", client], {"action": "update_graph", "count": len(dsk)}
                )
    
            if lost_keys:
                self.report({"op": "cancelled-keys", "keys": lost_keys}, client=client)
                self.client_releases_keys(
                    keys=lost_keys, client=client, stimulus_id=stimulus_id
                )
    
            if not self.is_idle and self.computations:
                # Still working on something. Assign new tasks to same computation
                computation = self.computations[-1]
            else:
                computation = Computation()
                self.computations.append(computation)
    
            if code:  # add new code blocks
                computation.code.add(code)
            if global_annotations:
                # FIXME: This is kind of inconsistent since it only includes global
                # annotations.
                computation.annotations.update(global_annotations)
            del global_annotations
    
            runnable, touched_tasks, new_tasks = self._generate_taskstates(
                keys=keys,
                dsk=dsk,
                dependencies=dependencies,
                computation=computation,
            )
    
            keys_with_annotations = self._apply_annotations(
                tasks=new_tasks,
                annotations_by_type=annotations_by_type,
            )
    
            self._set_priorities(
                internal_priority=ordered,
                submitting_task=submitting_task,
                user_priority=user_priority,
                fifo_timeout=fifo_timeout,
                start=start,
                tasks=runnable,
            )
    
            self.client_desires_keys(keys=keys, client=client)
    
            # Add actors
            if actors is True:
                actors = list(keys)
            for actor in actors or []:
                ts = self.tasks[actor]
                ts.actor = True
    
            # Compute recommendations
            recommendations: Recs = {}
            priority = dict()
            for ts in sorted(
                runnable,
                key=operator.attrgetter("priority"),
                reverse=True,
            ):
                assert ts.priority  # mypy
                priority[ts.key] = ts.priority
                assert ts.run_spec
                if ts.state == "released":
                    recommendations[ts.key] = "waiting"
    
            for ts in runnable:
                for dts in ts.dependencies:
                    if dts.exception_blame:
                        ts.exception_blame = dts.exception_blame
                        recommendations[ts.key] = "erred"
                        break
    
            annotations_for_plugin: defaultdict[str, dict[Key, Any]] = defaultdict(dict)
            for key in keys_with_annotations:
                ts = self.tasks[key]
                if ts.annotations:
                    for annot, value in ts.annotations.items():
                        annotations_for_plugin[annot][key] = value
    
            spans_ext: SpansSchedulerExtension | None = self.extensions.get("spans")
            if spans_ext:
                # new_tasks does not necessarily contain all runnable tasks;
                # _generate_taskstates is not the only thing that calls new_task(). A
                # TaskState may have also been created by client_desires_keys or scatter,
                # and only later gained a run_spec.
                span_annotations = spans_ext.observe_tasks(runnable, code=code)
                # In case of TaskGroup collision, spans may have changed
                # FIXME: Is this used anywhere besides tests?
                if span_annotations:
                    annotations_for_plugin["span"] = span_annotations
                else:
                    annotations_for_plugin.pop("span", None)
    
            for plugin in list(self.plugins.values()):
                try:
                    plugin.update_graph(
                        self,
                        client=client,
                        tasks=[ts.key for ts in touched_tasks],
                        keys=keys,
                        dependencies=dependencies,
                        annotations=dict(annotations_for_plugin),
                        priority=priority,
                    )
                except Exception as e:
                    logger.exception(e)
    
            self.transitions(recommendations, stimulus_id)
    
            for ts in touched_tasks:
                if ts.state in ("memory", "erred"):
                    self.report_on_key(ts=ts, client=client)
    
        @log_errors
        async def update_graph(
            self,
            client: str,
            graph_header: dict,
            graph_frames: list[bytes],
            keys: set[Key],
            internal_priority: dict[Key, int] | None,
            submitting_task: Key | None,
            user_priority: int | dict[Key, int] = 0,
            actors: bool | list[Key] | None = None,
            fifo_timeout: float = 0.0,
            code: tuple[SourceCode, ...] = (),
            annotations: dict | None = None,
            stimulus_id: str | None = None,
        ) -> None:
            # FIXME: Apparently empty dicts arrive as a ToPickle object
            if isinstance(annotations, ToPickle):
                annotations = annotations.data  # type: ignore[unreachable]
            start = time()
            try:
                try:
>                   graph = deserialize(graph_header, graph_frames).data

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/scheduler.py:4671: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/serialize.py:439: in deserialize
    return loads(header, frames)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/serialize.py:101: in pickle_loads
    return pickle.loads(x, buffers=buffers)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/protocol/pickle.py:94: in loads
    return pickle.loads(x, buffers=buffers)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/__init__.py:1: in <module>
    from dask_expr import _version, datasets
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/datasets.py:7: in <module>
    from dask_expr._collection import new_collection
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:11: in <module>
    import dask.dataframe.methods as methods
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    from __future__ import annotations
    
    
    def _dask_expr_enabled() -> bool:
        import dask
    
        use_dask_expr = dask.config.get("dataframe.query-planning")
        if use_dask_expr:
            try:
                import dask_expr  # noqa: F401
            except ImportError:
                raise ValueError("Must install dask-expr to activate query planning.")
        return use_dask_expr
    
    
    if _dask_expr_enabled():
>       from dask_expr import (
            DataFrame,
            Index,
            Series,
            concat,
            from_array,
            from_dask_array,
            from_dask_dataframe,
            from_delayed,
            from_dict,
            from_graph,
            from_map,
            from_pandas,
            get_dummies,
            isna,
            map_overlap,
            map_partitions,
            merge,
            pivot_table,
            read_csv,
            read_hdf,
            read_json,
            read_orc,
            read_parquet,
            read_sql,
            read_sql_query,
            read_sql_table,
            read_table,
            repartition,
            to_bag,
            to_csv,
            to_datetime,
            to_hdf,
            to_json,
            to_numeric,
            to_orc,
            to_parquet,
            to_records,
            to_sql,
            to_timedelta,
        )
E       ImportError: cannot import name 'DataFrame' from partially initialized module 'dask_expr' (most likely due to a circular import) (/home/runner/miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/__init__.py)

dask/dataframe/__init__.py:17: ImportError

The above exception was the direct cause of the following exception:

c = <Client: 'tcp://127.0.0.1:33241' processes=2 threads=2, memory=31.21 GiB>
tmpdir = local('/tmp/pytest-of-runner/pytest-0/popen-gw2/test_blockwise_dataframe_io_Tr6')
io = 'parquet-pyarrow', fuse = False, from_futures = True

    @ignore_sync_scheduler_warning
    @pytest.mark.parametrize(
        "io",
        [
            "parquet-pyarrow",
            pytest.param(
                "parquet-fastparquet", marks=pytest.mark.skip_with_pyarrow_strings
            ),
            "csv",
            # See https://github.com/dask/dask/issues/9793
            pytest.param("hdf", marks=pytest.mark.flaky(reruns=5)),
        ],
    )
    @pytest.mark.parametrize("fuse", [True, False, None])
    @pytest.mark.parametrize("from_futures", [True, False])
    def test_blockwise_dataframe_io(c, tmpdir, io, fuse, from_futures):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        df = pd.DataFrame({"x": [1, 2, 3] * 5, "y": range(15)})
    
        if from_futures:
            parts = [df.iloc[:5], df.iloc[5:10], df.iloc[10:15]]
            futs = c.scatter(parts)
            ddf0 = dd.from_delayed(futs, meta=parts[0])
        else:
            ddf0 = dd.from_pandas(df, npartitions=3)
    
        if io == "parquet-pyarrow":
            pytest.importorskip("pyarrow")
>           ddf0.to_parquet(str(tmpdir))

dask/tests/test_distributed.py:487: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:1909: in to_parquet
    return to_parquet(self, path, **kwargs)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/io/parquet.py:383: in to_parquet
    out = out.compute(**compute_kwargs)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:307: in compute
    return DaskMethodsMixin.compute(out, **kwargs)
dask/base.py:379: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:665: in compute
    results = schedule(dsk, keys, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Client: 'tcp://127.0.0.1:33241' processes=2 threads=2, memory=31.21 GiB>
futures = [[<Future: cancelled, key: ('toparquetbarrier-ad1cc644df1d04a6732e2a56454c52e6', 0)>]]
errors = 'raise', direct = False, local_worker = None

    async def _gather(self, futures, errors="raise", direct=None, local_worker=None):
        unpacked, future_set = unpack_remotedata(futures, byte_keys=True)
        mismatched_futures = [f for f in future_set if f.client is not self]
        if mismatched_futures:
            raise ValueError(
                "Cannot gather Futures created by another client. "
                f"These are the {len(mismatched_futures)} (out of {len(futures)}) "
                f"mismatched Futures and their client IDs (this client is {self.id}): "
                f"{ {f: f.client.id for f in mismatched_futures} }"  # noqa: E201, E202
            )
        keys = [future.key for future in future_set]
        bad_data = dict()
        data = {}
    
        if direct is None:
            direct = self.direct_to_workers
        if direct is None:
            try:
                w = get_worker()
            except Exception:
                direct = False
            else:
                if w.scheduler.address == self.scheduler.address:
                    direct = True
    
        async def wait(k):
            """Want to stop the All(...) early if we find an error"""
            try:
                st = self.futures[k]
            except KeyError:
                raise AllExit()
            else:
                await st.wait()
            if st.status != "finished" and errors == "raise":
                raise AllExit()
    
        while True:
            logger.debug("Waiting on futures to clear before gather")
    
            with suppress(AllExit):
                await distributed.utils.All(
                    [wait(key) for key in keys if key in self.futures],
                    quiet_exceptions=AllExit,
                )
    
            failed = ("error", "cancelled")
    
            exceptions = set()
            bad_keys = set()
            for key in keys:
                if key not in self.futures or self.futures[key].status in failed:
                    exceptions.add(key)
                    if errors == "raise":
                        try:
                            st = self.futures[key]
                            exception = st.exception
                            traceback = st.traceback
                        except (KeyError, AttributeError):
                            exc = CancelledError(key)
                        else:
>                           raise exception.with_traceback(traceback)
E                           RuntimeError: Error during deserialization of the task graph. This frequently
E                           occurs if the Scheduler and Client have different environments.
E                           For more information, see
E                           https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments

../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/distributed/client.py:2243: RuntimeError

Check warning on line 0 in dask.tests.test_order

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_array_vs_dataframe[True] (dask.tests.test_order)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 1s]
Raw output
RuntimeError: Object HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7faaa4e77f50>
 0. arange-02c8c9974983e1c5d4f1c707f0bd803e
 1. broadcast_to-5b7358232d708444643e6722becee744
 2. transpose-298501149b1a91e5856ad193e4c87e91
 3. reshape-8f5c71799840422fa3c833357ff5db80
 4. from-dask-array-a42a8ce1a6d5d6fc63fcbc56dbeb07d8
 cannot be deterministically hashed. Please, see https://docs.dask.org/en/latest/custom-collections.html#implementing-deterministic-hashing for more information
optimize = True

    @pytest.mark.parametrize(
        "optimize",
        [
            True,
            False,
        ],
    )
    def test_array_vs_dataframe(optimize):
        xr = pytest.importorskip("xarray")
    
        import dask.array as da
    
        size = 5000
        ds = xr.Dataset(
            dict(
                anom_u=(
                    ["time", "face", "j", "i"],
                    da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
                ),
                anom_v=(
                    ["time", "face", "j", "i"],
                    da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
                ),
            )
        )
    
        quad = ds**2
        quad["uv"] = ds.anom_u * ds.anom_v
        mean = quad.mean("time")
        diag_array = diagnostics(collections_to_dsk([mean], optimize_graph=optimize))
        diag_df = diagnostics(
>           collections_to_dsk([mean.to_dask_dataframe()], optimize_graph=optimize)
        )

dask/tests/test_order.py:1217: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/base.py:430: in collections_to_dsk
    dsk, keys = _extract_graph_and_keys(val)
dask/base.py:456: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:315: in __dask_graph__
    out = out.lower_completely()
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_core.py:383: in lower_completely
    new = expr.lower_once()
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_core.py:340: in lower_once
    out = expr._lower()
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_concat.py:140: in _lower
    if self._are_co_alinged_or_single_partition:
../../../miniconda3/envs/test-environment/lib/python3.12/functools.py:995: in __get__
    val = self.func(instance)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_concat.py:133: in _are_co_alinged_or_single_partition
    return are_co_aligned(*self._frames, allow_broadcast=False) or {
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_expr.py:2400: in are_co_aligned
    _tokenize_partial(item, ["columns", "_series"])
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_util.py:125: in _tokenize_partial
    return _tokenize_deterministic(
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_util.py:118: in _tokenize_deterministic
    return tokenize(*args, **kwargs)
dask/base.py:1035: in tokenize
    hasher = _md5(str(tuple(map(normalize_token, args))).encode())
dask/utils.py:766: in __call__
    return meth(arg, *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

o = HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7faaa4e77f50>
 0. arange-02c8c9974983e1c...91e5856ad193e4c87e91
 3. reshape-8f5c71799840422fa3c833357ff5db80
 4. from-dask-array-a42a8ce1a6d5d6fc63fcbc56dbeb07d8


    @normalize_token.register(object)
    def normalize_object(o):
        method = getattr(o, "__dask_tokenize__", None)
        if method is not None:
            return method()
    
        if callable(o):
            return normalize_function(o)
    
        if dataclasses.is_dataclass(o):
            return normalize_dataclass(o)
    
        if not config.get("tokenize.ensure-deterministic"):
            return uuid.uuid4().hex
    
>       raise RuntimeError(
            f"Object {str(o)} cannot be deterministically hashed. Please, see "
            "https://docs.dask.org/en/latest/custom-collections.html#implementing-deterministic-hashing "
            "for more information"
        )
E       RuntimeError: Object HighLevelGraph with 5 layers.
E       <dask.highlevelgraph.HighLevelGraph object at 0x7faaa4e77f50>
E        0. arange-02c8c9974983e1c5d4f1c707f0bd803e
E        1. broadcast_to-5b7358232d708444643e6722becee744
E        2. transpose-298501149b1a91e5856ad193e4c87e91
E        3. reshape-8f5c71799840422fa3c833357ff5db80
E        4. from-dask-array-a42a8ce1a6d5d6fc63fcbc56dbeb07d8
E        cannot be deterministically hashed. Please, see https://docs.dask.org/en/latest/custom-collections.html#implementing-deterministic-hashing for more information

dask/base.py:1130: RuntimeError

Check warning on line 0 in dask.tests.test_order

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 15 runs failed: test_array_vs_dataframe[False] (dask.tests.test_order)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 1s]
Raw output
RuntimeError: Object HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7faaa4e201d0>
 0. arange-02c8c9974983e1c5d4f1c707f0bd803e
 1. broadcast_to-5b7358232d708444643e6722becee744
 2. transpose-298501149b1a91e5856ad193e4c87e91
 3. reshape-8f5c71799840422fa3c833357ff5db80
 4. from-dask-array-a42a8ce1a6d5d6fc63fcbc56dbeb07d8
 cannot be deterministically hashed. Please, see https://docs.dask.org/en/latest/custom-collections.html#implementing-deterministic-hashing for more information
optimize = False

    @pytest.mark.parametrize(
        "optimize",
        [
            True,
            False,
        ],
    )
    def test_array_vs_dataframe(optimize):
        xr = pytest.importorskip("xarray")
    
        import dask.array as da
    
        size = 5000
        ds = xr.Dataset(
            dict(
                anom_u=(
                    ["time", "face", "j", "i"],
                    da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
                ),
                anom_v=(
                    ["time", "face", "j", "i"],
                    da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
                ),
            )
        )
    
        quad = ds**2
        quad["uv"] = ds.anom_u * ds.anom_v
        mean = quad.mean("time")
        diag_array = diagnostics(collections_to_dsk([mean], optimize_graph=optimize))
        diag_df = diagnostics(
>           collections_to_dsk([mean.to_dask_dataframe()], optimize_graph=optimize)
        )

dask/tests/test_order.py:1217: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/base.py:444: in collections_to_dsk
    dsk, _ = _extract_graph_and_keys(collections)
dask/base.py:456: in _extract_graph_and_keys
    graphs.append(v.__dask_graph__())
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_collection.py:315: in __dask_graph__
    out = out.lower_completely()
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_core.py:383: in lower_completely
    new = expr.lower_once()
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_core.py:340: in lower_once
    out = expr._lower()
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_concat.py:140: in _lower
    if self._are_co_alinged_or_single_partition:
../../../miniconda3/envs/test-environment/lib/python3.12/functools.py:995: in __get__
    val = self.func(instance)
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_concat.py:133: in _are_co_alinged_or_single_partition
    return are_co_aligned(*self._frames, allow_broadcast=False) or {
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_expr.py:2400: in are_co_aligned
    _tokenize_partial(item, ["columns", "_series"])
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_util.py:125: in _tokenize_partial
    return _tokenize_deterministic(
../../../miniconda3/envs/test-environment/lib/python3.12/site-packages/dask_expr/_util.py:118: in _tokenize_deterministic
    return tokenize(*args, **kwargs)
dask/base.py:1035: in tokenize
    hasher = _md5(str(tuple(map(normalize_token, args))).encode())
dask/utils.py:766: in __call__
    return meth(arg, *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

o = HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7faaa4e201d0>
 0. arange-02c8c9974983e1c...91e5856ad193e4c87e91
 3. reshape-8f5c71799840422fa3c833357ff5db80
 4. from-dask-array-a42a8ce1a6d5d6fc63fcbc56dbeb07d8


    @normalize_token.register(object)
    def normalize_object(o):
        method = getattr(o, "__dask_tokenize__", None)
        if method is not None:
            return method()
    
        if callable(o):
            return normalize_function(o)
    
        if dataclasses.is_dataclass(o):
            return normalize_dataclass(o)
    
        if not config.get("tokenize.ensure-deterministic"):
            return uuid.uuid4().hex
    
>       raise RuntimeError(
            f"Object {str(o)} cannot be deterministically hashed. Please, see "
            "https://docs.dask.org/en/latest/custom-collections.html#implementing-deterministic-hashing "
            "for more information"
        )
E       RuntimeError: Object HighLevelGraph with 5 layers.
E       <dask.highlevelgraph.HighLevelGraph object at 0x7faaa4e201d0>
E        0. arange-02c8c9974983e1c5d4f1c707f0bd803e
E        1. broadcast_to-5b7358232d708444643e6722becee744
E        2. transpose-298501149b1a91e5856ad193e4c87e91
E        3. reshape-8f5c71799840422fa3c833357ff5db80
E        4. from-dask-array-a42a8ce1a6d5d6fc63fcbc56dbeb07d8
E        cannot be deterministically hashed. Please, see https://docs.dask.org/en/latest/custom-collections.html#implementing-deterministic-hashing for more information

dask/base.py:1130: RuntimeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_blockwise_dataframe_io[True-False-csv] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 6s]
Raw output
TypeError: 'module' object is not callable
c = <Client: 'tcp://127.0.0.1:32831' processes=2 threads=2, memory=31.21 GiB>
tmpdir = local('/tmp/pytest-of-runner/pytest-0/popen-gw2/test_blockwise_dataframe_io_Tr7')
io = 'csv', fuse = False, from_futures = True

    @ignore_sync_scheduler_warning
    @pytest.mark.parametrize(
        "io",
        [
            "parquet-pyarrow",
            pytest.param(
                "parquet-fastparquet", marks=pytest.mark.skip_with_pyarrow_strings
            ),
            "csv",
            # See https://github.com/dask/dask/issues/9793
            pytest.param("hdf", marks=pytest.mark.flaky(reruns=5)),
        ],
    )
    @pytest.mark.parametrize("fuse", [True, False, None])
    @pytest.mark.parametrize("from_futures", [True, False])
    def test_blockwise_dataframe_io(c, tmpdir, io, fuse, from_futures):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        df = pd.DataFrame({"x": [1, 2, 3] * 5, "y": range(15)})
    
        if from_futures:
            parts = [df.iloc[:5], df.iloc[5:10], df.iloc[10:15]]
            futs = c.scatter(parts)
            ddf0 = dd.from_delayed(futs, meta=parts[0])
        else:
            ddf0 = dd.from_pandas(df, npartitions=3)
    
        if io == "parquet-pyarrow":
            pytest.importorskip("pyarrow")
            ddf0.to_parquet(str(tmpdir))
            ddf = dd.read_parquet(str(tmpdir))
        elif io == "parquet-fastparquet":
            pytest.importorskip("fastparquet")
            with pytest.warns(FutureWarning):
                ddf0.to_parquet(str(tmpdir), engine="fastparquet")
                ddf = dd.read_parquet(str(tmpdir), engine="fastparquet")
        elif io == "csv":
            ddf0.to_csv(str(tmpdir), index=False)
            ddf = dd.read_csv(os.path.join(str(tmpdir), "*"))
        elif io == "hdf":
            pytest.importorskip("tables")
            fn = str(tmpdir.join("h5"))
            ddf0.to_hdf(fn, "/data*")
            ddf = dd.read_hdf(fn, "/data*")
        else:
            raise AssertionError("unreachable")
    
        df = df[["x"]] + 10
        ddf = ddf[["x"]] + 10
        with dask.config.set({"optimization.fuse.active": fuse}):
            ddf.compute()
>           dsk = dask.dataframe.optimize(ddf.dask, ddf.__dask_keys__())
E           TypeError: 'module' object is not callable

dask/tests/test_distributed.py:509: TypeError

Check warning on line 0 in dask.tests.test_distributed

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_blockwise_dataframe_io[True-False-hdf] (dask.tests.test_distributed)

artifacts/ubuntu-latest-3.12-dask-expr/pytest.xml [took 40s]
Raw output
TypeError: 'module' object is not callable
c = <Client: 'tcp://127.0.0.1:39559' processes=2 threads=2, memory=31.21 GiB>
tmpdir = local('/tmp/pytest-of-runner/pytest-0/popen-gw2/test_blockwise_dataframe_io_Tr13')
io = 'hdf', fuse = False, from_futures = True

    @ignore_sync_scheduler_warning
    @pytest.mark.parametrize(
        "io",
        [
            "parquet-pyarrow",
            pytest.param(
                "parquet-fastparquet", marks=pytest.mark.skip_with_pyarrow_strings
            ),
            "csv",
            # See https://github.com/dask/dask/issues/9793
            pytest.param("hdf", marks=pytest.mark.flaky(reruns=5)),
        ],
    )
    @pytest.mark.parametrize("fuse", [True, False, None])
    @pytest.mark.parametrize("from_futures", [True, False])
    def test_blockwise_dataframe_io(c, tmpdir, io, fuse, from_futures):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        df = pd.DataFrame({"x": [1, 2, 3] * 5, "y": range(15)})
    
        if from_futures:
            parts = [df.iloc[:5], df.iloc[5:10], df.iloc[10:15]]
            futs = c.scatter(parts)
            ddf0 = dd.from_delayed(futs, meta=parts[0])
        else:
            ddf0 = dd.from_pandas(df, npartitions=3)
    
        if io == "parquet-pyarrow":
            pytest.importorskip("pyarrow")
            ddf0.to_parquet(str(tmpdir))
            ddf = dd.read_parquet(str(tmpdir))
        elif io == "parquet-fastparquet":
            pytest.importorskip("fastparquet")
            with pytest.warns(FutureWarning):
                ddf0.to_parquet(str(tmpdir), engine="fastparquet")
                ddf = dd.read_parquet(str(tmpdir), engine="fastparquet")
        elif io == "csv":
            ddf0.to_csv(str(tmpdir), index=False)
            ddf = dd.read_csv(os.path.join(str(tmpdir), "*"))
        elif io == "hdf":
            pytest.importorskip("tables")
            fn = str(tmpdir.join("h5"))
            ddf0.to_hdf(fn, "/data*")
            ddf = dd.read_hdf(fn, "/data*")
        else:
            raise AssertionError("unreachable")
    
        df = df[["x"]] + 10
        ddf = ddf[["x"]] + 10
        with dask.config.set({"optimization.fuse.active": fuse}):
            ddf.compute()
>           dsk = dask.dataframe.optimize(ddf.dask, ddf.__dask_keys__())
E           TypeError: 'module' object is not callable

dask/tests/test_distributed.py:509: TypeError