88 fail, 109 skipped, 3 763 pass in 9h 30m 51s
27 files, 27 suites, 9h 30m 51s ⏱️
3 960 tests: 3 763 ✅ passed, 109 💤 skipped, 88 ❌ failed
49 809 runs: 46 634 ✅ passed, 2 289 💤 skipped, 886 ❌ failed
Results for commit ce97cce.
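Every failure annotated below comes from distributed.shuffle.tests.test_merge and reports the same error, KeyError: 'disk'. A minimal sketch for reproducing one of them locally, assuming a development checkout of distributed at this commit with pytest installed (the node ID is copied from the first annotation; any failing test in this report can be substituted):

import pytest

# Run a single failing test by node ID from the repository root.
pytest.main(["distributed/shuffle/tests/test_merge.py::test_minimal_version"])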
Annotations
All 11 runs failed: test_minimal_version (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53376', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
@gen_cluster(client=True)
async def test_minimal_version(c, s, a, b):
no_pyarrow_ctx = (
mock.patch.dict("sys.modules", {"pyarrow": None})
if pa is not None
else contextlib.nullcontext()
)
with no_pyarrow_ctx:
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with pytest.raises(
ModuleNotFoundError, match="requires pyarrow"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
> await c.compute(dd.merge(a, b, left_on="x", right_on="z"))
distributed\shuffle\tests\test_merge.py:72:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
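Each of these failures dies on the same lookup: hash_join_p2p (distributed\shuffle\_merge.py:110) calls dask.config.get("distributed.p2p.disk") with no default, so the lookup loop shown above raises as soon as the final "disk" component is absent from the loaded config. A minimal sketch of the mechanics, assuming an environment like the CI one where "distributed.p2p" exists but carries no "disk" entry:

import dask

# Assumption for illustration: the active config defines "distributed.p2p"
# but no "disk" entry beneath it, as in the failing CI environments.
try:
    dask.config.get("distributed.p2p.disk")  # no default -> raises
except KeyError as exc:
    print(exc)  # 'disk'

# Passing default= (part of the documented signature above) sidesteps the
# lookup error; a defensive pattern for this sketch, not necessarily the
# fix the project intends.
print(dask.config.get("distributed.p2p.disk", default=True))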
All 11 runs failed: test_basic_merge[inner] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53388', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
> joined = hash_join(a, "y", b, "y", how)
distributed\shuffle\tests\test_merge.py:84:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_basic_merge[left] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53400', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
> joined = hash_join(a, "y", b, "y", how)
distributed\shuffle\tests\test_merge.py:84:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_basic_merge[right] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53412', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
> joined = hash_join(a, "y", b, "y", how)
distributed\shuffle\tests\test_merge.py:84:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_basic_merge[outer] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53424', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
> joined = hash_join(a, "y", b, "y", how)
distributed\shuffle\tests\test_merge.py:84:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_different_parameters (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53436', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53437', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53440', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
ddf1 = dd.from_pandas(pdf1, npartitions=5)
ddf2 = dd.from_pandas(pdf2, npartitions=10)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = (
> ddf1.merge(ddf2, left_on="a", right_on="x")
# Vary the number of output partitions for the shuffles of ddf2
.repartition(npartitions=20).merge(ddf2, left_on="b", right_on="x")
)
distributed\shuffle\tests\test_merge.py:120:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_same_parameters (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53448', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53449', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53452', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
ddf1 = dd.from_pandas(pdf1, npartitions=5)
ddf2 = dd.from_pandas(pdf2, npartitions=10)
# This performs two shuffles:
# * ddf1 is shuffled on `a`
# * ddf2 is shuffled on `x`
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
> ddf3 = ddf1.merge(
ddf2,
left_on="a",
right_on="x",
)
distributed\shuffle\tests\test_merge.py:146:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
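Both reused-dataframe tests reach the failing code path the same way: dd.merge only routes through hash_join_p2p (via dask\dataframe\multi.py:356) when dataframe.shuffle.method is set to "p2p", and the error surfaces at graph-construction time, before any compute. An end-to-end sketch of that routing, using the same config switch as the tests; in an environment matching this report the merge call below raised KeyError: 'disk':

import dask
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"a": range(10), "x": range(10)}), npartitions=2)

# Opting in to the p2p shuffle method routes column merges through
# hash_join_p2p, which performs the failing config lookup.
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    joined = ddf.merge(ddf, left_on="a", right_on="x")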
All 11 runs failed: test_merge[True-inner] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53460', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
> joined = dd.merge(a, b, on="y", how=how)
distributed\shuffle\tests\test_merge.py:190:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
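The parametrized test_merge failures put the apparent cause in one place: the test sets "distributed.p2p.storage.disk", while hash_join_p2p reads "distributed.p2p.disk", without the "storage" segment. A minimal sketch of that mismatch, using the two config paths exactly as they appear above; an illustration only, not a statement of the intended fix:

import dask

# dask.config.set creates nested keys on the fly, so setting the
# "storage" path always succeeds...
with dask.config.set({"distributed.p2p.storage.disk": False}):
    assert dask.config.get("distributed.p2p.storage.disk") is False
    # ...while the sibling path without "storage" is the one that fails
    # in this report (left commented so the sketch runs anywhere):
    # dask.config.get("distributed.p2p.disk")  # KeyError: 'disk' in CI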
All 11 runs failed: test_merge[True-outer] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53477', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
> joined = dd.merge(a, b, on="y", how=how)
distributed\shuffle\tests\test_merge.py:190:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_merge[True-left] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53494', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
> joined = dd.merge(a, b, on="y", how=how)
distributed\shuffle\tests\test_merge.py:190:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_merge[True-right] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53511', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
> joined = dd.merge(a, b, on="y", how=how)
distributed\shuffle\tests\test_merge.py:190:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_merge[False-inner] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53528', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
> joined = dd.merge(a, b, on="y", how=how)
distributed\shuffle\tests\test_merge.py:190:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_merge[False-outer] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53545', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
> joined = dd.merge(a, b, on="y", how=how)
distributed\shuffle\tests\test_merge.py:190:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_merge[False-left] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53562', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
> joined = dd.merge(a, b, on="y", how=how)
distributed\shuffle\tests\test_merge.py:190:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
All 11 runs failed: test_merge[False-right] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53580', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
res,
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
> joined = dd.merge(a, b, on="y", how=how)
distributed\shuffle\tests\test_merge.py:190:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_merge_by_multiple_columns[inner] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 2s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53597', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53598', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53601', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pdr.join(pdl, how=how)
assert_eq(
await c.compute(ddr.join(ddl, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pd.merge(
pdl, pdr, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pd.merge(
pdr, pdl, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddr,
ddl,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
# hash join
list_eq(
await c.compute(
> dd.merge(
ddl,
ddr,
how=how,
left_on="a",
right_on="d",
)
),
pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
)
distributed\shuffle\tests\test_merge.py:396:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_merge_by_multiple_columns[outer] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 2s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53615', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53616', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53619', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pdr.join(pdl, how=how)
assert_eq(
await c.compute(ddr.join(ddl, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pd.merge(
pdl, pdr, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pd.merge(
pdr, pdl, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddr,
ddl,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
# hash join
list_eq(
await c.compute(
> dd.merge(
ddl,
ddr,
how=how,
left_on="a",
right_on="d",
)
),
pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
)
distributed\shuffle\tests\test_merge.py:396:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_merge_by_multiple_columns[left] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 2s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53632', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53633', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53636', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pdr.join(pdl, how=how)
assert_eq(
await c.compute(ddr.join(ddl, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pd.merge(
pdl, pdr, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pd.merge(
pdr, pdl, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddr,
ddl,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
# hash join
list_eq(
await c.compute(
> dd.merge(
ddl,
ddr,
how=how,
left_on="a",
right_on="d",
)
),
pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
)
distributed\shuffle\tests\test_merge.py:396:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_merge_by_multiple_columns[right] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 2s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53649', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53650', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53653', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pdr.join(pdl, how=how)
assert_eq(
await c.compute(ddr.join(ddl, how=how)),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pd.merge(
pdl, pdr, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
expected = pd.merge(
pdr, pdl, how=how, left_index=True, right_index=True
)
assert_eq(
await c.compute(
dd.merge(
ddr,
ddl,
how=how,
left_index=True,
right_index=True,
)
),
expected,
# FIXME: There's a discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
# hash join
list_eq(
await c.compute(
> dd.merge(
ddl,
ddr,
how=how,
left_on="a",
right_on="d",
)
),
pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
)
distributed\shuffle\tests\test_merge.py:396:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_index_merge_p2p[inner] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53666', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53667', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53670', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed\shuffle\tests\test_merge.py:471:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_index_merge_p2p[left] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53678', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53679', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53682', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed\shuffle\tests\test_merge.py:471:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_index_merge_p2p[right] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53691', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53692', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53695', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed\shuffle\tests\test_merge.py:471:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_index_merge_p2p[outer] (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53703', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53704', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53707', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed\shuffle\tests\test_merge.py:471:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 11 runs failed: test_merge_does_not_deadlock_if_worker_joins (distributed.shuffle.tests.test_merge)
artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53715', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53716', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
@mock.patch(
"distributed.shuffle._worker_plugin._ShuffleRunManager",
LimitedGetOrCreateShuffleRunManager,
)
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_merge_does_not_deadlock_if_worker_joins(c, s, a):
"""Regression test for https://github.com/dask/distributed/issues/8411"""
pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
df1 = dd.from_pandas(pdf1, npartitions=10)
df2 = dd.from_pandas(pdf2, npartitions=20)
run_manager_A = a.plugins["shuffle"].shuffle_runs
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
> joined = dd.merge(df1, df2, left_on="a", right_on="x")
distributed\shuffle\tests\test_merge.py:516:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None
def get(
key: str,
default: Any = no_default,
config: dict = config,
override_with: Any = None,
) -> Any:
"""
Get elements from global config
If ``override_with`` is not None this value will be passed straight back.
Useful for getting kwarg defaults from Dask config.
Use '.' for nested access
Examples
--------
>>> from dask import config
>>> config.get('foo') # doctest: +SKIP
{'x': 1, 'y': 2}
>>> config.get('foo.x') # doctest: +SKIP
1
>>> config.get('foo.x.y', default=123) # doctest: +SKIP
123
>>> config.get('foo.y', override_with=None) # doctest: +SKIP
2
>>> config.get('foo.y', override_with=3) # doctest: +SKIP
3
See Also
--------
dask.config.set
"""
if override_with is not None:
return override_with
keys = key.split(".")
result = config
for k in keys:
k = canonical_name(k, result)
try:
> result = result[k]
E KeyError: 'disk'
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
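Given the tracebacks above, the plausible fix, hedged since the intended key name can only be inferred from what the tests set, is a one-line change pointing hash_join_p2p at the renamed key:

    # distributed/shuffle/_merge.py:110, sketch only; assumes the schema
    # key was renamed to "distributed.p2p.storage.disk" as the tests imply
    disk: bool = dask.config.get("distributed.p2p.storage.disk")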
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
1 out of 13 runs failed: test_mixing_clients_same_scheduler (distributed.tests.test_client)
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: inc-64456584d8a2e7176e2cd177efaa15f2
s = <Scheduler 'tcp://127.0.0.1:52715', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:52716', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:52719', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster()
async def test_mixing_clients_same_scheduler(s, a, b):
async with Client(s.address, asynchronous=True) as c1, Client(
s.address, asynchronous=True
) as c2:
future = c1.submit(inc, 1)
> assert await c2.submit(inc, future) == 3
distributed\tests\test_client.py:6232:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: inc-64456584d8a2e7176e2cd177efaa15f2>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: inc-64456584d8a2e7176e2cd177efaa15f2
distributed\client.py:336: CancelledError
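Unlike the KeyError batch above, this failure hit only 1 of 13 runs (windows-latest 3.12), which points at a race rather than a deterministic regression: one plausible reading is that the future owned by c1 was released or cancelled before c2's dependent task resolved it on the shared scheduler. A self-contained sketch of the cross-client pattern the test exercises, using a local cluster and a stand-in inc helper:

    import asyncio

    from distributed import Client, LocalCluster


    def inc(x):
        # stand-in for the test helper of the same name
        return x + 1


    async def main() -> None:
        async with LocalCluster(n_workers=2, asynchronous=True) as cluster:
            async with Client(cluster, asynchronous=True) as c1:
                async with Client(cluster, asynchronous=True) as c2:
                    fut = c1.submit(inc, 1)  # key owned by c1
                    # c2 resolves c1's future through the shared scheduler;
                    # if c1 drops its reference too early, the key can be
                    # cancelled out from under c2; the race seen above.
                    assert await c2.submit(inc, fut) == 3


    asyncio.run(main())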