
add "JH_UniScheduler" batch_type #459

Merged: 25 commits, May 26, 2024
Changes shown from 17 of 25 commits
a37e043  add "JH_UniScheduler" batch_type (WangSimin123456, May 21, 2024)
1aa07e4  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], May 21, 2024)
cf5439d  fix tests (WangSimin123456, May 23, 2024)
a7f4a79  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], May 23, 2024)
8adea81  fix test2 (WangSimin123456, May 23, 2024)
aa881fd  fix test3 (WangSimin123456, May 23, 2024)
3f577d0  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], May 23, 2024)
70f1ee6  fix test4 (WangSimin123456, May 23, 2024)
866f2a3  Update batch.md (WangSimin123456, May 23, 2024)
3c23af7  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], May 23, 2024)
758d76c  Update machine_JH_UniScheduler.json (WangSimin123456, May 24, 2024)
8b17dfb  Update machine_JH_UniScheduler.json (WangSimin123456, May 24, 2024)
7841346  Merge branch 'master' of https://github.com/WangSimin123456/dpdispatcher (WangSimin123456, May 24, 2024)
1f1d01e  fix test5 (WangSimin123456, May 24, 2024)
87b2d68  Update devel_test_JH_UniScheduler.py (WangSimin123456, May 24, 2024)
9231bfc  Update test_JH_UniScheduler_script_generation.py (WangSimin123456, May 24, 2024)
603e2de  Update test_JH_UniScheduler_script_generation.py (WangSimin123456, May 25, 2024)
fc00abb  Update test_JH_UniScheduler_script_generation.py (WangSimin123456, May 25, 2024)
400b6ca  Update test_JH_UniScheduler_script_generation.py (WangSimin123456, May 25, 2024)
200b333  Update test_JH_UniScheduler_script_generation.py (WangSimin123456, May 25, 2024)
7df95d6  Update test_JH_UniScheduler_script_generation.py (WangSimin123456, May 25, 2024)
0146aa0  Update batch.md (WangSimin123456, May 25, 2024)
1dec4f1  Update batch.md (WangSimin123456, May 25, 2024)
590feb6  fix test6 (WangSimin123456, May 25, 2024)
1453890  Update test_class_machine_dispatch.py (WangSimin123456, May 25, 2024)
10 changes: 9 additions & 1 deletion doc/batch.md
@@ -47,7 +47,15 @@
{dargs:argument}`batch_type <resources/batch_type>`: `LSF`

[IBM Spectrum LSF Suites](https://www.ibm.com/products/hpc-workload-management) is a comprehensive workload management solution used by HPCs.
- One needs to make sure LSF has been setup in the remote server and the related environment is activated.
+ One needs to make sure LSF has been set up in the remote server and the related environment is activated.
+
+ ## JH UniScheduler
+
+ {dargs:argument}`batch_type <resources/batch_type>`: `JH_UniScheduler`
+
+ [JH UniScheduler](http://www.jhinno.com/m/custom_case_05.html) was developed by China's JHINNO company and uses `jsub` to submit tasks.
+ Its overall architecture is similar to that of IBM's LSF, though there are some differences between them. One needs to
+ make sure JH UniScheduler has been set up in the remote server and the related environment is activated.
Contributor (review comment):

Correct the grammatical error in the setup terminology.

Suggested change:
- make sure JH UniScheduler has been setup in the remote server
+ make sure JH UniScheduler has been set up in the remote server
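For illustration, with the sample resources from tests/jsons/machine_JH_UniScheduler.json added later in this diff (1 node, 4 CPUs per node, queue "gpu", 1 GPU), the header template introduced by this PR should render roughly as:

#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
#JSUB -n 4
#JSUB -R 'span[ptile=4]'
#JSUB -q gpu
#JSUB -gpgpu 1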


## Bohrium

2 changes: 1 addition & 1 deletion doc/index.rst
Member (review comment):

Please also add documentation to doc/batch.md.
@@ -6,7 +6,7 @@
DPDispatcher's documentation
======================================

- DPDispatcher is a Python package used to generate HPC (High Performance Computing) scheduler systems (Slurm/PBS/LSF/dpcloudserver) jobs input scripts and submit these scripts to HPC systems and poke until they finish.
+ DPDispatcher is a Python package used to generate HPC (High Performance Computing) scheduler systems (Slurm/PBS/LSF/JH_UniScheduler/dpcloudserver) jobs input scripts and submit these scripts to HPC systems and poke until they finish.

DPDispatcher will monitor (poke) until these jobs finish and download the result files (if these jobs are running on remote systems connected by SSH).

175 changes: 175 additions & 0 deletions dpdispatcher/machines/JH_UniScheduler.py
@@ -0,0 +1,175 @@
import shlex
from typing import List

from dargs import Argument

from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.utils import (
    RetrySignal,
    customized_script_header_template,
    retry,
)

JH_UniScheduler_script_header_template = """\
#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
{JH_UniScheduler_nodes_line}
{JH_UniScheduler_ptile_line}
{JH_UniScheduler_partition_line}
{JH_UniScheduler_number_gpu_line}"""


class JH_UniScheduler(Machine):
    """JH_UniScheduler batch."""

    def gen_script(self, job):
        JH_UniScheduler_script = super().gen_script(job)
        return JH_UniScheduler_script

    def gen_script_header(self, job):
        resources = job.resources
        script_header_dict = {
            "JH_UniScheduler_nodes_line": f"#JSUB -n {resources.number_node * resources.cpu_per_node}",
            "JH_UniScheduler_ptile_line": f"#JSUB -R 'span[ptile={resources.cpu_per_node}]'",
            "JH_UniScheduler_partition_line": f"#JSUB -q {resources.queue_name}",
        }
        custom_gpu_line = resources.kwargs.get("custom_gpu_line", None)
        if not custom_gpu_line:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = (
                f"#JSUB -gpgpu {resources.gpu_per_node}"
            )
        else:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = custom_gpu_line
        if (
            resources["strategy"].get("customized_script_header_template_file")
            is not None
        ):
            JH_UniScheduler_script_header = customized_script_header_template(
                resources["strategy"]["customized_script_header_template_file"],
                resources,
            )
        else:
            JH_UniScheduler_script_header = (
                JH_UniScheduler_script_header_template.format(**script_header_dict)
            )

        return JH_UniScheduler_script_header

    @retry()
    def do_submit(self, job):
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)
        script_run_str = self.gen_script_command(job)
        script_run_file_name = f"{job.script_file_name}.run"
        self.context.write_file(fname=script_run_file_name, write_str=script_run_str)

        try:
            stdin, stdout, stderr = self.context.block_checkcall(
                "cd {} && {} {}".format(
                    shlex.quote(self.context.remote_root),
                    "jsub <",
                    shlex.quote(script_file_name),
                )
            )
        except RuntimeError as err:
            raise RetrySignal(err) from err

        subret = stdout.readlines()
        job_id = subret[0].split()[1][1:-1]
        self.context.write_file(job_id_name, job_id)
        return job_id

    def default_resources(self, resources):
        pass

    @retry()
    def check_status(self, job):
        try:
            job_id = job.job_id
        except AttributeError:
            return JobStatus.terminated
        if job_id == "":
            return JobStatus.unsubmitted
        ret, stdin, stdout, stderr = self.context.block_call("jjobs " + job_id)
        err_str = stderr.read().decode("utf-8")
        if f"Job <{job_id}> is not found" in err_str:
            if self.check_finish_tag(job):
                return JobStatus.finished
            else:
                return JobStatus.terminated
        elif ret != 0:
            # just retry when any unknown error is raised.
            raise RetrySignal(
                "Get error code %d in checking status through ssh with job: %s . message: %s"
                % (ret, job.job_hash, err_str)
            )
        status_out = stdout.read().decode("utf-8").split("\n")
        if len(status_out) < 2:
            return JobStatus.unknown
        else:
            status_line = status_out[1]
            status_word = status_line.split()[2]

        if status_word in ["PEND"]:
            return JobStatus.waiting
        elif status_word in ["RUN", "PSUSP", "SSUSP", "USUSP"]:
            return JobStatus.running
        elif status_word in ["DONE", "EXIT"]:
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            else:
                return JobStatus.terminated
        else:
            return JobStatus.unknown

    def check_finish_tag(self, job):
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)

    @classmethod
    def resources_subfields(cls) -> List[Argument]:
        """Generate the resources subfields.

        Returns
        -------
        list[Argument]
            resources subfields
        """
        doc_custom_gpu_line = "Custom GPU configuration, starting with #JSUB"

        return [
            Argument(
                "kwargs",
                dict,
                [
                    Argument(
                        "custom_gpu_line",
                        str,
                        optional=True,
                        default=None,
                        doc=doc_custom_gpu_line,
                    ),
                ],
                optional=False,
                doc="Extra arguments.",
            )
        ]

    def kill(self, job):
        """Kill the job.

        Parameters
        ----------
        job : Job
            job
        """
        job_id = job.job_id
        ret, stdin, stdout, stderr = self.context.block_call(
            "jctrl kill " + str(job_id)
        )
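For reference, a minimal sketch of the two parsing conventions the class above relies on. The literal jsub/jjobs output shown here is an assumption, modeled on IBM LSF's bsub/bjobs (which JH UniScheduler mimics); only the split indices are taken from the code:

# Hypothetical jsub output; the exact wording is assumed, LSF-like.
submit_out = "Job <12345> is submitted to queue <gpu>."
# do_submit: token 1 is "<12345>"; strip the angle brackets to get the id.
job_id = submit_out.split()[1][1:-1]
assert job_id == "12345"

# Hypothetical jjobs output; the exact columns are assumed, LSF-like.
jjobs_out = "JOBID USER STAT QUEUE\n12345 user RUN gpu\n"
# check_status: the third column of the row after the header is the STAT word,
# mapped to waiting (PEND), running (RUN/PSUSP/SSUSP/USUSP), or finished/terminated (DONE/EXIT).
status_word = jjobs_out.split("\n")[1].split()[2]
assert status_word == "RUN"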
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -32,7 +32,7 @@ dependencies = [
]
requires-python = ">=3.7"
readme = "README.md"
- keywords = ["dispatcher", "hpc", "slurm", "lsf", "pbs", "ssh"]
+ keywords = ["dispatcher", "hpc", "slurm", "lsf", "pbs", "ssh", "jh_unischeduler"]

[project.urls]
Homepage = "https://github.com/deepmodeling/dpdispatcher"
1 change: 1 addition & 0 deletions tests/context.py
@@ -19,6 +19,7 @@
from dpdispatcher.machine import Machine # noqa: F401
from dpdispatcher.machines.distributed_shell import DistributedShell # noqa: F401
from dpdispatcher.machines.dp_cloud_server import Lebesgue # noqa: F401
+ from dpdispatcher.machines.JH_UniScheduler import JH_UniScheduler # noqa: F401
from dpdispatcher.machines.lsf import LSF # noqa: F401
from dpdispatcher.machines.pbs import PBS # noqa: F401
from dpdispatcher.machines.shell import Shell # noqa: F401
57 changes: 57 additions & 0 deletions tests/devel_test_JH_UniScheduler.py
@@ -0,0 +1,57 @@
import json
import os
import sys
Contributor (review comment on lines +1 to +3, codebase verification):

Ensure all necessary imports are used and there are no unused imports. The script output indicates that all the imports in tests/devel_test_JH_UniScheduler.py are flagged as unused:

  • import json
  • import os
  • import sys
  • from dpdispatcher.machine import Machine
  • from dpdispatcher.submission import Resources, Submission, Task

These imports should be reviewed and removed if they are indeed not used in the code.

Scripts executed for the analysis:

#!/bin/bash
# Description: Check for unused imports in the file.

# Test: Search for unused imports. Expect: No output if all imports are used.
rg --type python -e "import" -e "from" tests/devel_test_JH_UniScheduler.py | xargs -I {} ast-grep --lang python --pattern 'import $_' {}

(Length of output: 169)

#!/bin/bash
# Description: Check for unused imports in the file.

# List all import statements in the file
imports=$(rg --type py -o -N '^(import|from) .+' tests/devel_test_JH_UniScheduler.py)

# Check each import to see if it is used in the file
for import in $imports; do
  # Extract the module name from the import statement
  module=$(echo $import | sed -E 's/(import|from) ([^ ]+).*/\2/')

  # Search for usage of the module in the file
  usage=$(rg --type py -q $module tests/devel_test_JH_UniScheduler.py)

  # If the module is not used, print the import statement
  if [ -z "$usage" ]; then
    echo "Unused import: $import"
  fi
done

(Length of output: 4165)

from dpdispatcher.machine import Machine
from dpdispatcher.submission import Resources, Submission, Task

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

# task_need_resources has no effect
with open("jsons/machine_jh_unischeduler.json") as f:
    mdata = json.load(f)

machine = Machine.load_from_dict(mdata["machine"])
resources = Resources.load_from_dict(mdata["resources"])

submission = Submission(
    work_base="0_md/",
    machine=machine,
    resources=resources,
    forward_common_files=["graph.pb"],
    backward_common_files=[],
)

task1 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-1/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task2 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-2/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task3 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-3/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task4 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-4/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
submission.register_task_list(
    [
        task1,
        task2,
        task3,
        task4,
    ]
)
submission.run_submission(clean=True)
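A note on running this devel script: it presumably needs to be launched from the tests/ directory so that the relative jsons/machine_jh_unischeduler.json path resolves, with the bct-*/ LAMMPS task folders and graph.pb present under the 0_md/ work base, e.g.:

cd tests && python devel_test_JH_UniScheduler.py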
16 changes: 16 additions & 0 deletions tests/jsons/machine_JH_UniScheduler.json
@@ -0,0 +1,16 @@
{
    "machine": {
        "batch_type": "JH_UniScheduler",
        "context_type": "local",
        "local_root": "./",
        "remote_root": "/data/home/wangsimin/machine_learning/dpgen/task/test/dpgen_example/run1"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 4,
        "gpu_per_node": 1,
        "queue_name": "gpu",
        "group_size": 4,
        "source_list": ["/public/software/deepmd-kit/bin/activate /public/software/deepmd-kit"]
    }
}
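As an illustrative sketch (values here are placeholders, not from this PR): supplying the custom_gpu_line kwarg defined in resources_subfields replaces the default "#JSUB -gpgpu {gpu_per_node}" header line with a user-provided one. The JSON equivalent would be adding "kwargs": {"custom_gpu_line": "#JSUB -gpgpu 2"} to the resources block above; in Python:

from dpdispatcher.submission import Resources

# gen_script_header emits the custom line verbatim instead of the default.
resources = Resources.load_from_dict(
    {
        "number_node": 1,
        "cpu_per_node": 4,
        "gpu_per_node": 2,
        "queue_name": "gpu",
        "group_size": 4,
        "kwargs": {"custom_gpu_line": "#JSUB -gpgpu 2"},
    }
)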
14 changes: 14 additions & 0 deletions tests/jsons/machine_lazy_local_jh_unischeduler.json
@@ -0,0 +1,14 @@
{
    "machine": {
        "batch_type": "JH_UniScheduler",
        "context_type": "LazyLocalContext",
        "local_root": "./test_jh_unischeduler"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 4,
        "queue_name": "gpu",
        "gpu_per_node": 1,
        "group_size": 4
    }
}