Commit

add SGE class in pbs.py (#323)
This SGE class inherits from the PBS machine class. I have tested it with a
vanilla Python print job and it works. I can continue testing it with more
complex tasks.
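
As a rough usage sketch (not part of this commit), the new machine class could be set up like the other PBS-family machines; the context type, paths, and SSH profile below are hypothetical placeholders:

    from dpdispatcher.machines.pbs import SGE

    # Construct the SGE machine with the same arguments accepted by the other
    # machine classes (see the __init__ signature in the diff below).
    machine = SGE(
        batch_type="SGE",
        context_type="SSHContext",  # assumption: any dpdispatcher context type works here
        local_root="./work",  # hypothetical local staging directory
        remote_root="/scratch/user/dpdispatcher",  # hypothetical remote working directory
        remote_profile={"hostname": "login.example.org", "username": "user"},  # hypothetical SSH profile
    )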

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jinzhe Zeng <jinzhe.zeng@rutgers.edu>
3 people committed Dec 31, 2023
1 parent d3a6eb6 commit 3594f91
Showing 1 changed file with 104 additions and 0 deletions.
104 changes: 104 additions & 0 deletions dpdispatcher/machines/pbs.py
@@ -177,3 +177,107 @@ def gen_script_header(self, job):
**pbs_script_header_dict
)
return pbs_script_header


sge_script_header_template = """
#!/bin/bash
#$ -N dpdispatcher_submit
{select_node_line}
#$ -cwd
"""


class SGE(PBS):
def __init__(
self,
batch_type=None,
context_type=None,
local_root=None,
remote_root=None,
remote_profile={},
*,
context=None,
):
super(PBS, self).__init__(
batch_type,
context_type,
local_root,
remote_root,
remote_profile,
context=context,
)

def gen_script_header(self, job):
resources = job.resources
sge_script_header_dict = {}
# resources.number_node is not used
sge_script_header_dict[
"select_node_line"
] = f"#$ -pe mpi {resources.cpu_per_node} "
# resources.queue_name is not necessary
sge_script_header = sge_script_header_template.format(**sge_script_header_dict)
return sge_script_header
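        # For illustration (not part of the original commit): with
        # resources.cpu_per_node == 8, the generated header contains
        #   #!/bin/bash
        #   #$ -N dpdispatcher_submit
        #   #$ -pe mpi 8
        #   #$ -cwd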

def do_submit(self, job):
script_file_name = job.script_file_name
script_str = self.gen_script(job)
job_id_name = job.job_hash + "_job_id"
self.context.write_file(fname=script_file_name, write_str=script_str)
script_file_dir = self.context.remote_root
stdin, stdout, stderr = self.context.block_checkcall(
"cd {} && {} {}".format(script_file_dir, "qsub", script_file_name)
)
subret = stdout.readlines()
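        # SGE's qsub reports submission with a line like
        #   Your job 123456 ("script.sh") has been submitted
        # so the job id is taken from the third whitespace-separated field.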
job_id = subret[0].split()[2]
self.context.write_file(job_id_name, job_id)
return job_id

def default_resources(self, resources):
pass

def check_status(self, job):
job_id = job.job_id
status_line = None
if job_id == "":
return JobStatus.unsubmitted
ret, stdin, stdout, stderr = self.context.block_call("qstat")
err_str = stderr.read().decode("utf-8")
if ret != 0:
raise RuntimeError(
"status command qstat fails to execute. erro info: %s return code %d"
% (err_str, ret)
)
status_text_list = stdout.read().decode("utf-8").split("\n")
for txt in status_text_list:
if job_id in txt:
status_line = txt

if status_line is None:
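            # The job no longer appears in qstat output: poll for the finish
            # tag a few times (sync + 10 s sleep) before declaring it terminated.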
count = 0
while count <= 6:
if self.check_finish_tag(job=job):
return JobStatus.finished
dlog.info(
"not tag_finished detected, execute sync command and wait. count "
+ str(count)
)
self.context.block_call("sync")
import time

time.sleep(10)
count += 1
return JobStatus.terminated
else:
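            # qstat lists the job state in the fifth whitespace-separated
            # column, e.g. "qw" (queued/waiting) or "r" (running).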
status_word = status_line.split()[4]
# dlog.info (status_word)
if status_word in ["qw"]:
return JobStatus.waiting
elif status_word in ["r"]:
return JobStatus.running
else:
return JobStatus.unknown

def check_finish_tag(self, job):
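        # The job script generated by dpdispatcher is expected to touch this
        # tag file when the job finishes, so its presence marks completion.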
job_tag_finished = job.job_hash + "_job_tag_finished"
return self.context.check_file_exists(job_tag_finished)
