Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce number of sleep calls in tests #63

Merged
merged 1 commit into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 10 additions & 4 deletions tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import time

from .utils import wait_for_job_state
from .conftest import HqEnv, print_table


Expand All @@ -11,7 +12,8 @@ def test_job_array_submit(hq_env: HqEnv):
hq_env.command(
["submit", "--array=30-36", "--", "bash", "-c", "echo $HQ_JOB_ID-$HQ_TASK_ID"]
)
time.sleep(0.4)
wait_for_job_state(hq_env, 1, "FINISHED")

for i in list(range(0, 30)) + list(range(37, 40)):
assert not os.path.isfile(os.path.join(hq_env.work_path, f"stdout.1.{i}"))
assert not os.path.isfile(os.path.join(hq_env.work_path, f"stderr.1.{i}"))
Expand Down Expand Up @@ -64,7 +66,8 @@ def test_job_array_error_some(hq_env: HqEnv):
]
)
hq_env.start_worker(cpus=2)
time.sleep(1)

wait_for_job_state(hq_env, 1, "FAILED")

table = hq_env.command(["jobs"], as_table=True)
assert table[1][2] == "FAILED"
Expand Down Expand Up @@ -99,7 +102,8 @@ def test_job_array_error_all(hq_env: HqEnv):
hq_env.start_server()
hq_env.command(["submit", "--array=0-9", "--", "/non-existent"])
hq_env.start_worker(cpus=2)
time.sleep(0.4)

wait_for_job_state(hq_env, 1, "FAILED")

table = hq_env.command(["jobs"], as_table=True)
assert table[1][2] == "FAILED"
Expand Down Expand Up @@ -169,7 +173,9 @@ def test_array_mix_with_simple_jobs(hq_env: HqEnv):
hq_env.command(["submit", "--array=1-4", "/bin/hostname"])
hq_env.command(["submit", "/bin/hostname"])
hq_env.start_workers(1, cpus=2)
time.sleep(1.6)

wait_for_job_state(hq_env, list(range(1, 101)), "FINISHED")

table = hq_env.command("jobs", as_table=True)
for i in range(1, 101):
assert table[i][0] == str(i)
Expand Down
18 changes: 7 additions & 11 deletions tests/test_cpus.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import subprocess
import time

import pytest

from .conftest import RUNNING_IN_CI, HqEnv, print_table
from .utils import wait_for_job_state


def read_list(filename):
Expand All @@ -13,7 +13,6 @@ def read_list(filename):


def test_job_num_of_cpus(hq_env: HqEnv):

hq_env.start_server()
hq_env.command(["submit", "--", "bash", "-c", "echo $HQ_CPUS"])
hq_env.command(
Expand All @@ -34,7 +33,8 @@ def test_job_num_of_cpus(hq_env: HqEnv):
hq_env.command(["submit", "--cpus", "all", "--", "bash", "-c", "echo $HQ_CPUS"])

hq_env.start_worker(cpus="3x4")
time.sleep(0.5)

wait_for_job_state(hq_env, [1, 2, 4, 5, 6], "FINISHED")

table = hq_env.command(["job", "1"], as_table=True)
assert table[4][0] == "Resources"
Expand Down Expand Up @@ -63,9 +63,8 @@ def test_job_num_of_cpus(hq_env: HqEnv):
assert list(range(12)) == lst


@pytest.mark.skipif(RUNNING_IN_CI, reason="Processes in CI is already prepinned")
@pytest.mark.skipif(RUNNING_IN_CI, reason="Processes in CI are already pre-pinned")
def test_manual_taskset(hq_env: HqEnv):

hq_env.start_server()
hq_env.command(
[
Expand All @@ -79,15 +78,13 @@ def test_manual_taskset(hq_env: HqEnv):
]
)
hq_env.start_worker(cpus=4)
time.sleep(1.5)

wait_for_job_state(hq_env, 1, "FINISHED")
table = hq_env.command(["job", "1"], as_table=True)
assert table[2][1] == "FINISHED"


# @pytest.mark.skipif(RUNNING_IN_CI, reason="Processes in CI are already pre-pinned")
def test_job_no_pin(hq_env: HqEnv):

pid = os.getpid()

process = subprocess.Popen(["taskset", "-p", str(pid)], stdout=subprocess.PIPE)
Expand All @@ -112,8 +109,8 @@ def test_job_no_pin(hq_env: HqEnv):
]
)
hq_env.start_worker(cpus=2)
time.sleep(0.4)

wait_for_job_state(hq_env, 1, "FINISHED")
table = hq_env.command(["job", "1"], as_table=True)
print_table(table)
assert table[2][1] == "FINISHED"
Expand All @@ -127,7 +124,6 @@ def test_job_no_pin(hq_env: HqEnv):


