
Clean up parametrized tests. (#429)
lesteve committed May 16, 2020
1 parent 084b0b5 commit fe11e0b
Showing 2 changed files with 63 additions and 51 deletions.
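
In short, the commit collects the cluster classes exercised by the parametrized tests into a single all_clusters list and routes construction through a create_cluster_func helper, which supplies the disk argument that HTCondorCluster requires. Below is a minimal sketch of the resulting pattern, assembled from the diff that follows; the test name and its assertion are illustrative additions, not part of the commit.

import pytest

from dask_jobqueue import (
    HTCondorCluster,
    LSFCluster,
    MoabCluster,
    OARCluster,
    PBSCluster,
    SGECluster,
    SLURMCluster,
)

# Shared list consumed by @pytest.mark.parametrize("Cluster", all_clusters)
all_clusters = [
    PBSCluster,
    MoabCluster,
    SLURMCluster,
    SGECluster,
    LSFCluster,
    OARCluster,
    HTCondorCluster,
]


def create_cluster_func(cluster_cls, **kwargs):
    # HTCondorCluster has a mandatory disk argument; default it here so one
    # test body can construct every cluster class the same way.
    if cluster_cls is HTCondorCluster:
        kwargs.setdefault("disk", "1GB")
    return cluster_cls(**kwargs)


@pytest.mark.parametrize("Cluster", all_clusters)
def test_job_script_mentions_threads(Cluster):  # illustrative test, not in the commit
    with create_cluster_func(Cluster, cores=2, memory="4GB") as cluster:
        assert "--nthreads" in cluster.job_script()
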
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_job.py
@@ -48,6 +48,7 @@ def test_basic():
HTCondorCluster,
MoabCluster,
OARCluster,
HTCondorCluster,
]


113 changes: 62 additions & 51 deletions dask_jobqueue/tests/test_jobqueue_core.py
@@ -17,12 +17,30 @@
SGECluster,
LSFCluster,
OARCluster,
HTCondorCluster,
)
from dask_jobqueue.core import Job
from dask_jobqueue.local import LocalCluster

from dask_jobqueue.sge import SGEJob

all_clusters = [
PBSCluster,
MoabCluster,
SLURMCluster,
SGECluster,
LSFCluster,
OARCluster,
HTCondorCluster,
]


def create_cluster_func(cluster_cls, **kwargs):
if cluster_cls is HTCondorCluster:
# HTCondorCluster has a mandatory disk argument
kwargs.setdefault("disk", "1GB")
return cluster_cls(**kwargs)


def test_errors():
match = re.compile("Job type.*job_cls", flags=re.DOTALL)
@@ -52,27 +70,28 @@ def test_command_template():
assert " --preload mymodule" in cluster._dummy_job._command_template


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_shebang_settings(Cluster):
if Cluster is HTCondorCluster:
pytest.skip(
"HTCondorCluster has a peculiar submit script and does not have a shebang"
)
default_shebang = "#!/usr/bin/env bash"
python_shebang = "#!/usr/bin/python"
with Cluster(cores=2, memory="4GB", shebang=python_shebang) as cluster:
with create_cluster_func(
Cluster, cores=2, memory="4GB", shebang=python_shebang
) as cluster:
job_script = cluster.job_script()
assert job_script.startswith(python_shebang)
assert "bash" not in job_script
with Cluster(cores=2, memory="4GB") as cluster:
with create_cluster_func(Cluster, cores=2, memory="4GB") as cluster:
job_script = cluster.job_script()
assert job_script.startswith(default_shebang)


@pytest.mark.parametrize(
"Cluster", [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster]
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_dashboard_link(Cluster):
with Cluster(cores=1, memory="1GB") as cluster:
with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
assert re.match(r"http://\d+\.\d+\.\d+.\d+:\d+/status", cluster.dashboard_link)


@@ -110,7 +129,7 @@ def test_forward_ip():
def test_job_id_from_qsub_legacy(Cluster, qsub_return_string):
original_job_id = "654321"
qsub_return_string = qsub_return_string.format(job_id=original_job_id)
with Cluster(cores=1, memory="1GB") as cluster:
with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
assert original_job_id == cluster._job_id_from_submit_output(qsub_return_string)


@@ -136,13 +155,13 @@ def test_job_id_from_qsub(job_cls, qsub_return_string):
@pytest.mark.parametrize("Cluster", [])
def test_job_id_error_handling_legacy(Cluster):
# non-matching regexp
with Cluster(cores=1, memory="1GB") as cluster:
with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
with pytest.raises(ValueError, match="Could not parse job id"):
return_string = "there is no number here"
cluster._job_id_from_submit_output(return_string)

# no job_id named group in the regexp
with Cluster(cores=1, memory="1GB") as cluster:
with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
with pytest.raises(ValueError, match="You need to use a 'job_id' named group"):
return_string = "Job <12345> submitted to <normal>."
cluster.job_id_regexp = r"(\d+)"
@@ -199,20 +218,17 @@ def test_jobqueue_cluster_call(tmpdir):
cluster._call([sys.executable, path_with_error.strpath])


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_cluster_has_cores_and_memory(Cluster):
base_regex = r"{}.+".format(Cluster.__name__)
with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='\d+GB'"):
Cluster()
create_cluster_func(Cluster)

with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='1GB'"):
Cluster(memory="1GB")
create_cluster_func(Cluster, memory="1GB")

with pytest.raises(ValueError, match=base_regex + r"cores=4, memory='\d+GB'"):
Cluster(cores=4)
create_cluster_func(Cluster, cores=4)


@pytest.mark.asyncio
@@ -256,54 +272,51 @@ def __init__(self, *args, **kwargs):
MyCluster(cores=1, memory="1GB")


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_default_number_of_worker_processes(Cluster):
with Cluster(cores=4, memory="4GB") as cluster:
with create_cluster_func(Cluster, cores=4, memory="4GB") as cluster:
assert " --nprocs 4" in cluster.job_script()
assert " --nthreads 1" in cluster.job_script()

with Cluster(cores=6, memory="4GB") as cluster:
with create_cluster_func(Cluster, cores=6, memory="4GB") as cluster:
assert " --nprocs 3" in cluster.job_script()
assert " --nthreads 2" in cluster.job_script()


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_scheduler_options(Cluster):
net_if_addrs = psutil.net_if_addrs()
interface = list(net_if_addrs.keys())[0]
port = 8804

with Cluster(
cores=1, memory="1GB", scheduler_options={"interface": interface, "port": port}
with create_cluster_func(
Cluster,
cores=1,
memory="1GB",
scheduler_options={"interface": interface, "port": port},
) as cluster:
scheduler_options = cluster.scheduler_spec["options"]
assert scheduler_options["interface"] == interface
assert scheduler_options["port"] == port


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_scheduler_options_interface(Cluster):
net_if_addrs = psutil.net_if_addrs()
scheduler_interface = list(net_if_addrs.keys())[0]
worker_interface = "worker-interface"
scheduler_host = socket.gethostname()

with Cluster(cores=1, memory="1GB", interface=scheduler_interface) as cluster:
with create_cluster_func(
Cluster, cores=1, memory="1GB", interface=scheduler_interface
) as cluster:
scheduler_options = cluster.scheduler_spec["options"]
worker_options = cluster.new_spec["options"]
assert scheduler_options["interface"] == scheduler_interface
assert worker_options["interface"] == scheduler_interface

with Cluster(
with create_cluster_func(
Cluster,
cores=1,
memory="1GB",
interface=worker_interface,
@@ -314,7 +327,8 @@ def test_scheduler_options_interface(Cluster):
assert scheduler_options["interface"] == scheduler_interface
assert worker_options["interface"] == worker_interface

with Cluster(
with create_cluster_func(
Cluster,
cores=1,
memory="1GB",
interface=worker_interface,
@@ -326,29 +340,25 @@
assert worker_options["interface"] == worker_interface


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster):
scheduler_host = socket.gethostname()
message_template = "pass {!r} through 'scheduler_options'"

message = message_template.format("host")
with pytest.raises(ValueError, match=message):
with Cluster(cores=1, memory="1GB", host=scheduler_host):
with create_cluster_func(Cluster, cores=1, memory="1GB", host=scheduler_host):
pass

message = message_template.format("dashboard_address")
with pytest.raises(ValueError, match=message):
with Cluster(cores=1, memory="1GB", dashboard_address=":8787"):
with create_cluster_func(
Cluster, cores=1, memory="1GB", dashboard_address=":8787"
):
pass


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_import_scheduler_options_from_config(Cluster):

net_if_addrs = psutil.net_if_addrs()
@@ -369,12 +379,13 @@ def test_import_scheduler_options_from_config(Cluster):
{"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options}
):

with Cluster(cores=2, memory="2GB") as cluster:
with create_cluster_func(Cluster, cores=2, memory="2GB") as cluster:
scheduler_options = cluster.scheduler_spec["options"]
assert scheduler_options.get("interface") == config_scheduler_interface
assert scheduler_options.get("port") == config_scheduler_port

with Cluster(
with create_cluster_func(
Cluster,
cores=2,
memory="2GB",
scheduler_options={"interface": pass_scheduler_interface},
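
One consequence of the shared list is that a test which cannot run against a particular cluster class skips it explicitly instead of keeping its own private list, as test_shebang_settings does for HTCondorCluster above. A minimal sketch of that pattern; the test name and assertion are illustrative, and the sketch assumes it lives in test_jobqueue_core.py so that all_clusters and create_cluster_func from this commit are in scope.

import pytest

from dask_jobqueue import HTCondorCluster

# Assumes all_clusters and create_cluster_func are defined earlier in the
# same module, as added by this commit (see diff above).


@pytest.mark.parametrize("Cluster", all_clusters)
def test_example_with_skip(Cluster):  # illustrative test, not in the commit
    if Cluster is HTCondorCluster:
        pytest.skip("this check does not apply to HTCondorCluster")
    with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
        assert cluster.job_script()  # every other class produces a job script
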
