Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions dpdispatcher/machines/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,107 @@ def gen_script_header(self, job):
**pbs_script_header_dict
)
return pbs_script_header


sge_script_header_template = """
#!/bin/bash
#$ -N dpdispatcher_submit
{select_node_line}
#$ -cwd

"""


class SGE(PBS):
def __init__(
self,
batch_type=None,
context_type=None,
local_root=None,
remote_root=None,
remote_profile={},
*,
context=None,
):
super(PBS, self).__init__(
batch_type,
context_type,
local_root,
remote_root,
remote_profile,
context=context,
)

def gen_script_header(self, job):
resources = job.resources
sge_script_header_dict = {}
# resources.number_node is not used
sge_script_header_dict[
"select_node_line"
] = f"#$ -pe mpi {resources.cpu_per_node} "
# resources.queue_name is not necessary
sge_script_header = sge_script_header_template.format(**sge_script_header_dict)
return sge_script_header

def do_submit(self, job):
script_file_name = job.script_file_name
script_str = self.gen_script(job)
job_id_name = job.job_hash + "_job_id"
self.context.write_file(fname=script_file_name, write_str=script_str)
script_file_dir = self.context.remote_root
stdin, stdout, stderr = self.context.block_checkcall(
"cd {} && {} {}".format(script_file_dir, "qsub", script_file_name)
)
subret = stdout.readlines()
job_id = subret[0].split()[2]
self.context.write_file(job_id_name, job_id)
return job_id

def default_resources(self, resources):
pass

def check_status(self, job):
job_id = job.job_id
status_line = None
if job_id == "":
return JobStatus.unsubmitted
ret, stdin, stdout, stderr = self.context.block_call("qstat")
err_str = stderr.read().decode("utf-8")
if ret != 0:
raise RuntimeError(
"status command qstat fails to execute. erro info: %s return code %d"
% (err_str, ret)
)
status_text_list = stdout.read().decode("utf-8").split("\n")
for txt in status_text_list:
if job_id in txt:
status_line = txt

if status_line is None:
count = 0
while count <= 6:
if self.check_finish_tag(job=job):
return JobStatus.finished
dlog.info(
"not tag_finished detected, execute sync command and wait. count "
+ str(count)
)
self.context.block_call("sync")
import time

time.sleep(10)
count += 1
return JobStatus.terminated
else:
status_word = status_line.split()[4]
# dlog.info (status_word)
if status_word in ["qw"]:
return JobStatus.waiting
elif status_word in ["r"]:
return JobStatus.running
else:
return JobStatus.unknown

def check_finish_tag(self, job):
job_tag_finished = job.job_hash + "_job_tag_finished"
return self.context.check_file_exists(job_tag_finished)