def test_job_pin(hq_env: HqEnv):

hq_env.start_server()
hq_env.command(
[
Expand All @@ -142,8 +138,8 @@ def test_job_pin(hq_env: HqEnv):
]
)
hq_env.start_worker(cpus=2)
time.sleep(0.4)

wait_for_job_state(hq_env, 1, "FINISHED")
table = hq_env.command(["job", "1"], as_table=True)
print_table(table)
assert table[2][1] == "FINISHED"
Expand Down
8 changes: 5 additions & 3 deletions tests/test_entries.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from .conftest import HqEnv
import os
import time

from .conftest import HqEnv
from .utils import wait_for_job_state


def test_entries_no_newline(hq_env: HqEnv):
hq_env.start_server()
Expand All @@ -20,7 +22,7 @@ def test_entries_no_newline(hq_env: HqEnv):
"echo $HQ_ENTRY",
]
)
time.sleep(0.4)
wait_for_job_state(hq_env, 1, "FINISHED")

for i, test in enumerate(["One\n", "Two\n", "Three\n", "Four\n"]):
with open(f"stdout.1.{i}") as f:
Expand Down Expand Up @@ -49,7 +51,7 @@ def test_entries_with_newline(hq_env: HqEnv):
"echo $HQ_ENTRY",
]
)
time.sleep(0.4)
wait_for_job_state(hq_env, 1, "FINISHED")

for i, test in enumerate(["One\n", "Two\n", "Three\n", "Four\n"]):
with open(f"stdout.1.{i}") as f:
Expand Down
66 changes: 43 additions & 23 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,47 @@

import pytest

from .utils import wait_for_job_state
from .conftest import HqEnv


def test_job_submit(hq_env: HqEnv):
hq_env.start_server()
# table = hq_env.command("jobs")
# print(table)
table = hq_env.command("jobs", as_table=True)
assert len(table) == 1
assert table[0][:3] == ["Id", "Name", "State"]

hq_env.command(["submit", "--", "bash", "-c", "echo 'hello'"])
hq_env.command(["submit", "--", "bash", "-c", "echo 'hello2'"])
time.sleep(0.2)

wait_for_job_state(hq_env, [1, 2], "WAITING")

table = hq_env.command("jobs", as_table=True)
assert len(table) == 3
assert table[1][:3] == ["1", "bash", "WAITING"]
assert table[2][:3] == ["2", "bash", "WAITING"]

hq_env.start_worker(cpus=1)
time.sleep(0.3)

wait_for_job_state(hq_env, [1, 2], "FINISHED")

table = hq_env.command("jobs", as_table=True)
assert len(table) == 3
assert table[1][:3] == ["1", "bash", "FINISHED"]
assert table[2][:3] == ["2", "bash", "FINISHED"]

hq_env.command(["submit", "--", "sleep", "1"])
time.sleep(0.2)

wait_for_job_state(hq_env, 3, "RUNNING", sleep_s=0.2)

table = hq_env.command("jobs", as_table=True)
assert len(table) == 4
assert table[1][:3] == ["1", "bash", "FINISHED"]
assert table[2][:3] == ["2", "bash", "FINISHED"]
assert table[3][:3] == ["3", "sleep", "RUNNING"]

time.sleep(1.0)
wait_for_job_state(hq_env, 3, "FINISHED")

table = hq_env.command("jobs", as_table=True)
assert len(table) == 4
assert table[1][:3] == ["1", "bash", "FINISHED"]
Expand All @@ -52,7 +55,8 @@ def test_custom_name(hq_env: HqEnv, tmp_path):
hq_env.start_server()

hq_env.command(["submit", "sleep", "1", "--name=sleep_prog"])
time.sleep(0.2)
wait_for_job_state(hq_env, 1, "WAITING")

table = hq_env.command("jobs", as_table=True)
assert len(table) == 2
assert table[1][:3] == ["1", "sleep_prog", "WAITING"]
Expand Down Expand Up @@ -81,7 +85,9 @@ def test_job_output_default(hq_env: HqEnv, tmp_path):
hq_env.command(["submit", "--", "bash", "-c", "echo 'hello'"])
hq_env.command(["submit", "--", "ls", "/non-existent"])
hq_env.command(["submit", "--", "/non-existent-program"])
time.sleep(0.2)

wait_for_job_state(hq_env, [1, 2, 3], ["FINISHED", "FAILED"])

print(hq_env.command("jobs"))
with open(os.path.join(tmp_path, "stdout.1.0")) as f:
assert f.read() == "hello\n"
Expand All @@ -107,7 +113,8 @@ def test_job_output_configured(hq_env: HqEnv, tmp_path):
hq_env.command(
["submit", "--stdout=abc", "--stderr=xyz", "--", "bash", "-c", "echo 'hello'"]
)
time.sleep(0.2)
wait_for_job_state(hq_env, 1, "FINISHED")

