From 0d1ba710dcf747d4505e19ab458320da030b6928 Mon Sep 17 00:00:00 2001 From: GravityPHY <36836179+GravityPHY@users.noreply.github.com> Date: Fri, 14 Apr 2023 12:58:41 -0400 Subject: [PATCH 1/5] add SGE class in pbs.py --- dpdispatcher/pbs.py | 88 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/dpdispatcher/pbs.py b/dpdispatcher/pbs.py index c4d866be..c0d0ca74 100644 --- a/dpdispatcher/pbs.py +++ b/dpdispatcher/pbs.py @@ -147,3 +147,91 @@ def gen_script_header(self, job): ) pbs_script_header = pbs_script_header_template.format(**pbs_script_header_dict) return pbs_script_header + + +sge_script_header_template=""" +#!/bin/bash +#$ -N dpdispatcher_submit +{select_node_line} +#$ -cwd + +""" + +class SGE(PBS): + def __init__(self, + batch_type=None, + context_type=None, + local_root=None, + remote_root=None, + remote_profile={}, + *, + context=None): + super(PBS,self).__init__(batch_type,context_type,local_root,remote_root,remote_profile,context=context) + self.status_record_list=[] + + def gen_script_header(self, job): + resources = job.resources + sge_script_header_dict= {} + #resources.number_node is not used + sge_script_header_dict['select_node_line']="#$ -pe mpi {cpu_per_node} ".format( + cpu_per_node=resources.cpu_per_node) + #resources.queue_name is not necessary + sge_script_header = sge_script_header_template.format(**sge_script_header_dict) + return sge_script_header + + def do_submit(self, job): + script_file_name = job.script_file_name + script_str = self.gen_script(job) + job_id_name = job.job_hash + '_job_id' + self.context.write_file(fname=script_file_name, write_str=script_str) + script_file_dir = self.context.remote_root + stdin, stdout, stderr = self.context.block_checkcall('cd %s && %s %s' % (script_file_dir, 'qsub', script_file_name)) + subret = (stdout.readlines()) + job_id = subret[0].split()[2] + self.context.write_file(job_id_name, job_id) + return job_id + + + def default_resources(self, resources) : + pass + + def check_status(self, job): + job_id = job.job_id + status_line=None + if job_id == "" : + return JobStatus.unsubmitted + ret, stdin, stdout, stderr= self.context.block_call("qstat") + err_str = stderr.read().decode('utf-8') + if (ret != 0) : + raise RuntimeError ("status command qstat fails to execute. erro info: %s return code %d" + % (err_str, ret)) + status_text_list=stdout.read().decode('utf-8').split('\n') + for txt in status_text_list: + if job_id in txt: + status_line = txt + print("status_line",status_line) + self.status_record_list.append(status_line) + if (status_line == None) and (self.status_record_list != []): + count=0 + while count <= 6: + if self.check_finish_tag(job=job) : + return JobStatus.finished + print("INFO: not tag_finished detected, execute sync command and wait. count " + str(count)) + self.context.block_call("sync") + import time + time.sleep(10) + count+=1 + return JobStatus.terminated + else: + status_word = status_line.split()[4] + # dlog.info (status_word) + if status_word in ["qw"] : + return JobStatus.waiting + elif status_word in ["r"] : + return JobStatus.running + else : + return JobStatus.unknown + + def check_finish_tag(self, job): + job_tag_finished = job.job_hash + '_job_tag_finished' + return self.context.check_file_exists(job_tag_finished) From e582650db3b8116dde8316d2cab80a3aea1b1301 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 14 Apr 2023 17:03:17 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- dpdispatcher/pbs.py | 86 +++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 34 deletions(-) diff --git a/dpdispatcher/pbs.py b/dpdispatcher/pbs.py index c0d0ca74..3b43ddb4 100644 --- a/dpdispatcher/pbs.py +++ b/dpdispatcher/pbs.py @@ -149,7 +149,7 @@ def gen_script_header(self, job): return pbs_script_header -sge_script_header_template=""" +sge_script_header_template = """ #!/bin/bash #$ -N dpdispatcher_submit {select_node_line} @@ -157,81 +157,99 @@ def gen_script_header(self, job): """ + class SGE(PBS): - def __init__(self, + def __init__( + self, batch_type=None, context_type=None, local_root=None, remote_root=None, remote_profile={}, *, - context=None): - super(PBS,self).__init__(batch_type,context_type,local_root,remote_root,remote_profile,context=context) - self.status_record_list=[] + context=None, + ): + super(PBS, self).__init__( + batch_type, + context_type, + local_root, + remote_root, + remote_profile, + context=context, + ) + self.status_record_list = [] def gen_script_header(self, job): resources = job.resources - sge_script_header_dict= {} - #resources.number_node is not used - sge_script_header_dict['select_node_line']="#$ -pe mpi {cpu_per_node} ".format( - cpu_per_node=resources.cpu_per_node) - #resources.queue_name is not necessary + sge_script_header_dict = {} + # resources.number_node is not used + sge_script_header_dict[ + "select_node_line" + ] = f"#$ -pe mpi {resources.cpu_per_node} " + # resources.queue_name is not necessary sge_script_header = sge_script_header_template.format(**sge_script_header_dict) return sge_script_header def do_submit(self, job): script_file_name = job.script_file_name script_str = self.gen_script(job) - job_id_name = job.job_hash + '_job_id' + job_id_name = job.job_hash + "_job_id" self.context.write_file(fname=script_file_name, write_str=script_str) script_file_dir = self.context.remote_root - stdin, stdout, stderr = self.context.block_checkcall('cd %s && %s %s' % (script_file_dir, 'qsub', script_file_name)) - subret = (stdout.readlines()) + stdin, stdout, stderr = self.context.block_checkcall( + "cd {} && {} {}".format(script_file_dir, "qsub", script_file_name) + ) + subret = stdout.readlines() job_id = subret[0].split()[2] self.context.write_file(job_id_name, job_id) return job_id - - def default_resources(self, resources) : + def default_resources(self, resources): pass - + def check_status(self, job): job_id = job.job_id - status_line=None - if job_id == "" : + status_line = None + if job_id == "": return JobStatus.unsubmitted - ret, stdin, stdout, stderr= self.context.block_call("qstat") - err_str = stderr.read().decode('utf-8') - if (ret != 0) : - raise RuntimeError ("status command qstat fails to execute. erro info: %s return code %d" - % (err_str, ret)) - status_text_list=stdout.read().decode('utf-8').split('\n') + ret, stdin, stdout, stderr = self.context.block_call("qstat") + err_str = stderr.read().decode("utf-8") + if ret != 0: + raise RuntimeError( + "status command qstat fails to execute. erro info: %s return code %d" + % (err_str, ret) + ) + status_text_list = stdout.read().decode("utf-8").split("\n") for txt in status_text_list: if job_id in txt: status_line = txt - print("status_line",status_line) + print("status_line", status_line) self.status_record_list.append(status_line) - if (status_line == None) and (self.status_record_list != []): - count=0 + if (status_line is None) and (self.status_record_list != []): + count = 0 while count <= 6: - if self.check_finish_tag(job=job) : + if self.check_finish_tag(job=job): return JobStatus.finished - print("INFO: not tag_finished detected, execute sync command and wait. count " + str(count)) + print( + "INFO: not tag_finished detected, execute sync command and wait. count " + + str(count) + ) self.context.block_call("sync") import time + time.sleep(10) - count+=1 + count += 1 return JobStatus.terminated else: status_word = status_line.split()[4] # dlog.info (status_word) - if status_word in ["qw"] : + if status_word in ["qw"]: return JobStatus.waiting - elif status_word in ["r"] : + elif status_word in ["r"]: return JobStatus.running - else : + else: return JobStatus.unknown def check_finish_tag(self, job): - job_tag_finished = job.job_hash + '_job_tag_finished' + job_tag_finished = job.job_hash + "_job_tag_finished" return self.context.check_file_exists(job_tag_finished) From d4e2d0356eea46c9c951831c56e6ffd8a6ad3c60 Mon Sep 17 00:00:00 2001 From: GravityPHY <36836179+GravityPHY@users.noreply.github.com> Date: Sat, 30 Dec 2023 20:28:54 -0500 Subject: [PATCH 3/5] Update pbs.py --- dpdispatcher/pbs.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dpdispatcher/pbs.py b/dpdispatcher/pbs.py index 3b43ddb4..864aefa6 100644 --- a/dpdispatcher/pbs.py +++ b/dpdispatcher/pbs.py @@ -177,7 +177,6 @@ def __init__( remote_profile, context=context, ) - self.status_record_list = [] def gen_script_header(self, job): resources = job.resources @@ -223,9 +222,8 @@ def check_status(self, job): for txt in status_text_list: if job_id in txt: status_line = txt - print("status_line", status_line) - self.status_record_list.append(status_line) - if (status_line is None) and (self.status_record_list != []): + + if status_line is None: count = 0 while count <= 6: if self.check_finish_tag(job=job): From 476584525de484fc32b5ba5236ce0e3240c407cd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 31 Dec 2023 21:46:53 +0000 Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- dpdispatcher/machines/pbs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dpdispatcher/machines/pbs.py b/dpdispatcher/machines/pbs.py index 1f5802f2..b3f7398b 100644 --- a/dpdispatcher/machines/pbs.py +++ b/dpdispatcher/machines/pbs.py @@ -252,7 +252,7 @@ def check_status(self, job): for txt in status_text_list: if job_id in txt: status_line = txt - + if status_line is None: count = 0 while count <= 6: From 5207175b1418109f14bc15d2baca23d3a2f8de9b Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Sun, 31 Dec 2023 16:48:19 -0500 Subject: [PATCH 5/5] use dlog.info --- dpdispatcher/machines/pbs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dpdispatcher/machines/pbs.py b/dpdispatcher/machines/pbs.py index b3f7398b..9a7e9c8d 100644 --- a/dpdispatcher/machines/pbs.py +++ b/dpdispatcher/machines/pbs.py @@ -258,8 +258,8 @@ def check_status(self, job): while count <= 6: if self.check_finish_tag(job=job): return JobStatus.finished - print( - "INFO: not tag_finished detected, execute sync command and wait. count " + dlog.info( + "not tag_finished detected, execute sync command and wait. count " + str(count) ) self.context.block_call("sync")