Skip to content

Commit

Permalink
Elevate warnings to errors in the test suite (#6094)
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Apr 12, 2022
1 parent 910a16e commit 3c3d4e1
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 68 deletions.
15 changes: 6 additions & 9 deletions distributed/comm/tests/test_comms.py
Expand Up @@ -2,7 +2,6 @@
import os
import sys
import threading
import warnings
from functools import partial

import pytest
Expand Down Expand Up @@ -38,10 +37,6 @@
)

EXTERNAL_IP4 = get_ip()
if has_ipv6():
with warnings.catch_warnings(record=True):
warnings.simplefilter("always")
EXTERNAL_IP6 = get_ipv6()


@pytest.fixture(params=["tornado", "asyncio"])
Expand Down Expand Up @@ -623,11 +618,12 @@ async def test_default_client_server_ipv4(tcp):
@requires_ipv6
@gen_test()
async def test_default_client_server_ipv6(tcp):
external_ip6 = get_ipv6()
await check_client_server("[::1]", tcp_eq("::1"))
await check_client_server("[::1]:3211", tcp_eq("::1", 3211))
await check_client_server("[::]", tcp_eq("::"), tcp_eq(EXTERNAL_IP6))
await check_client_server("[::]", tcp_eq("::"), tcp_eq(external_ip6))
await check_client_server(
"[::]:3212", tcp_eq("::", 3212), tcp_eq(EXTERNAL_IP6, 3212)
"[::]:3212", tcp_eq("::", 3212), tcp_eq(external_ip6, 3212)
)


Expand All @@ -648,11 +644,12 @@ async def test_tcp_client_server_ipv4(tcp):
@requires_ipv6
@gen_test()
async def test_tcp_client_server_ipv6(tcp):
external_ip6 = get_ipv6()
await check_client_server("tcp://[::1]", tcp_eq("::1"))
await check_client_server("tcp://[::1]:3231", tcp_eq("::1", 3231))
await check_client_server("tcp://[::]", tcp_eq("::"), tcp_eq(EXTERNAL_IP6))
await check_client_server("tcp://[::]", tcp_eq("::"), tcp_eq(external_ip6))
await check_client_server(
"tcp://[::]:3232", tcp_eq("::", 3232), tcp_eq(EXTERNAL_IP6, 3232)
"tcp://[::]:3232", tcp_eq("::", 3232), tcp_eq(external_ip6, 3232)
)


Expand Down
3 changes: 0 additions & 3 deletions distributed/comm/tests/test_ws.py
Expand Up @@ -187,9 +187,6 @@ async def test_quiet_close():
):
pass

# For some reason unrelated @coroutine warnings are showing up
record = [warning for warning in record if "coroutine" not in str(warning.message)]

assert not record, record[0].message


Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/cluster.py
Expand Up @@ -194,7 +194,7 @@ def close(self, timeout=None):
return self.sync(self._close, callback_timeout=timeout)

def __del__(self):
if self.status != Status.closed:
if getattr(self, "status", Status.closed) != Status.closed:
with suppress(AttributeError, RuntimeError): # during closing
self.loop.add_callback(self.close)

Expand Down
9 changes: 6 additions & 3 deletions distributed/deploy/tests/test_local.py
Expand Up @@ -353,7 +353,7 @@ async def test_worker_params():
memory_limit=500,
asynchronous=True,
) as c:
assert [w.memory_limit for w in c.workers.values()] == [500] * 2
assert [w.memory_manager.memory_limit for w in c.workers.values()] == [500] * 2


@gen_test()
Expand All @@ -368,7 +368,7 @@ async def test_memory_limit_none():
) as c:
w = c.workers[0]
assert type(w.data) is dict
assert w.memory_limit is None
assert w.memory_manager.memory_limit is None


def test_cleanup():
Expand Down Expand Up @@ -500,7 +500,10 @@ def test_memory(loop, n_workers):
dashboard_address=":0",
loop=loop,
) as cluster:
assert sum(w.memory_limit for w in cluster.workers.values()) <= MEMORY_LIMIT
assert (
sum(w.memory_manager.memory_limit for w in cluster.workers.values())
<= MEMORY_LIMIT
)