print(hq_env.command("jobs"))
with open(os.path.join(tmp_path, "abc")) as f:
assert f.read() == "hello\n"
Expand All @@ -121,8 +128,8 @@ def test_job_output_none(hq_env: HqEnv, tmp_path):
hq_env.command(
["submit", "--stdout=none", "--stderr=none", "--", "bash", "-c", "echo 'hello'"]
)
time.sleep(0.2)
print(hq_env.command("jobs"))
wait_for_job_state(hq_env, 1, "FINISHED")

assert not os.path.exists(os.path.join(tmp_path, "none"))
assert not os.path.exists(os.path.join(tmp_path, "stdout.1.0"))
assert not os.path.exists(os.path.join(tmp_path, "stderr.1.0"))
Expand All @@ -138,7 +145,8 @@ def test_job_filters(hq_env: HqEnv):
hq_env.command(["submit", "--", "bash", "-c", "echo 'bye'"])
hq_env.command(["submit", "--", "ls", "failed"])

time.sleep(0.2)
wait_for_job_state(hq_env, [1, 2, 3], "WAITING")

r = hq_env.command(["cancel", "1"])
assert "Job 1 canceled" in r

Expand All @@ -155,25 +163,26 @@ def test_job_filters(hq_env: HqEnv):
assert len(table_waiting) == 3

hq_env.start_worker(cpus=1)
time.sleep(0.2)
hq_env.command(["submit", "--", "sleep", "1"])

print(hq_env.command(["jobs"], as_table=True))
wait_for_job_state(hq_env, 4, "RUNNING")

table_running = hq_env.command(["jobs", "running"], as_table=True)
assert len(table_running) == 2

table_finished = hq_env.command(["jobs", "finished"], as_table=True)
assert len(table_finished) == 2

table_failed = hq_env.command(["jobs", "failed"], as_table=True)
assert len(table_failed) == 2

table_running = hq_env.command(["jobs", "running"], as_table=True)
assert len(table_running) == 2


def test_job_fail(hq_env: HqEnv):
hq_env.start_server()
hq_env.start_worker(cpus=1)
hq_env.command(["submit", "--", "/non-existent-program"])
time.sleep(0.2)
wait_for_job_state(hq_env, 1, "FAILED")

table = hq_env.command("jobs", as_table=True)
assert len(table) == 2
assert table[1][:3] == ["1", "non-existent-program", "FAILED"]
Expand Down Expand Up @@ -209,7 +218,9 @@ def test_cancel_running(hq_env: HqEnv):
hq_env.start_server()
hq_env.start_worker(cpus=1)
hq_env.command(["submit", "sleep", "10"])
time.sleep(0.3)

wait_for_job_state(hq_env, 1, "RUNNING")

table = hq_env.command(["jobs"], as_table=True)
assert table[1][2] == "RUNNING"
r = hq_env.command(["cancel", "1"])
Expand All @@ -226,7 +237,9 @@ def test_cancel_finished(hq_env: HqEnv):
hq_env.start_worker(cpus=1)
hq_env.command(["submit", "hostname"])
hq_env.command(["submit", "/invalid"])
time.sleep(0.3)

wait_for_job_state(hq_env, [1, 2], ["FINISHED", "FAILED"])

r = hq_env.command(["cancel", "1"])
assert "Canceling job 1 failed" in r
r = hq_env.command(["cancel", "2"])
Expand All @@ -242,12 +255,16 @@ def test_reporting_state_after_worker_lost(hq_env: HqEnv):
hq_env.start_workers(2, cpus=1)
hq_env.command(["submit", "sleep", "1"])
hq_env.command(["submit", "sleep", "1"])
time.sleep(0.25)

wait_for_job_state(hq_env, [1, 2], "RUNNING")

table = hq_env.command(["jobs"], as_table=True)
assert table[1][2] == "RUNNING"
assert table[2][2] == "RUNNING"
hq_env.kill_worker(1)

time.sleep(0.25)

table = hq_env.command(["jobs"], as_table=True)
print(table)
if table[1][2] == "WAITING":
Expand All @@ -258,11 +275,14 @@ def test_reporting_state_after_worker_lost(hq_env: HqEnv):
assert 0
assert table[other][2] == "RUNNING"

time.sleep(1)
wait_for_job_state(hq_env, other, "FINISHED")

table = hq_env.command(["jobs"], as_table=True)
assert table[other][2] == "FINISHED"
assert table[idx][2] == "RUNNING"
time.sleep(1)

wait_for_job_state(hq_env, idx, "FINISHED")

table = hq_env.command(["jobs"], as_table=True)
assert table[other][2] == "FINISHED"
assert table[idx][2] == "FINISHED"