add new batch_type Fugaku (#351)
Added a new batch type for use on Fugaku, a supercomputer that uses Fujitsu's own job scheduler.
It has been tested with DP-GEN for 20 iterations and no problems were found.
JSON machine configuration files for both the local and SSH cases are also included.

---------

Co-authored-by: ZHANG <zhang@MSI.localdomain>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
3 people committed Jul 9, 2023
1 parent abfe441 commit 8ee05a7
Showing 5 changed files with 146 additions and 0 deletions.
8 changes: 8 additions & 0 deletions doc/batch.md
@@ -62,3 +62,11 @@ Read Bohrium documentation for details.

`DistributedShell` is used to submit yarn jobs.
Read [Support DPDispatcher on Yarn](dpdispatcher_on_yarn.md) for details.

## Fugaku

{dargs:argument}`batch_type <resources/batch_type>`: `Fugaku`

[Fujitsu cloud service](https://doc.cloud.global.fujitsu.com/lib/common/jp/hpc-user-manual/) is a job scheduling system used by Fujitsu's HPCs such as Fugaku, ITO, and the K computer. Note that although these machines share the same job scheduling system, they differ in the details, so the `Fugaku` class cannot be used directly on other HPCs.

Read Fujitsu cloud service documentation for details.
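
With the resource settings used by the sample configurations in this commit (`number_node: 1`, `cpu_per_node: 48`, `queue_name: "small"`), the generated script header is expected to look roughly as follows; any `custom_flags` (for example an `elapse` limit) should be appended after these lines by the generic script generator.

```
#PJM -L "rscgrp=small"
#PJM -L "node=1"
#PJM --mpi "max-proc-per-node=48"
```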
2 changes: 2 additions & 0 deletions dpdispatcher/__init__.py
@@ -43,6 +43,7 @@
from .distributed_shell import DistributedShell
from .dp_cloud_server import DpCloudServer, Lebesgue
from .dp_cloud_server_context import DpCloudServerContext, LebesgueContext
from .fugaku import Fugaku
from .hdfs_context import HDFSContext
from .lazy_local_context import LazyLocalContext
from .local_context import LocalContext
@@ -85,6 +86,7 @@ def info():
"PBS",
"Shell",
"Slurm",
"Fugaku",
"SSHContext",
"Submission",
"Task",
94 changes: 94 additions & 0 deletions dpdispatcher/fugaku.py
@@ -0,0 +1,94 @@
import shlex

from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine

fugaku_script_header_template = """\
{queue_name_line}
{fugaku_node_number_line}
{fugaku_ntasks_per_node_line}
"""


class Fugaku(Machine):
    def gen_script(self, job):
        fugaku_script = super().gen_script(job)
        return fugaku_script

    def gen_script_header(self, job):
        resources = job.resources
        fugaku_script_header_dict = {}
        fugaku_script_header_dict[
            "fugaku_node_number_line"
        ] = f'#PJM -L "node={resources.number_node}"'
        fugaku_script_header_dict[
            "fugaku_ntasks_per_node_line"
        ] = '#PJM --mpi "max-proc-per-node={cpu_per_node}"'.format(
            cpu_per_node=resources.cpu_per_node
        )
        fugaku_script_header_dict[
            "queue_name_line"
        ] = f'#PJM -L "rscgrp={resources.queue_name}"'
        fugaku_script_header = fugaku_script_header_template.format(
            **fugaku_script_header_dict
        )
        return fugaku_script_header

    def do_submit(self, job):
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)
        script_file_dir = self.context.remote_root
        stdin, stdout, stderr = self.context.block_checkcall(
            "cd {} && {} {}".format(
                shlex.quote(script_file_dir), "pjsub", shlex.quote(script_file_name)
            )
        )
        subret = stdout.readlines()
        # pjsub reports the job id as the sixth whitespace-separated field of its output
        job_id = subret[0].split()[5]
        self.context.write_file(job_id_name, job_id)
        return job_id

    def default_resources(self, resources):
        pass

    def check_status(self, job):
        job_id = job.job_id
        if job_id == "":
            return JobStatus.unsubmitted
        ret, stdin, stdout, stderr = self.context.block_call("pjstat " + job_id)
        err_str = stderr.read().decode("utf-8")
        try:
            status_line = stdout.read().decode("utf-8").split("\n")[-2]
        except Exception:
            # pjstat only lists jobs that are still waiting or running; once the job
            # has left the queue, fall back to the job history (pjstat -H)
            ret, stdin, stdout, stderr = self.context.block_call("pjstat -H " + job_id)
            status_line = stdout.read().decode("utf-8").split("\n")[-2]
            status_word = status_line.split()[3]
            if status_word in ["EXT", "CCL", "ERR"]:
                if self.check_finish_tag(job):
                    dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                    return JobStatus.finished
                else:
                    return JobStatus.terminated
            else:
                return JobStatus.unknown
        status_word = status_line.split()[3]
        if status_word in ["QUE", "HLD", "RNA", "SPD"]:
            return JobStatus.waiting
        elif status_word in ["RUN", "RNE"]:
            return JobStatus.running
        else:
            return JobStatus.unknown

    def check_finish_tag(self, job):
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)
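
As a quick sanity check (not part of the commit), the new class should be reachable through dpdispatcher's usual `batch_type` dispatch. A minimal sketch, assuming the standard `Machine.load_from_dict` API and placeholder paths:

```python
from dpdispatcher import Machine

# Placeholder local configuration; a real run would point remote_root at Fugaku.
machine = Machine.load_from_dict(
    {
        "batch_type": "Fugaku",
        "context_type": "LocalContext",
        "local_root": "./",
        "remote_root": "./",
    }
)
assert type(machine).__name__ == "Fugaku"
```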
24 changes: 24 additions & 0 deletions tests/jsons/machine_fugaku.json
@@ -0,0 +1,24 @@
{
    "machine": {
        "batch_type": "fugaku",
        "context_type": "SSHContext",
        "local_root": "./",
        "remote_profile": {
            "hostname": "login.****.jp",
            "key_filename": "/home/***/.ssh/***",
            "passphrase": "******",
            "username": "u*****"
        },
        "remote_root": "/vol*****/data/****"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 48,
        "source_list": [""],
        "queue_name": "small",
        "group_size": 1,
        "custom_flags": [
            "#PJM -L \"elapse=4:00:00\"",
            "#PJM -x PJM_LLIO_GFSCACHE=/vol0004",
            "#PJM -g hp******"
        ]
    }
}
18 changes: 18 additions & 0 deletions tests/jsons/machine_local_fugaku.json
@@ -0,0 +1,18 @@
{
    "machine": {
        "batch_type": "fugaku",
        "context_type": "local",
        "local_root": "./",
        "remote_root": "./"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 48,
        "source_list": [""],
        "queue_name": "small",
        "group_size": 1,
        "custom_flags": [
            "#PJM -L \"elapse=4:00:00\"",
            "#PJM -x PJM_LLIO_GFSCACHE=/vol0004",
            "#PJM -g hp******"
        ]
    }
}
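
For completeness, a minimal end-to-end sketch of driving a configuration like the local one above through dpdispatcher's public API. The command, work paths, and task layout are placeholders, and the snippet assumes the standard `Resources`/`Task`/`Submission` interfaces rather than anything introduced by this commit:

```python
import json

from dpdispatcher import Machine, Resources, Submission, Task

# Load the machine/resources configuration shown above (path is a placeholder).
with open("tests/jsons/machine_local_fugaku.json") as f:
    config = json.load(f)

machine = Machine.load_from_dict(config["machine"])
resources = Resources.load_from_dict(config["resources"])

# A trivial placeholder task; real workloads would forward/backward actual files.
task = Task(
    command="echo hello",
    task_work_path="task0/",
    forward_files=[],
    backward_files=[],
)

submission = Submission(
    work_base="work/",
    machine=machine,
    resources=resources,
    task_list=[task],
    forward_common_files=[],
    backward_common_files=[],
)
# Writes the #PJM script, submits it with pjsub, and polls the scheduler until done.
submission.run_submission()
```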