@pytest.mark.parametrize("n_workers", [None, 3])
Expand Down
43 changes: 24 additions & 19 deletions distributed/deploy/tests/test_ssh.py
Expand Up @@ -156,33 +156,38 @@ async def test_keywords():

@pytest.mark.avoid_ci
def test_defer_to_old(loop):
with pytest.warns(Warning):
with SSHCluster(
with pytest.warns(
UserWarning,
match=r"Note that the SSHCluster API has been replaced\. "
r"We're routing you to the older implementation\. "
r"This will be removed in the future",
):
c = SSHCluster(
scheduler_addr="127.0.0.1",
scheduler_port=7437,
worker_addrs=["127.0.0.1", "127.0.0.1"],
) as c:
from distributed.deploy.old_ssh import SSHCluster as OldSSHCluster
)
with c:
from distributed.deploy.old_ssh import SSHCluster as OldSSHCluster

assert isinstance(c, OldSSHCluster)
assert isinstance(c, OldSSHCluster)


@pytest.mark.avoid_ci
def test_old_ssh_with_local_dir(loop):
with pytest.warns(Warning):
from distributed.deploy.old_ssh import SSHCluster as OldSSHCluster

with OldSSHCluster(
scheduler_addr="127.0.0.1",
scheduler_port=7437,
worker_addrs=["127.0.0.1", "127.0.0.1"],
local_directory="/tmp",
) as c:
assert len(c.workers) == 2
with Client(c) as client:
result = client.submit(lambda x: x + 1, 10)
result = result.result()
assert result == 11
from distributed.deploy.old_ssh import SSHCluster as OldSSHCluster

with OldSSHCluster(
scheduler_addr="127.0.0.1",
scheduler_port=7437,
worker_addrs=["127.0.0.1", "127.0.0.1"],
local_directory="/tmp",
) as c:
assert len(c.workers) == 2
with Client(c) as client:
result = client.submit(lambda x: x + 1, 10)
result = result.result()
assert result == 11


@gen_test()
Expand Down
5 changes: 4 additions & 1 deletion distributed/diagnostics/tests/test_worker_plugin.py
@@ -1,4 +1,5 @@
import asyncio
import warnings

import pytest

Expand Down Expand Up @@ -264,12 +265,14 @@ async def test_assert_no_warning_no_overload(c, s, a):
class Dummy(WorkerPlugin):
pass

with pytest.warns(None):
with warnings.catch_warnings(record=True) as record:
await c.register_worker_plugin(Dummy())
assert await c.submit(inc, 1, key="x") == 2
while "x" in a.tasks:
await asyncio.sleep(0.01)

assert not record


@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
async def test_WorkerPlugin_overwrite(c, s, w):
Expand Down
3 changes: 1 addition & 2 deletions distributed/tests/test_actor.py
Expand Up @@ -73,7 +73,6 @@ async def test_client_actions(s, a, b, direct_to_workers):
assert counter._address
assert hasattr(counter, "increment")
assert hasattr(counter, "add")
assert hasattr(counter, "n")

assert await counter.n == 0

Expand Down Expand Up @@ -122,7 +121,7 @@ async def test_Actor(c, s, a, b):

assert counter._cls == Counter

assert hasattr(counter, "n")
assert await counter.n == 0
assert hasattr(counter, "increment")
assert hasattr(counter, "add")

Expand Down
25 changes: 11 additions & 14 deletions distributed/tests/test_client.py
Expand Up @@ -5399,11 +5399,11 @@ def test_quiet_quit_when_cluster_leaves(loop_in_thread):


def test_warn_executor(loop, s, a, b):
with warnings.catch_warnings(record=True) as record:
with Executor(s["address"], loop=loop) as c:
pass
with pytest.warns(UserWarning, match=r"Executor has been renamed to Client"):
c = Executor(s["address"], loop=loop)

assert any("Client" in str(r.message) for r in record)
with c:
pass


