Switch pytest-asyncio to use anyio (#780)
* Switch pytest-asyncio to use anyio

* Remove uses of pytest-asyncio

* Remove unused config option
jacobtomlinson committed Jul 26, 2023
1 parent 2c48b6e commit 19b48b8
Showing 12 changed files with 69 additions and 68 deletions.
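For reference, the change repeated throughout this diff is a marker swap: every coroutine test previously collected by pytest-asyncio is now run by the anyio pytest plugin. A minimal sketch of the pattern (illustrative only, assuming the anyio plugin is installed; the real tests below use the project's own fixtures):

import anyio
import pytest

# Before this commit the test carried @pytest.mark.asyncio and pytest-asyncio
# supplied the event loop. With this change the anyio plugin runs the coroutine
# on the backend chosen by the anyio_backend fixture added to conftest.py
# (see the end of this diff).
@pytest.mark.anyio
async def test_marker_swap_example():
    await anyio.sleep(0)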
2 changes: 1 addition & 1 deletion dask_kubernetes/aiopykube/tests/test_pykube_objects.py
@@ -7,7 +7,7 @@
from dask_kubernetes.aiopykube.objects import Pod


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_create_and_delete(docker_image, k8s_cluster):
api = HTTPClient(KubeConfig.from_env())
name = "test-" + uuid.uuid4().hex[:10]
4 changes: 2 additions & 2 deletions dask_kubernetes/aiopykube/tests/test_query.py
@@ -5,14 +5,14 @@
from dask_kubernetes.aiopykube.objects import Pod


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_query(k8s_cluster):
api = HTTPClient(KubeConfig.from_env())
async for pod in Query(api, Pod, namespace="kube-system"):
assert isinstance(pod, Pod)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_objects(k8s_cluster):
api = HTTPClient(KubeConfig.from_env())
async for pod in Pod.objects(api).filter(namespace="kube-system"):
77 changes: 38 additions & 39 deletions dask_kubernetes/classic/tests/test_async.py
@@ -1,4 +1,3 @@
import pytest_asyncio
import asyncio
import base64
import getpass
@@ -68,36 +67,36 @@ def user_env():
cluster_kwargs = {"asynchronous": True}


@pytest_asyncio.fixture
@pytest.fixture
async def cluster(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, **cluster_kwargs) as cluster:
yield cluster


@pytest_asyncio.fixture
@pytest.fixture
async def remote_cluster(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, deploy_mode="remote", **cluster_kwargs) as cluster:
yield cluster


@pytest_asyncio.fixture
@pytest.fixture
async def client(cluster):
async with Client(cluster, asynchronous=True) as client:
yield client


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_fixtures(client):
"""An initial test to get all the fixtures to run and check the cluster is usable."""
assert client


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_versions(client):
await client.get_versions(check=True)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_cluster_create(cluster):
cluster.scale(1)
await cluster
@@ -106,7 +105,7 @@ async def test_cluster_create(cluster):
assert result == 11


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_basic(cluster, client):
cluster.scale(2)
future = client.submit(lambda x: x + 1, 10)
@@ -122,7 +121,7 @@ async def test_basic(cluster, client):
assert all((await client.has_what()).values())


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_logs(remote_cluster):
cluster = remote_cluster
cluster.scale(2)
@@ -141,14 +140,14 @@ async def test_logs(remote_cluster):
)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_dask_worker_name_env_variable(k8s_cluster, pod_spec, user_env):
with dask.config.set({"kubernetes.name": "foo-{USER}-{uuid}"}):
async with KubeCluster(pod_spec, **cluster_kwargs) as cluster:
assert "foo-" + getpass.getuser() in cluster.name


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_diagnostics_link_env_variable(k8s_cluster, pod_spec, user_env):
pytest.importorskip("bokeh")
with dask.config.set({"distributed.dashboard.link": "foo-{USER}-{port}"}):
@@ -161,7 +160,7 @@ async def test_diagnostics_link_env_variable(k8s_cluster, pod_spec, user_env):


@pytest.mark.skip(reason="Cannot run two closers locally as loadbalancer ports collide")
@pytest.mark.asyncio
@pytest.mark.anyio
async def test_namespace(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, **cluster_kwargs) as cluster:
assert "dask" in cluster.name
@@ -174,7 +173,7 @@ async def test_namespace(k8s_cluster, pod_spec):
await asyncio.sleep(0.1)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_adapt(cluster):
cluster.adapt()
async with Client(cluster, asynchronous=True) as client:
@@ -184,7 +183,7 @@ async def test_adapt(cluster):


@pytest.mark.xfail(reason="The widget has changed upstream")
@pytest.mark.asyncio
@pytest.mark.anyio
async def test_ipython_display(cluster):
ipywidgets = pytest.importorskip("ipywidgets")
cluster.scale(1)
@@ -201,7 +200,7 @@ async def test_ipython_display(cluster):
await asyncio.sleep(0.5)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_env(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, env={"ABC": "DEF"}, **cluster_kwargs) as cluster:
cluster.scale(1)
@@ -212,7 +211,7 @@ async def test_env(k8s_cluster, pod_spec):
assert all(v["ABC"] == "DEF" for v in env.values())


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_from_yaml(k8s_cluster, docker_image):
test_yaml = {
"kind": "Pod",
@@ -254,7 +253,7 @@ async def test_pod_from_yaml(k8s_cluster, docker_image):
assert all((await client.has_what()).values())


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_expand_env_vars(k8s_cluster, docker_image):
try:
os.environ["FOO_IMAGE"] = docker_image
@@ -288,7 +287,7 @@ async def test_pod_expand_env_vars(k8s_cluster, docker_image):
del os.environ["FOO_IMAGE"]


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_template_dict(docker_image):
spec = {
"metadata": {},
@@ -330,7 +329,7 @@ async def test_pod_template_dict(docker_image):
assert all((await client.has_what()).values())


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_template_minimal_dict(k8s_cluster, docker_image):
spec = {
"spec": {
@@ -361,7 +360,7 @@ async def test_pod_template_minimal_dict(k8s_cluster, docker_image):
assert result == 11


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_template_from_conf(docker_image):
spec = {
"spec": {
@@ -377,7 +376,7 @@ async def test_pod_template_from_conf(docker_image):
)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_pod_template_with_custom_container_name(docker_image):
container_name = "my-custom-container"
spec = {"spec": {"containers": [{"name": container_name, "image": docker_image}]}}
@@ -387,7 +386,7 @@ async def test_pod_template_with_custom_container_name(docker_image):
assert cluster.pod_template.spec.containers[0].name == container_name


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_constructor_parameters(k8s_cluster, pod_spec):
env = {"FOO": "BAR", "A": 1}
async with KubeCluster(
@@ -405,7 +404,7 @@ async def test_constructor_parameters(k8s_cluster, pod_spec):
assert pod.metadata.generate_name == "myname"


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_reject_evicted_workers(cluster):
cluster.scale(1)
await cluster
@@ -440,7 +439,7 @@ async def test_reject_evicted_workers(cluster):
await asyncio.sleep(0.1)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_scale_up_down(cluster, client):
np = pytest.importorskip("numpy")
cluster.scale(2)
@@ -471,7 +470,7 @@ async def test_scale_up_down(cluster, client):
@pytest.mark.xfail(
reason="The delay between scaling up, starting a worker, and then scale down causes issues"
)
@pytest.mark.asyncio
@pytest.mark.anyio
async def test_scale_up_down_fast(cluster, client):
cluster.scale(1)
await cluster
@@ -508,7 +507,7 @@ async def test_scale_up_down_fast(cluster, client):


@pytest.mark.xfail(reason="scaling has some unfortunate state")
@pytest.mark.asyncio
@pytest.mark.anyio
async def test_scale_down_pending(cluster, client, cleanup_namespaces):
# Try to scale the cluster to use more pods than available
nodes = (await cluster.core_api.list_node()).items
@@ -571,7 +570,7 @@ def load_data(i):
assert time() < start + 60


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_automatic_startup(k8s_cluster, docker_image):
test_yaml = {
"kind": "Pod",
@@ -600,7 +599,7 @@ async def test_automatic_startup(k8s_cluster, docker_image):
assert cluster.pod_template.metadata.labels["foo"] == "bar"


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_repr(cluster):
for text in [repr(cluster), str(cluster)]:
assert "Box" not in text
@@ -610,7 +609,7 @@ async def test_repr(cluster):
)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_escape_username(k8s_cluster, pod_spec, monkeypatch):
monkeypatch.setenv("LOGNAME", "Foo!._")

@@ -622,13 +621,13 @@ async def test_escape_username(k8s_cluster, pod_spec, monkeypatch):
assert "foo" in cluster.pod_template.metadata.labels["user"]


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_escape_name(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, name="foo@bar", **cluster_kwargs) as cluster:
assert "@" not in str(cluster.pod_template)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_maximum(cluster):
with dask.config.set({"kubernetes.count.max": 1}):
with captured_logger("dask_kubernetes") as logger:
@@ -703,15 +702,15 @@ def test_default_toleration_preserved(docker_image):
} in tolerations


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_auth_missing(k8s_cluster, pod_spec):
with pytest.raises(kubernetes.config.ConfigException) as info:
await KubeCluster(pod_spec, auth=[], **cluster_kwargs)

assert "No authorization methods were provided" in str(info.value)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_auth_tries_all_methods(k8s_cluster, pod_spec):
fails = {"count": 0}

@@ -730,7 +729,7 @@ def load(self):
@pytest.mark.xfail(
reason="Updating the default client configuration is broken in kubernetes"
)
@pytest.mark.asyncio
@pytest.mark.anyio
async def test_auth_kubeconfig_with_filename():
await KubeConfig(config_file=CONFIG_DEMO).load()

@@ -745,7 +744,7 @@ async def test_auth_kubeconfig_with_filename():
@pytest.mark.xfail(
reason="Updating the default client configuration is broken in kubernetes"
)
@pytest.mark.asyncio
@pytest.mark.anyio
async def test_auth_kubeconfig_with_context():
await KubeConfig(config_file=CONFIG_DEMO, context="exp-scratch").load()

@@ -760,7 +759,7 @@ async def test_auth_kubeconfig_with_context():
@pytest.mark.xfail(
reason="Updating the default client configuration is broken in async kubernetes"
)
@pytest.mark.asyncio
@pytest.mark.anyio
async def test_auth_explicit():
await KubeAuth(
host="https://9.8.7.6", username="abc", password="some-password"
@@ -775,14 +774,14 @@ async def test_auth_explicit():
)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_start_with_workers(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, n_workers=2, **cluster_kwargs) as cluster:
async with Client(cluster, asynchronous=True) as client:
await client.wait_for_workers(2)


@pytest.mark.asyncio
@pytest.mark.anyio
@pytest.mark.xfail(reason="Flaky in CI and classic is deprecated anyway")
async def test_adapt_delete(cluster, ns):
"""
@@ -825,7 +824,7 @@ async def get_worker_pods():
assert len(cluster.scheduler_info["workers"]) == 2


@pytest.mark.asyncio
@pytest.mark.anyio
@pytest.mark.xfail(reason="Failing in CI with FileNotFoundError")
async def test_auto_refresh(cluster):
config = {
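The async fixtures in test_async.py above also drop @pytest_asyncio.fixture in favour of plain @pytest.fixture; the anyio plugin can drive async generator fixtures when the requesting test carries the anyio marker. A generic sketch of that pattern (illustrative only, not the project's actual fixtures):

import anyio
import pytest

@pytest.fixture
async def resource():
    # Async generator fixture: set up, yield to the test, then tear down.
    # The anyio plugin handles it for tests marked with @pytest.mark.anyio.
    yield "ready"

@pytest.mark.anyio
async def test_uses_async_fixture(resource):
    await anyio.sleep(0)
    assert resource == "ready"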
2 changes: 1 addition & 1 deletion dask_kubernetes/common/tests/test_kind.py
@@ -11,7 +11,7 @@ def test_config_detection(k8s_cluster):
assert b"pytest-kind" in check_output(["kubectl", "config", "current-context"])


@pytest.mark.asyncio
@pytest.mark.anyio
@pytest.mark.xfail(reason="Has asyncio issues on CI")
async def test_auth(k8s_cluster):
await ClusterAuth.load_first(ClusterAuth.DEFAULT)
5 changes: 5 additions & 0 deletions dask_kubernetes/conftest.py
@@ -128,3 +128,8 @@ def run_generate(crd_path, patch_path, temp_path):
yield
k8s_cluster.kubectl("delete", "--wait=false", "-f", temp_dir.name)
temp_dir.cleanup()


@pytest.fixture
def anyio_backend():
return "asyncio"
