# add "JH_UniScheduler" batch_type #459
```diff
@@ -18,7 +18,7 @@ To avoid running multiple jobs at the same time, one could set {dargs:argument}`
 {dargs:argument}`batch_type <resources/batch_type>`: `Slurm`, `SlurmJobArray`
 
 [Slurm](https://slurm.schedmd.com/) is a job scheduling system used by lots of HPCs.
-One needs to make sure slurm has been setup in the remote server and the related environment is activated.
+One needs to make sure slurm has been set up in the remote server and the related environment is activated.
 
 When `SlurmJobArray` is used, dpdispatcher submits Slurm jobs with [job arrays](https://slurm.schedmd.com/job_array.html).
 In this way, several dpdispatcher {class}`task <dpdispatcher.submission.Task>`s map to a Slurm job and a dpdispatcher {class}`job <dpdispatcher.submission.Job>` maps to a Slurm job array.
```
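As a rough illustration of the task-to-job-array mapping described in the changed docs, the sketch below chunks tasks into groups, one group per array index. The grouping arithmetic here is an assumption for illustration only, not taken from dpdispatcher's internals:

```python
import math


def group_tasks(task_names, group_size):
    """Chunk tasks so each chunk corresponds to one array index."""
    n_groups = math.ceil(len(task_names) / group_size)
    return [task_names[i * group_size:(i + 1) * group_size] for i in range(n_groups)]


tasks = [f"task-{i}" for i in range(5)]
groups = group_tasks(tasks, group_size=2)
print(len(groups))  # → 3: array indices 0..2, each running up to 2 tasks
```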
```diff
@@ -30,7 +30,7 @@ One can use {dargs:argument}`group_size <resources/group_size>` and {dargs:argum
 {dargs:argument}`batch_type <resources/batch_type>`: `PBS`
 
 [OpenPBS](https://www.openpbs.org/) is an open-source job scheduling of the Linux Foundation and [PBS Profession](https://www.altair.com/pbs-professional/) is its commercial solution.
-One needs to make sure OpenPBS has been setup in the remote server and the related environment is activated.
+One needs to make sure OpenPBS has been set up in the remote server and the related environment is activated.
 
 Note that do not use `PBS` for Torque.
 
```
```diff
@@ -40,14 +40,22 @@ Note that do not use `PBS` for Torque.
 
 The [Terascale Open-source Resource and QUEue Manager (TORQUE)](https://adaptivecomputing.com/cherry-services/torque-resource-manager/) is a distributed resource manager based on standard OpenPBS.
 However, not all OpenPBS flags are still supported in TORQUE.
-One needs to make sure TORQUE has been setup in the remote server and the related environment is activated.
+One needs to make sure TORQUE has been set up in the remote server and the related environment is activated.
 
 ## LSF
 
 {dargs:argument}`batch_type <resources/batch_type>`: `LSF`
 
 [IBM Spectrum LSF Suites](https://www.ibm.com/products/hpc-workload-management) is a comprehensive workload management solution used by HPCs.
-One needs to make sure LSF has been setup in the remote server and the related environment is activated.
+One needs to make sure LSF has been set up in the remote server and the related environment is activated.
+
+## JH UniScheduler
+
+{dargs:argument}`batch_type <resources/batch_type>`: `JH_UniScheduler`
+
+[JH UniScheduler](http://www.jhinno.com/m/custom_case_05.html) was developed by JHINNO company and uses "jsub" to submit tasks.
+Its overall architecture is similar to that of IBM's LSF. However, there are still some differences between them. One needs to
+make sure JH UniScheduler has been set up in the remote server and the related environment is activated.
 
 ## Bohrium
 
```
```diff
@@ -74,10 +82,10 @@ Read Fujitsu cloud service documentation for details.
 ## OpenAPI
 
 {dargs:argument}`batcy_type <resources/batch_type>`: `OpenAPI`
-OpenAPI is a new way to submit jobs to Bohrium. It using [AccessKey](https://bohrium.dp.tech/personal/setting) instead of username and password. Read Bohrium documentation for details.
+OpenAPI is a new way to submit jobs to Bohrium. It is using [AccessKey](https://bohrium.dp.tech/personal/setting) instead of username and password. Read Bohrium documentation for details.
 
 ## SGE
 
 {dargs:argument}`batch_type <resources/batch_type>`: `SGE`
 
-The [Sun Grid Engine (SGE) scheduler](https://gridscheduler.sourceforge.net) is a batch-queueing system distributed resource management. The commands and flags of SGE share a lot similarity with PBS except when checking job status. Use this argument if one is submitting job to SGE based batch system.
+The [Sun Grid Engine (SGE) scheduler](https://gridscheduler.sourceforge.net) is a batch-queueing system distributed resource management. The commands and flags of SGE share a lot of similarity with PBS except when checking job status. Use this argument if one is submitting job to an SGE-based batch system.
```

**Review comment:** Add missing prepositions, commas, and articles for grammatical correctness.

```diff
-The Sun Grid Engine (SGE) scheduler is a batch-queueing system distributed resource management.
+The Sun Grid Engine (SGE) scheduler is a batch-queueing system, distributed as resource management.
-The commands and flags of SGE share a lot of similarity with PBS except when checking job status.
+The commands and flags of SGE share a lot of similarity with PBS, except when checking job status.
-Use this argument if one is submitting job to an SGE-based batch system.
+Use this argument if one is submitting a job to an SGE-based batch system.
```
New file (`@@ -0,0 +1,175 @@`): the `JH_UniScheduler` machine implementation.

```python
import shlex
from typing import List

from dargs import Argument

from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.utils import (
    RetrySignal,
    customized_script_header_template,
    retry,
)

JH_UniScheduler_script_header_template = """\
#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
{JH_UniScheduler_nodes_line}
{JH_UniScheduler_ptile_line}
{JH_UniScheduler_partition_line}
{JH_UniScheduler_number_gpu_line}"""
```
```python
class JH_UniScheduler(Machine):
    """JH_UniScheduler batch."""

    def gen_script(self, job):
        JH_UniScheduler_script = super().gen_script(job)
        return JH_UniScheduler_script

    def gen_script_header(self, job):
        resources = job.resources
        script_header_dict = {
            "JH_UniScheduler_nodes_line": f"#JSUB -n {resources.number_node * resources.cpu_per_node}",
            "JH_UniScheduler_ptile_line": f"#JSUB -R 'span[ptile={resources.cpu_per_node}]'",
            "JH_UniScheduler_partition_line": f"#JSUB -q {resources.queue_name}",
        }
        custom_gpu_line = resources.kwargs.get("custom_gpu_line", None)
        if not custom_gpu_line:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = (
                f"#JSUB -gpgpu {resources.gpu_per_node}"
            )
        else:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = custom_gpu_line
        if (
            resources["strategy"].get("customized_script_header_template_file")
            is not None
        ):
            JH_UniScheduler_script_header = customized_script_header_template(
                resources["strategy"]["customized_script_header_template_file"],
                resources,
            )
        else:
            JH_UniScheduler_script_header = (
                JH_UniScheduler_script_header_template.format(**script_header_dict)
            )

        return JH_UniScheduler_script_header
```
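To illustrate what `gen_script_header` produces, here is a standalone sketch that fills a copy of the header template the same way, using the values from the example configuration in this PR (`number_node=1`, `cpu_per_node=4`, `gpu_per_node=1`, `queue_name="gpu"`); the shortened placeholder names are for readability only:

```python
# A copy of the #JSUB header template with shortened placeholder names.
template = """\
#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
{nodes_line}
{ptile_line}
{partition_line}
{gpu_line}"""

number_node, cpu_per_node, gpu_per_node, queue_name = 1, 4, 1, "gpu"
header = template.format(
    nodes_line=f"#JSUB -n {number_node * cpu_per_node}",
    ptile_line=f"#JSUB -R 'span[ptile={cpu_per_node}]'",
    partition_line=f"#JSUB -q {queue_name}",
    gpu_line=f"#JSUB -gpgpu {gpu_per_node}",
)
print(header)
```

The resulting header requests 4 slots packed onto one node in queue `gpu` with one GPU.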
```python
    @retry()
    def do_submit(self, job):
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)
        script_run_str = self.gen_script_command(job)
        script_run_file_name = f"{job.script_file_name}.run"
        self.context.write_file(fname=script_run_file_name, write_str=script_run_str)

        try:
            stdin, stdout, stderr = self.context.block_checkcall(
                "cd {} && {} {}".format(
                    shlex.quote(self.context.remote_root),
                    "jsub < ",
                    shlex.quote(script_file_name),
                )
            )
        except RuntimeError as err:
            raise RetrySignal(err) from err

        subret = stdout.readlines()
        job_id = subret[0].split()[1][1:-1]
        self.context.write_file(job_id_name, job_id)
        return job_id

    def default_resources(self, resources):
        pass
```
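The job id is recovered from the first line of jsub's stdout by `subret[0].split()[1][1:-1]`. A minimal standalone sketch of that parsing, assuming jsub echoes a submission line of the form `Job <1234> is submitted to queue <gpu>.` (the exact wording is an assumption; the diff only shows the parsing expression):

```python
def parse_jsub_job_id(first_line: str) -> str:
    # The second whitespace-separated token is "<1234>";
    # [1:-1] strips the surrounding angle brackets.
    return first_line.split()[1][1:-1]


sample = "Job <1234> is submitted to queue <gpu>."
print(parse_jsub_job_id(sample))  # → 1234
```

Note this raises `IndexError` on an empty or unexpected first line, which the surrounding `@retry()` decorator would not convert into a `RetrySignal`.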
```python
    @retry()
    def check_status(self, job):
        try:
            job_id = job.job_id
        except AttributeError:
            return JobStatus.terminated
        if job_id == "":
            return JobStatus.unsubmitted
        ret, stdin, stdout, stderr = self.context.block_call("jjobs " + job_id)
        err_str = stderr.read().decode("utf-8")
        if f"Job <{job_id}> is not found" in err_str:
            if self.check_finish_tag(job):
                return JobStatus.finished
            else:
                return JobStatus.terminated
        elif ret != 0:
            # just retry when any unknown error raised.
            raise RetrySignal(
                "Get error code %d in checking status through ssh with job: %s . message: %s"
                % (ret, job.job_hash, err_str)
            )
        status_out = stdout.read().decode("utf-8").split("\n")
        if len(status_out) < 2:
            return JobStatus.unknown
        else:
            status_line = status_out[1]
            status_word = status_line.split()[2]

        if status_word in ["PEND"]:
            return JobStatus.waiting
        elif status_word in ["RUN", "PSUSP", "SSUSP", "USUSP"]:
            return JobStatus.running
        elif status_word in ["DONE", "EXIT"]:
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            else:
                return JobStatus.terminated
        else:
            return JobStatus.unknown

    def check_finish_tag(self, job):
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)
```
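The branching on `jjobs` status words can be summarized as a small lookup. A standalone sketch, with plain strings standing in for the `JobStatus` enum (note that in the real code `DONE`/`EXIT` additionally require the finish tag before being reported as finished):

```python
def map_jjobs_status(status_word: str) -> str:
    # Mirrors the status branches in check_status above.
    if status_word == "PEND":
        return "waiting"
    if status_word in ("RUN", "PSUSP", "SSUSP", "USUSP"):
        return "running"
    if status_word in ("DONE", "EXIT"):
        return "finished_or_terminated"  # finish tag decides which
    return "unknown"


print(map_jjobs_status("PEND"))   # → waiting
print(map_jjobs_status("SSUSP"))  # → running
```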
```python
    @classmethod
    def resources_subfields(cls) -> List[Argument]:
        """Generate the resources subfields.

        Returns
        -------
        list[Argument]
            resources subfields
        """
        doc_custom_gpu_line = "Custom GPU configuration, starting with #JSUB"

        return [
            Argument(
                "kwargs",
                dict,
                [
                    Argument(
                        "custom_gpu_line",
                        str,
                        optional=True,
                        default=None,
                        doc=doc_custom_gpu_line,
                    ),
                ],
                optional=False,
                doc="Extra arguments.",
            )
        ]
```
```python
    def kill(self, job):
        """Kill the job.

        Parameters
        ----------
        job : Job
            job
        """
        job_id = job.job_id
        ret, stdin, stdout, stderr = self.context.block_call(
            "jctrl kill " + str(job_id)
        )
```
New file `tests/devel_test_JH_UniScheduler.py` (`@@ -0,0 +1,57 @@`): a development test script that submits four LAMMPS tasks through the new batch type.

```python
import json
import os
import sys
```

**Review comment (on lines 1–3):** Automated verification scripts (using `rg` and `ast-grep` to list the import statements and search for their usage) report that the imports in `tests/devel_test_JH_UniScheduler.py` may be unused. These imports should be reviewed and removed if they are indeed not used in the code.
```python
from dpdispatcher.machine import Machine
from dpdispatcher.submission import Resources, Submission, Task

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

# task_need_resources has no effect
with open("jsons/machine_jh_unischeduler.json") as f:
    mdata = json.load(f)

machine = Machine.load_from_dict(mdata["machine"])
resources = Resources.load_from_dict(mdata["resources"])

submission = Submission(
    work_base="0_md/",
    machine=machine,
    resources=resources,
    forward_common_files=["graph.pb"],
    backward_common_files=[],
)

task1 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-1/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task2 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-2/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task3 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-3/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task4 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-4/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
submission.register_task_list(
    [
        task1,
        task2,
        task3,
        task4,
    ]
)
submission.run_submission(clean=True)
```
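The four task definitions above differ only in their work directory, so they could equally be generated in a loop. A sketch of that alternative, with plain dicts standing in for dpdispatcher `Task` objects:

```python
# Build the four per-directory task specs in a loop instead of writing
# each one out by hand; only task_work_path varies.
tasks = [
    {
        "command": "lmp -i input.lammps",
        "task_work_path": f"bct-{i}/",
        "forward_files": ["conf.lmp", "input.lammps"],
        "backward_files": ["log.lammps"],
    }
    for i in range(1, 5)
]
print([t["task_work_path"] for t in tasks])  # → ['bct-1/', 'bct-2/', 'bct-3/', 'bct-4/']
```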
New file (`@@ -0,0 +1,16 @@`): an example machine and resources configuration for `JH_UniScheduler`.

```json
{
  "machine": {
    "batch_type": "JH_UniScheduler",
    "context_type": "local",
    "local_root": "./",
    "remote_root": "/data/home/wangsimin/machine_learning/dpgen/task/test/dpgen_example/run1"
  },
  "resources": {
    "number_node": 1,
    "cpu_per_node": 4,
    "gpu_per_node": 1,
    "queue_name": "gpu",
    "group_size": 4,
    "source_list": ["/public/software/deepmd-kit/bin/activate /public/software/deepmd-kit"]
  }
}
```
New file (`@@ -0,0 +1,18 @@`): a test configuration using `LazyLocalContext`.

```json
{
  "machine": {
    "batch_type": "JH_UniScheduler",
    "context_type": "LazyLocalContext",
    "local_root": "./test_jh_unischeduler"
  },
  "resources": {
    "number_node": 1,
    "cpu_per_node": 4,
    "queue_name": "gpu",
    "gpu_per_node": 1,
    "group_size": 4,
    "strategy": {
      "if_cuda_multi_devices": false
    },
    "source_list": ["./slurm_test.env"]
  }
}
```
**Review comment:** Correct the grammatical error in the setup terminology ("setup" → "set up").