@gen_cluster([("127.0.0.1", 4)] * 2, client=True)
Expand Down Expand Up @@ -5652,23 +5652,20 @@ async def test_config_scheduler_address(s, a, b):

@gen_cluster(client=True)
async def test_warn_when_submitting_large_values(c, s, a, b):
with warnings.catch_warnings(record=True) as record:
with pytest.warns(
UserWarning,
match=r"Large object of size (2\.00 MB|1.91 MiB) detected in task graph:"
r" \n \(b'00000000000000000000000000000000000000000000000 \.\.\. 000000000000',\)"
r"\nConsider scattering large objects ahead of time.*",
):
future = c.submit(lambda x: x + 1, b"0" * 2000000)

text = str(record[0].message)
assert "2.00 MB" in text or "1.91 MiB" in text
assert "large" in text
assert "..." in text
assert "'000" in text
assert "000'" in text
assert len(text) < 2000

with warnings.catch_warnings(record=True) as record:
data = b"0" * 2000000
for i in range(10):
future = c.submit(lambda x, y: x, data, i)

assert len(record) < 2
assert not record


@gen_cluster(client=True)
Expand Down
5 changes: 3 additions & 2 deletions distributed/tests/test_utils.py
Expand Up @@ -7,6 +7,7 @@
import queue
import socket
import traceback
import warnings
from collections import deque
from time import sleep

Expand Down Expand Up @@ -478,12 +479,12 @@ async def throws(msg):


def test_warn_on_duration():
with pytest.warns(None) as record:
with warnings.catch_warnings(record=True) as record:
with warn_on_duration("10s", "foo"):
pass
assert not record

with pytest.warns(None) as record:
with pytest.warns(UserWarning, match=r"foo") as record:
with warn_on_duration("1ms", "foo"):
sleep(0.100)

Expand Down
8 changes: 4 additions & 4 deletions distributed/tests/test_worker.py
Expand Up @@ -567,13 +567,13 @@ async def test_memory_limit_auto(s):
async with Worker(s.address, nthreads=1) as a, Worker(
s.address, nthreads=2
) as b, Worker(s.address, nthreads=100) as c, Worker(s.address, nthreads=200) as d:
assert isinstance(a.memory_limit, Number)
assert isinstance(b.memory_limit, Number)
assert isinstance(a.memory_manager.memory_limit, Number)
assert isinstance(b.memory_manager.memory_limit, Number)

if CPU_COUNT > 1:
assert a.memory_limit < b.memory_limit
assert a.memory_manager.memory_limit < b.memory_manager.memory_limit

assert c.memory_limit == d.memory_limit
assert c.memory_manager.memory_limit == d.memory_manager.memory_limit


@gen_cluster(client=True)
Expand Down
14 changes: 7 additions & 7 deletions distributed/tests/test_worker_client.py
@@ -1,7 +1,6 @@
import asyncio
import random
import threading
import warnings
from collections import defaultdict
from time import sleep

Expand Down Expand Up @@ -216,12 +215,13 @@ async def test_local_client_warning(c, s, a, b):
from distributed import local_client

def func(x):
with warnings.catch_warnings(record=True) as record:
with local_client() as c:
x = c.submit(inc, x)
result = x.result()
assert any("worker_client" in str(r.message) for r in record)
return result
with pytest.warns(
UserWarning, match=r"local_client has moved to worker_client"
):
cmgr = local_client()

with cmgr as c:
return c.submit(inc, x).result()

future = c.submit(func, 10)
result = await future
Expand Down
39 changes: 36 additions & 3 deletions setup.cfg
Expand Up @@ -39,10 +39,43 @@ tag_prefix =
parentdir_prefix = distributed-

