add "JH_UniScheduler" batch_type (deepmodeling#459)
JH UniScheduler was developed by the JHINNO company. It is commercial
software and uses `jsub` to submit tasks. Its overall architecture is
similar to that of IBM's LSF, but there are still some differences
between them.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

### Release Notes

- **New Features**
  - Introduced support for a new scheduler system, `JH_UniScheduler`, in the DPDispatcher Python package.
  - Added documentation and setup instructions for `JH_UniScheduler`.

- **Documentation**
  - Updated descriptions in the documentation to include `JH_UniScheduler`.
  - Added detailed setup requirements and usage instructions for `JH_UniScheduler`.

- **Testing**
  - Added new test cases to ensure proper functionality of `JH_UniScheduler`.
  - Included configuration files and simulation input commands for testing `JH_UniScheduler`.

- **Configuration**
  - Updated project configuration to include `jh_unischeduler` as a keyword.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
WangSimin123456 and pre-commit-ci[bot] committed May 26, 2024
1 parent b31c1ea commit 661882a
Showing 19 changed files with 625 additions and 10 deletions.
20 changes: 14 additions & 6 deletions doc/batch.md
@@ -18,7 +18,7 @@ To avoid running multiple jobs at the same time, one could set {dargs:argument}`
{dargs:argument}`batch_type <resources/batch_type>`: `Slurm`, `SlurmJobArray`

[Slurm](https://slurm.schedmd.com/) is a job scheduling system used by lots of HPCs.
-One needs to make sure slurm has been setup in the remote server and the related environment is activated.
+One needs to make sure slurm has been set up in the remote server and the related environment is activated.

When `SlurmJobArray` is used, dpdispatcher submits Slurm jobs with [job arrays](https://slurm.schedmd.com/job_array.html).
In this way, several dpdispatcher {class}`task <dpdispatcher.submission.Task>`s map to a Slurm job and a dpdispatcher {class}`job <dpdispatcher.submission.Job>` maps to a Slurm job array.
@@ -30,7 +30,7 @@ One can use {dargs:argument}`group_size <resources/group_size>` and {dargs:argum
{dargs:argument}`batch_type <resources/batch_type>`: `PBS`

[OpenPBS](https://www.openpbs.org/) is an open-source job scheduler of the Linux Foundation and [PBS Professional](https://www.altair.com/pbs-professional/) is its commercial solution.
-One needs to make sure OpenPBS has been setup in the remote server and the related environment is activated.
+One needs to make sure OpenPBS has been set up in the remote server and the related environment is activated.

Note that one should not use `PBS` for Torque.

@@ -40,14 +40,22 @@ Note that do not use `PBS` for Torque.

The [Terascale Open-source Resource and QUEue Manager (TORQUE)](https://adaptivecomputing.com/cherry-services/torque-resource-manager/) is a distributed resource manager based on standard OpenPBS.
However, not all OpenPBS flags are still supported in TORQUE.
-One needs to make sure TORQUE has been setup in the remote server and the related environment is activated.
+One needs to make sure TORQUE has been set up in the remote server and the related environment is activated.

## LSF

{dargs:argument}`batch_type <resources/batch_type>`: `LSF`

[IBM Spectrum LSF Suites](https://www.ibm.com/products/hpc-workload-management) is a comprehensive workload management solution used by HPCs.
-One needs to make sure LSF has been setup in the remote server and the related environment is activated.
+One needs to make sure LSF has been set up in the remote server and the related environment is activated.

## JH UniScheduler

{dargs:argument}`batch_type <resources/batch_type>`: `JH_UniScheduler`

[JH UniScheduler](http://www.jhinno.com/m/custom_case_05.html) was developed by the JHINNO company and uses `jsub` to submit tasks.
Its overall architecture is similar to that of IBM's LSF, but there are still some differences between them. One needs to
make sure JH UniScheduler has been set up in the remote server and the related environment is activated.

## Bohrium

@@ -74,10 +82,10 @@ Read Fujitsu cloud service documentation for details.
## OpenAPI

{dargs:argument}`batch_type <resources/batch_type>`: `OpenAPI`
-OpenAPI is a new way to submit jobs to Bohrium. It using [AccessKey](https://bohrium.dp.tech/personal/setting) instead of username and password. Read Bohrium documentation for details.
+OpenAPI is a new way to submit jobs to Bohrium. It uses [AccessKey](https://bohrium.dp.tech/personal/setting) instead of username and password. Read Bohrium documentation for details.

## SGE

{dargs:argument}`batch_type <resources/batch_type>`: `SGE`

-The [Sun Grid Engine (SGE) scheduler](https://gridscheduler.sourceforge.net) is a batch-queueing system distributed resource management. The commands and flags of SGE share a lot similarity with PBS except when checking job status. Use this argument if one is submitting job to SGE based batch system.
+The [Sun Grid Engine (SGE) scheduler](https://gridscheduler.sourceforge.net) is a batch-queueing system for distributed resource management. The commands and flags of SGE share a lot of similarity with PBS except when checking job status. Use this argument if one is submitting a job to an SGE-based batch system.
2 changes: 1 addition & 1 deletion doc/index.rst
@@ -6,7 +6,7 @@
DPDispatcher's documentation
======================================

-DPDispatcher is a Python package used to generate HPC (High Performance Computing) scheduler systems (Slurm/PBS/LSF/dpcloudserver) jobs input scripts and submit these scripts to HPC systems and poke until they finish.
+DPDispatcher is a Python package used to generate HPC (High Performance Computing) scheduler systems (Slurm/PBS/LSF/JH_UniScheduler/dpcloudserver) jobs input scripts and submit these scripts to HPC systems and poke until they finish.

DPDispatcher will monitor (poke) until these jobs finish and download the result files (if these jobs are running on remote systems connected by SSH).

175 changes: 175 additions & 0 deletions dpdispatcher/machines/JH_UniScheduler.py
@@ -0,0 +1,175 @@
import shlex
from typing import List

from dargs import Argument

from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.utils import (
    RetrySignal,
    customized_script_header_template,
    retry,
)

JH_UniScheduler_script_header_template = """\
#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
{JH_UniScheduler_nodes_line}
{JH_UniScheduler_ptile_line}
{JH_UniScheduler_partition_line}
{JH_UniScheduler_number_gpu_line}"""
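For illustration, the header template above renders like this for a 1-node, 4-CPU, 1-GPU job on a `gpu` queue. This sketch uses a local copy of the template with simplified placeholder names; the real class fills the original template from `job.resources`:

```python
# Local copy of the JH UniScheduler header template, for illustration only.
header_template = """\
#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
{nodes_line}
{ptile_line}
{partition_line}
{gpu_line}"""

# Example values mirroring number_node=1, cpu_per_node=4, gpu_per_node=1.
header = header_template.format(
    nodes_line="#JSUB -n 4",
    ptile_line="#JSUB -R 'span[ptile=4]'",
    partition_line="#JSUB -q gpu",
    gpu_line="#JSUB -gpgpu 1",
)
print(header)
```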


class JH_UniScheduler(Machine):
    """JH_UniScheduler batch."""

    def gen_script(self, job):
        JH_UniScheduler_script = super().gen_script(job)
        return JH_UniScheduler_script

    def gen_script_header(self, job):
        resources = job.resources
        script_header_dict = {
            "JH_UniScheduler_nodes_line": f"#JSUB -n {resources.number_node * resources.cpu_per_node}",
            "JH_UniScheduler_ptile_line": f"#JSUB -R 'span[ptile={resources.cpu_per_node}]'",
            "JH_UniScheduler_partition_line": f"#JSUB -q {resources.queue_name}",
        }
        custom_gpu_line = resources.kwargs.get("custom_gpu_line", None)
        if not custom_gpu_line:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = (
                f"#JSUB -gpgpu {resources.gpu_per_node}"
            )
        else:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = custom_gpu_line
        if (
            resources["strategy"].get("customized_script_header_template_file")
            is not None
        ):
            JH_UniScheduler_script_header = customized_script_header_template(
                resources["strategy"]["customized_script_header_template_file"],
                resources,
            )
        else:
            JH_UniScheduler_script_header = (
                JH_UniScheduler_script_header_template.format(**script_header_dict)
            )

        return JH_UniScheduler_script_header

    @retry()
    def do_submit(self, job):
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)
        script_run_str = self.gen_script_command(job)
        script_run_file_name = f"{job.script_file_name}.run"
        self.context.write_file(fname=script_run_file_name, write_str=script_run_str)

        try:
            stdin, stdout, stderr = self.context.block_checkcall(
                "cd {} && jsub < {}".format(
                    shlex.quote(self.context.remote_root),
                    shlex.quote(script_file_name),
                )
            )
        except RuntimeError as err:
            raise RetrySignal(err) from err

        subret = stdout.readlines()
        # jsub reports the new job as "Job <ID> ..."; extract the bare ID.
        job_id = subret[0].split()[1][1:-1]
        self.context.write_file(job_id_name, job_id)
        return job_id
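`do_submit` takes the job ID from the second token of `jsub`'s first output line and strips the surrounding angle brackets. Assuming `jsub` reports submission in an LSF-like form such as `Job <1234> is submitted to queue <gpu>.` (an assumption inferred from the parsing above, not from vendor documentation), the extraction can be sketched as:

```python
def parse_jsub_job_id(first_line: str) -> str:
    # Second whitespace-separated token is "<1234>"; slicing [1:-1] drops the
    # angle brackets, matching subret[0].split()[1][1:-1] in do_submit.
    return first_line.split()[1][1:-1]

# Hypothetical jsub output line, for illustration only.
print(parse_jsub_job_id("Job <1234> is submitted to queue <gpu>."))  # → 1234
```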

    def default_resources(self, resources):
        pass

    @retry()
    def check_status(self, job):
        try:
            job_id = job.job_id
        except AttributeError:
            return JobStatus.terminated
        if job_id == "":
            return JobStatus.unsubmitted
        ret, stdin, stdout, stderr = self.context.block_call("jjobs " + job_id)
        err_str = stderr.read().decode("utf-8")
        if f"Job <{job_id}> is not found" in err_str:
            if self.check_finish_tag(job):
                return JobStatus.finished
            else:
                return JobStatus.terminated
        elif ret != 0:
            # just retry when any unknown error is raised.
            raise RetrySignal(
                "Get error code %d in checking status through ssh with job: %s . message: %s"
                % (ret, job.job_hash, err_str)
            )
        status_out = stdout.read().decode("utf-8").split("\n")
        if len(status_out) < 2:
            return JobStatus.unknown
        else:
            status_line = status_out[1]
            status_word = status_line.split()[2]

            if status_word in ["PEND"]:
                return JobStatus.waiting
            elif status_word in ["RUN", "PSUSP", "SSUSP", "USUSP"]:
                return JobStatus.running
            elif status_word in ["DONE", "EXIT"]:
                if self.check_finish_tag(job):
                    dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                    return JobStatus.finished
                else:
                    return JobStatus.terminated
            else:
                return JobStatus.unknown

    def check_finish_tag(self, job):
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)
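The `jjobs` parsing in `check_status` assumes an LSF-like table whose second line carries the status word in the third column. A minimal, self-contained sketch of that mapping, using a hypothetical `jjobs` output (the column layout is an assumption mirroring the slicing above):

```python
# Status words mirror the branches in check_status above.
STATUS_MAP = {
    "PEND": "waiting",
    "RUN": "running",
    "PSUSP": "running",
    "SSUSP": "running",
    "USUSP": "running",
    "DONE": "finished_or_terminated",
    "EXIT": "finished_or_terminated",
}


def parse_jjobs_status(output: str) -> str:
    lines = output.split("\n")
    if len(lines) < 2:
        return "unknown"
    status_word = lines[1].split()[2]  # third column of the first data row
    return STATUS_MAP.get(status_word, "unknown")


# Hypothetical jjobs output (header row + one data row), for illustration only.
sample = "JOBID   USER    STAT  QUEUE\n" "1234    alice   RUN   gpu"
print(parse_jjobs_status(sample))  # → running
```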

    @classmethod
    def resources_subfields(cls) -> List[Argument]:
        """Generate the resources subfields.

        Returns
        -------
        list[Argument]
            resources subfields
        """
        doc_custom_gpu_line = "Custom GPU configuration, starting with #JSUB"

        return [
            Argument(
                "kwargs",
                dict,
                [
                    Argument(
                        "custom_gpu_line",
                        str,
                        optional=True,
                        default=None,
                        doc=doc_custom_gpu_line,
                    ),
                ],
                optional=False,
                doc="Extra arguments.",
            )
        ]

    def kill(self, job):
        """Kill the job.

        Parameters
        ----------
        job : Job
            job
        """
        job_id = job.job_id
        ret, stdin, stdout, stderr = self.context.block_call(
            "jctrl kill " + str(job_id)
        )
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -32,7 +32,7 @@ dependencies = [
]
requires-python = ">=3.7"
readme = "README.md"
keywords = ["dispatcher", "hpc", "slurm", "lsf", "pbs", "ssh"]
keywords = ["dispatcher", "hpc", "slurm", "lsf", "pbs", "ssh", "jh_unischeduler"]

[project.urls]
Homepage = "https://github.com/deepmodeling/dpdispatcher"
1 change: 1 addition & 0 deletions tests/context.py
@@ -19,6 +19,7 @@
from dpdispatcher.machine import Machine # noqa: F401
from dpdispatcher.machines.distributed_shell import DistributedShell # noqa: F401
from dpdispatcher.machines.dp_cloud_server import Lebesgue # noqa: F401
from dpdispatcher.machines.JH_UniScheduler import JH_UniScheduler # noqa: F401
from dpdispatcher.machines.lsf import LSF # noqa: F401
from dpdispatcher.machines.pbs import PBS # noqa: F401
from dpdispatcher.machines.shell import Shell # noqa: F401
57 changes: 57 additions & 0 deletions tests/devel_test_JH_UniScheduler.py
@@ -0,0 +1,57 @@
import json
import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from dpdispatcher.machine import Machine  # noqa: E402
from dpdispatcher.submission import Resources, Submission, Task  # noqa: E402

# task_need_resources has no effect
with open("jsons/machine_jh_unischeduler.json") as f:
    mdata = json.load(f)

machine = Machine.load_from_dict(mdata["machine"])
resources = Resources.load_from_dict(mdata["resources"])

submission = Submission(
    work_base="0_md/",
    machine=machine,
    resources=resources,
    forward_common_files=["graph.pb"],
    backward_common_files=[],
)

task1 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-1/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task2 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-2/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task3 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-3/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task4 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-4/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
submission.register_task_list(
    [
        task1,
        task2,
        task3,
        task4,
    ]
)
submission.run_submission(clean=True)
16 changes: 16 additions & 0 deletions tests/jsons/machine_JH_UniScheduler.json
@@ -0,0 +1,16 @@
{
    "machine": {
        "batch_type": "JH_UniScheduler",
        "context_type": "local",
        "local_root": "./",
        "remote_root": "/data/home/wangsimin/machine_learning/dpgen/task/test/dpgen_example/run1"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 4,
        "gpu_per_node": 1,
        "queue_name": "gpu",
        "group_size": 4,
        "source_list": ["/public/software/deepmd-kit/bin/activate /public/software/deepmd-kit"]
    }
}
18 changes: 18 additions & 0 deletions tests/jsons/machine_lazy_local_jh_unischeduler.json
@@ -0,0 +1,18 @@
{
    "machine": {
        "batch_type": "JH_UniScheduler",
        "context_type": "LazyLocalContext",
        "local_root": "./test_jh_unischeduler"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 4,
        "queue_name": "gpu",
        "gpu_per_node": 1,
        "group_size": 4,
        "strategy": {
            "if_cuda_multi_devices": false
        },
        "source_list": ["./slurm_test.env"]
    }
}
