Skip to content

Commit

Permalink
feat(submission): add async submission and unittest (#348)
Browse files Browse the repository at this point in the history
Feat #347 
1. add `check_interval`
2. add `async_run_submission` method
3. add unittest of the async func

Known bugs:
Async submission may fail on pbs/slurm queue systems, but succeeds in the
lazylocal test.

---------

Signed-off-by: ixsluo <xiaoshan.luo@outlook.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
ixsluo and pre-commit-ci[bot] committed Jun 30, 2023
1 parent 3528930 commit abfe441
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 3 deletions.
46 changes: 44 additions & 2 deletions dpdispatcher/submission.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# %%
import asyncio
import copy
import functools
import json
import os
import pathlib
Expand Down Expand Up @@ -199,7 +201,9 @@ def bind_machine(self, machine):
self.local_root = machine.context.temp_local_root
return self

def run_submission(self, *, dry_run=False, exit_on_submit=False, clean=True):
def run_submission(
self, *, dry_run=False, exit_on_submit=False, clean=True, check_interval=30
):
"""Main method to execute the submission.
First, check whether old Submission exists on the remote machine, and try to recover from it.
Second, upload the local files to the remote machine where the tasks to be executed.
Expand Down Expand Up @@ -240,7 +244,7 @@ def run_submission(self, *, dry_run=False, exit_on_submit=False, clean=True):
break

try:
time.sleep(30)
time.sleep(check_interval)
except (Exception, KeyboardInterrupt, SystemExit) as e:
self.submission_to_json()
dlog.exception(e)
Expand All @@ -260,6 +264,44 @@ def run_submission(self, *, dry_run=False, exit_on_submit=False, clean=True):
self.clean_jobs()
return self.serialize()

async def async_run_submission(self, **kwargs):
    """Async interface of run_submission.

    The blocking ``run_submission`` call is handed to the running event
    loop's default executor (``run_in_executor(None, ...)``), so several
    submissions can be awaited concurrently.

    Parameters
    ----------
    **kwargs
        Keyword arguments forwarded unchanged to ``run_submission``.
        ``clean`` defaults to ``False`` here unless passed explicitly.

    Returns
    -------
    The value returned by ``run_submission``.

    Examples
    --------
    >>> import asyncio
    >>> from dpdispatcher import Machine, Resources, Submission, Task
    >>> async def run_jobs():
    ...     background_tasks = set()
    ...     # task1
    ...     task1 = Task(...)
    ...     submission1 = Submission(..., task_list=[task1])
    ...     background_task = asyncio.create_task(
    ...         submission1.async_run_submission(check_interval=2, clean=False)
    ...     )
    ...     background_tasks.add(background_task)
    ...     # task2
    ...     task2 = Task(...)
    ...     submission2 = Submission(..., task_list=[task2])
    ...     background_task = asyncio.create_task(
    ...         submission2.async_run_submission(check_interval=2, clean=False)
    ...     )
    ...     background_tasks.add(background_task)
    ...     result = await asyncio.gather(*background_tasks)
    ...     return result
    >>> asyncio.run(run_jobs())

    May raise an error if `clean=True` is passed explicitly when submitting
    to pbs or slurm.
    """
    # Caller-supplied values win over the async-specific default.
    kwargs = {"clean": False, **kwargs}
    if kwargs["clean"]:
        dlog.warning(
            "Using async submission with `clean=True`, "
            "job may fail in queue system"
        )
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented preferred way to obtain it (get_event_loop() is discouraged
    # in coroutines and callbacks).
    loop = asyncio.get_running_loop()
    wrapped_submission = functools.partial(self.run_submission, **kwargs)
    return await loop.run_in_executor(None, wrapped_submission)

def update_submission_state(self):
"""Check whether all the jobs in the submission.
Expand Down
64 changes: 63 additions & 1 deletion tests/test_run_submission.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import os
import random
import shutil
import sys

Expand Down Expand Up @@ -85,7 +87,51 @@ def test_run_submission(self):
backward_common_files=[],
task_list=task_list,
)
submission.run_submission()
submission.run_submission(check_interval=2)