[tool:pytest]
addopts = -v -rsxfE --durations=20 --color=yes --ignore=continuous_integration --ignore=docs --ignore=.github
addopts = -v -rsxfE --durations=20 --color=yes --ignore=continuous_integration --ignore=docs --ignore=.github --strict-markers --strict-config
filterwarnings =
error:Since distributed.*:PendingDeprecationWarning
minversion = 4
error
ignore:Please use `dok_matrix` from the `scipy\.sparse` namespace, the `scipy\.sparse\.dok` namespace is deprecated.:DeprecationWarning
ignore:There is no current event loop:DeprecationWarning
ignore:elementwise comparison failed. this will raise an error in the future:DeprecationWarning
ignore:unclosed <socket\.socket.*:ResourceWarning
ignore:unclosed context <zmq\.asyncio\.Context\(\).*:ResourceWarning
ignore:unclosed event loop <_(Unix|Windows)SelectorEventLoop.*:ResourceWarning
ignore:unclosed file <_io.BufferedWriter.*:ResourceWarning
ignore:unclosed file <_io.TextIOWrapper.*:ResourceWarning
ignore:unclosed transport <_SelectorSocketTransport.*:ResourceWarning
ignore:unclosed transport <asyncio\.sslproto\..*:ResourceWarning
ignore:Couldn't detect a suitable IP address for reaching '2001.4860.4860..8888', defaulting to hostname. \[Errno 65\] No route to host:RuntimeWarning
ignore:Dashboard and Scheduler are using the same server on port.*:RuntimeWarning
ignore:coroutine 'BaseTCPConnector.connect' was never awaited:RuntimeWarning
ignore:coroutine 'Client\._start' was never awaited:RuntimeWarning
ignore:coroutine 'ConnectionPool._connect' was never awaited:RuntimeWarning
ignore:coroutine 'PooledRPCCall\.__getattr__\.<locals>\.send_recv_from_rpc' was never awaited:RuntimeWarning
ignore:coroutine 'Scheduler\.restart' was never awaited:RuntimeWarning
ignore:coroutine 'Semaphore._refresh_leases' was never awaited:RuntimeWarning
ignore:overflow encountered in long_scalars:RuntimeWarning
ignore:Creating scratch directories is taking a surprisingly long time.*:UserWarning
ignore:Running on a single-machine scheduler when a distributed client is active might lead to unexpected results\.:UserWarning
ignore:Scheduler already contains a plugin with name nonidempotentplugin. overwriting:UserWarning
ignore:Increasing number of chunks by factor of 20:dask.array.core.PerformanceWarning
ignore::distributed.versions.VersionMismatchWarning
ignore:(?s)Exception in thread AsyncProcess Dask Worker process \(from Nanny\) watch process join.*assert exitcode is not None:pytest.PytestUnhandledThreadExceptionWarning
ignore:(?s)Exception in thread AsyncProcess SpawnProcess-\d+ watch process join.*assert exitcode is not None:pytest.PytestUnhandledThreadExceptionWarning
ignore:(?s)Exception in thread.*old_ssh.*channel\.send\(b"\\x03"\).*Socket is closed:pytest.PytestUnhandledThreadExceptionWarning
ignore:(?s)Exception in thread.*paramiko\.ssh_exception\.NoValidConnectionsError:pytest.PytestUnhandledThreadExceptionWarning
ignore:(?s)Exception ignored in. <Finalize object, dead>.*sem_unlink.*FileNotFoundError:pytest.PytestUnraisableExceptionWarning
ignore:(?s)Exception ignored in. <coroutine object Scheduler\.add_worker.*in handle_worker.*Event loop is closed:pytest.PytestUnraisableExceptionWarning
ignore:(?s)Exception ignored in. <function Client\.__del__.*RuntimeError. IOLoop is closed:pytest.PytestUnraisableExceptionWarning
ignore:notifyAll\(\) is deprecated, use notify_all\(\) instead:DeprecationWarning:paramiko
ignore:setDaemon\(\) is deprecated, set the daemon attribute instead:DeprecationWarning:paramiko
minversion = 6
markers =
ci1: marks tests as belonging to 1 out of 2 partitions to run on CI ('-m "not ci1"' for second partition)
slow: marks tests as slow (deselected by default; select with '--runslow')
Expand Down

0 comments on commit 3c3d4e1

Please sign in to comment.