-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add "JH_UniScheduler" batch_type #459
Changes from 2 commits
a37e043
1aa07e4
cf5439d
a7f4a79
8adea81
aa881fd
3f577d0
70f1ee6
866f2a3
3c23af7
758d76c
8b17dfb
7841346
1f1d01e
87b2d68
9231bfc
603e2de
fc00abb
400b6ca
200b333
7df95d6
0146aa0
1dec4f1
590feb6
1453890
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
import shlex | ||
from typing import List | ||
|
||
from dargs import Argument | ||
|
||
from dpdispatcher.dlog import dlog | ||
from dpdispatcher.machine import Machine | ||
from dpdispatcher.utils.job_status import JobStatus | ||
from dpdispatcher.utils.utils import ( | ||
RetrySignal, | ||
customized_script_header_template, | ||
retry, | ||
) | ||
|
||
# Default #JSUB header template, filled in by gen_script_header().
# %J in the log-file names is presumably expanded by the scheduler to the
# job id (LSF-style) — TODO confirm against JH UniScheduler docs.
JH_UniScheduler_script_header_template = """\
#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
{JH_UniScheduler_nodes_line}
{JH_UniScheduler_ptile_line}
{JH_UniScheduler_partition_line}
{JH_UniScheduler_number_gpu_line}"""
|
||
|
||
class JH_UniScheduler(Machine):
    """Batch machine for the JH UniScheduler scheduler.

    Jobs are submitted with ``jsub``, polled with ``jjobs`` and killed
    with ``jctrl kill``.
    """

    def gen_script(self, job):
        """Generate the full batch script for *job* (delegates to the base class)."""
        return super().gen_script(job)

    def gen_script_header(self, job):
        """Generate the ``#JSUB`` header lines for *job*.

        If ``resources["strategy"]["customized_script_header_template_file"]``
        is set, that template file takes precedence over the built-in
        ``JH_UniScheduler_script_header_template``.
        """
        resources = job.resources
        script_header_dict = {
            "JH_UniScheduler_nodes_line": f"#JSUB -n {resources.number_node * resources.cpu_per_node}",
            "JH_UniScheduler_ptile_line": f"#JSUB -R 'span[ptile={resources.cpu_per_node}]'",
            "JH_UniScheduler_partition_line": f"#JSUB -q {resources.queue_name}",
        }
        custom_gpu_line = resources.kwargs.get("custom_gpu_line", None)
        if not custom_gpu_line:
            # NOTE(review): this emits "#JSUB -gpgpu 0" even when no GPU is
            # requested — confirm the scheduler accepts that.
            script_header_dict["JH_UniScheduler_number_gpu_line"] = (
                f"#JSUB -gpgpu {resources.gpu_per_node}"
            )
        else:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = custom_gpu_line
        if (
            resources["strategy"].get("customized_script_header_template_file")
            is not None
        ):
            return customized_script_header_template(
                resources["strategy"]["customized_script_header_template_file"],
                resources,
            )
        return JH_UniScheduler_script_header_template.format(**script_header_dict)

    @retry()
    def do_submit(self, job):
        """Write the job script remotely, submit it with ``jsub`` and return
        the scheduler job id.

        Raises
        ------
        RetrySignal
            On a failed ``jsub`` call, so that ``@retry`` resubmits.
        """
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)
        script_run_str = self.gen_script_command(job)
        script_run_file_name = f"{job.script_file_name}.run"
        self.context.write_file(fname=script_run_file_name, write_str=script_run_str)

        try:
            stdin, stdout, stderr = self.context.block_checkcall(
                "cd {} && jsub < {}".format(
                    shlex.quote(self.context.remote_root),
                    shlex.quote(script_file_name),
                )
            )
        except RuntimeError as err:
            # Transient submission failure: let @retry try again.
            raise RetrySignal(err) from err

        subret = stdout.readlines()
        # jsub prints e.g. "Job <12345> is submitted ..."; take the second
        # token and strip the surrounding angle brackets.
        job_id = subret[0].split()[1][1:-1]
        self.context.write_file(job_id_name, job_id)
        return job_id

    def default_resources(self, resources):
        """No scheduler-specific resource defaults are required."""
        pass

    @retry()
    def check_status(self, job):
        """Map the ``jjobs`` state of *job* to a :class:`JobStatus` value.

        Raises
        ------
        RetrySignal
            On an unexpected non-zero ``jjobs`` exit code, so polling is
            retried.
        """
        try:
            job_id = job.job_id
        except AttributeError:
            return JobStatus.terminated
        if job_id == "":
            return JobStatus.unsubmitted
        ret, stdin, stdout, stderr = self.context.block_call("jjobs " + job_id)
        err_str = stderr.read().decode("utf-8")
        if f"Job <{job_id}> is not found" in err_str:
            # The scheduler has forgotten the job: finished if the finish
            # tag was written, otherwise it terminated abnormally.
            if self.check_finish_tag(job):
                return JobStatus.finished
            else:
                return JobStatus.terminated
        elif ret != 0:
            # just retry when any unknown error raised.
            raise RetrySignal(
                "Get error code %d in checking status through ssh with job: %s . message: %s"
                % (ret, job.job_hash, err_str)
            )
        status_out = stdout.read().decode("utf-8").split("\n")
        if len(status_out) < 2:
            return JobStatus.unknown
        else:
            # Line 0 is the jjobs header; line 1 is the data row whose third
            # column is the job state.
            status_line = status_out[1]
            status_word = status_line.split()[2]

        if status_word in ["PEND"]:
            return JobStatus.waiting
        elif status_word in ["RUN", "PSUSP", "SSUSP", "USUSP"]:
            return JobStatus.running
        elif status_word in ["DONE", "EXIT"]:
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            else:
                return JobStatus.terminated
        else:
            return JobStatus.unknown

    def check_finish_tag(self, job):
        """Return True if the remote finish-tag file for *job* exists."""
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)

    # FIX: this hook takes ``cls`` and is invoked on the class in the
    # Machine argument machinery; it must be a classmethod.
    @classmethod
    def resources_subfields(cls) -> List[Argument]:
        """Generate the resources subfields.

        Returns
        -------
        list[Argument]
            resources subfields
        """
        doc_custom_gpu_line = "Custom GPU configuration, starting with #JSUB"

        return [
            Argument(
                "kwargs",
                dict,
                [
                    Argument(
                        "custom_gpu_line",
                        str,
                        optional=True,
                        default=None,
                        doc=doc_custom_gpu_line,
                    ),
                ],
                optional=False,
                doc="Extra arguments.",
            )
        ]

    def kill(self, job):
        """Kill the job.

        Parameters
        ----------
        job : Job
            job
        """
        job_id = job.job_id
        ret, stdin, stdout, stderr = self.context.block_call(
            "jctrl kill " + str(job_id)
        )
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import json
import os
import sys

# FIX: the repository root must be prepended to sys.path *before* importing
# dpdispatcher, so the local checkout is used instead of any installed copy.
# (The original inserted the path after the imports, which had no effect.)
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from dpdispatcher.machine import Machine
from dpdispatcher.submission import Resources, Submission, Task

# task_need_resources has no effect
with open("jsons/machine_JH_UniScheduler.json") as f:
    mdata = json.load(f)

machine = Machine.load_from_dict(mdata["machine"])
resources = Resources.load_from_dict(mdata["resources"])

submission = Submission(
    work_base="0_md/",
    machine=machine,
    resources=resources,
    forward_common_files=["graph.pb"],
    backward_common_files=[],
)

# Four identical LAMMPS tasks that differ only in their working directory.
task_list = [
    Task(
        command="lmp -i input.lammps",
        task_work_path=f"bct-{index}/",
        forward_files=["conf.lmp", "input.lammps"],
        backward_files=["log.lammps"],
    )
    for index in range(1, 5)
]
submission.register_task_list(task_list)
submission.run_submission(clean=True)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also add documentation for the new batch type to doc/batch.md.