for ii in range(4):
self.assertTrue(
os.path.isfile(
os.path.join(
self.machine_dict["local_root"], "test_dir/", f"out{ii}.txt"
)
)
)

def test_async_run_submission(self):
machine = Machine.load_from_dict(self.machine_dict)
resources = Resources.load_from_dict(self.resources_dict)
ntask = 4

async def run_jobs(ntask):
background_tasks = set()
for ii in range(ntask):
sleep_time = random.random() * 5 + 2
task = Task(
command=f"echo dpdispatcher_unittest_{ii} && sleep {sleep_time}",
task_work_path="./",
forward_files=[],
backward_files=[f"out{ii}.txt"],
outlog=f"out{ii}.txt",
)
submission = Submission(
work_base="test_dir/",
machine=machine,
resources=resources,
forward_common_files=[],
backward_common_files=[],
task_list=[task],
)
background_task = asyncio.create_task(
submission.async_run_submission(check_interval=2, clean=False)
)
background_tasks.add(background_task)
# background_task.add_done_callback(background_tasks.discard)
res = await asyncio.gather(*background_tasks)
return res

res = asyncio.run(run_jobs(ntask=ntask))
print(res)

for ii in range(4):
self.assertTrue(
Expand All @@ -110,6 +156,10 @@ def setUp(self):
self.machine_dict["batch_type"] = "Slurm"
self.resources_dict["queue_name"] = "normal"

# Override that only delegates to the inherited async test, so the skip
# decorator applies to this batch type alone.
# NOTE(review): presumably skipped because of the pbs/slurm async failure
# listed under "Known bugs" in the commit message — confirm.
@unittest.skip("Manaually skip") # comment out this line to enable this unittest
def test_async_run_submission(self):
return super().test_async_run_submission()


@unittest.skipIf(
os.environ.get("DPDISPATCHER_TEST") != "slurm",
Expand All @@ -121,6 +171,10 @@ def setUp(self):
self.machine_dict["batch_type"] = "SlurmJobArray"
self.resources_dict["queue_name"] = "normal"

# Override that only delegates to the inherited async test, so the skip
# decorator applies to this batch type alone.
# NOTE(review): presumably skipped because of the pbs/slurm async failure
# listed under "Known bugs" in the commit message — confirm.
@unittest.skip("Manaually skip") # comment out this line to enable this unittest
def test_async_run_submission(self):
return super().test_async_run_submission()


@unittest.skipIf(
os.environ.get("DPDISPATCHER_TEST") != "slurm",
Expand All @@ -133,6 +187,10 @@ def setUp(self):
self.resources_dict["queue_name"] = "normal"
self.resources_dict["kwargs"] = {"slurm_job_size": 2}

# Override that only delegates to the inherited async test, so the skip
# decorator applies to this batch type alone.
# NOTE(review): presumably skipped because of the pbs/slurm async failure
# listed under "Known bugs" in the commit message — confirm.
@unittest.skip("Manaually skip") # comment out this line to enable this unittest
def test_async_run_submission(self):
return super().test_async_run_submission()


@unittest.skipIf(
os.environ.get("DPDISPATCHER_TEST") != "pbs", "outside the pbs testing environment"
Expand All @@ -143,6 +201,10 @@ def setUp(self):
self.machine_dict["batch_type"] = "PBS"
self.resources_dict["queue_name"] = "workq"

# Override that only delegates to the inherited async test, so the skip
# decorator applies to this batch type alone.
# NOTE(review): presumably skipped because of the pbs/slurm async failure
# listed under "Known bugs" in the commit message — confirm.
@unittest.skip("Manaually skip") # comment out this line to enable this unittest
def test_async_run_submission(self):
return super().test_async_run_submission()


@unittest.skipIf(sys.platform == "win32", "Shell is not supported on Windows")
class TestLazyLocalContext(RunSubmission, unittest.TestCase):
Expand Down

0 comments on commit abfe441

Please sign in to comment.