Skip to content

Commit

Permalink
Merge pull request #64 from felix5572/master
Browse files: browse the repository at this point in the history
fix bug; will not tag job_tag_finished if one of the tasks fails
  • Loading branch information
felix5572 committed Jun 17, 2021
2 parents 840ec1f + 272a56c commit 4c1d677
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 51 deletions.
3 changes: 2 additions & 1 deletion dpdispatcher/dpcloudserver/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ def upload(oss_task_zip, zip_task_file, endpoint, bucket_name):
parts.append(PartInfo(part_number, result.etag))
offset += num_to_upload
part_number += 1
# result = bucket.complete_multipart_upload(oss_task_zip, upload_id, parts)
result = bucket.complete_multipart_upload(oss_task_zip, upload_id, parts)
# print('debug:upload_result:', result, dir())
return True
return result


def job_create(job_type, oss_path, input_data):
Expand Down
68 changes: 34 additions & 34 deletions dpdispatcher/dpcloudserver/zip_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,50 +28,50 @@ def zip_file_list(root_path, zip_filename, file_list=[]):
zip_obj.close()
return out_zip_file

def zip_files(root_path, out_file, selected=[]):
obj = ZipFile(out_file, "w")
# change /xxx/ to /xxx or xxx to /xxx and pop ''
for i in range(len(selected)):
if not selected[i]:
selected.pop(i)
continue
# def zip_files(root_path, out_file, selected=[]):
# obj = ZipFile(out_file, "w")
# # change /xxx/ to /xxx or xxx to /xxx and pop ''
# for i in range(len(selected)):
# if not selected[i]:
# selected.pop(i)
# continue

selected[i] = selected[i].strip()
if selected[i].endswith('/'):
selected[i] = selected[i][:-1]
if not selected[i].startswith('/'):
selected[i] = '/{}'.format(selected[i])
# selected[i] = selected[i].strip()
# if selected[i].endswith('/'):
# selected[i] = selected[i][:-1]
# if not selected[i].startswith('/'):
# selected[i] = '/{}'.format(selected[i])

for root, dirs, files in os.walk(root_path):
for item in files:
filename = os.path.join(root, item)
arcname = filename.replace(root_path,'')
if not is_selected(arcname, selected):
continue
# for root, dirs, files in os.walk(root_path):
# for item in files:
# filename = os.path.join(root, item)
# arcname = filename.replace(root_path,'')
# if not is_selected(arcname, selected):
# continue

obj.write(filename, arcname)
if not obj.filelist:
return
# obj.write(filename, arcname)
# if not obj.filelist:
# return

obj.close()
# obj.close()


def is_selected(arcname, selected):
if not selected:
return True
# def is_selected(arcname, selected):
# if not selected:
# return True

arcdir = os.path.dirname(arcname)
for s in selected:
if arcname == s:
return True
# arcdir = os.path.dirname(arcname)
# for s in selected:
# if arcname == s:
# return True

if arcdir == s:
return True
# if arcdir == s:
# return True

if arcname.startswith(s + '/'):
return True
# if arcname.startswith(s + '/'):
# return True

return False
# return False


def unzip_file(zip_file, out_dir='./'):
Expand Down
7 changes: 4 additions & 3 deletions dpdispatcher/local_context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from dpdispatcher.base_context import BaseContext
import os,shutil,uuid,hashlib
import os,shutil,hashlib
import subprocess as sp
from glob import glob
from dpdispatcher import dlog
from subprocess import TimeoutExpired

# class LocalSession (object) :
# def __init__ (self, jdata) :
Expand Down Expand Up @@ -382,12 +383,12 @@ def get_return(self, proc):
ret = proc.poll()
if ret is None:
return None, None, None
else :
else:
try:
o, e = proc.communicate()
stdout = SPRetObj(o)
stderr = SPRetObj(e)
except:
except TimeoutExpired:
stdout = None
stderr = None
return ret, stdout, stderr
27 changes: 20 additions & 7 deletions dpdispatcher/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"""

script_env_template="""
export REMOTE_ROOT={remote_root}
echo 0 > {flag_job_task_fail}
REMOTE_ROOT={remote_root}
test $? -ne 0 && exit 1
{module_unload_part}
Expand All @@ -30,7 +31,7 @@
test $? -ne 0 && exit 1
if [ ! -f {task_tag_finished} ] ;then
{command_env} ( {command} ) {log_err_part}
if test $? -eq 0; then touch {task_tag_finished}; fi
if test $? -eq 0; then touch {task_tag_finished}; else echo 1 > {flag_job_task_fail};fi
fi &
"""

Expand All @@ -39,8 +40,8 @@
test $? -ne 0 && exit 1
wait
touch {job_tag_finished}
FLAG_JOB_TASK_FAIL=$(cat {flag_job_task_fail})
if test $FLAG_JOB_TASK_FAIL -eq 0; then touch {job_tag_finished}; else exit 1;fi
"""

class Machine(object):
Expand Down Expand Up @@ -176,7 +177,10 @@ def gen_script_env(self, job):
for k,v in envs.items():
export_envs_part += f"export {k}={v}\n"

flag_job_task_fail = job.job_hash + '_flag_job_task_fail'

script_env = script_env_template.format(
flag_job_task_fail=flag_job_task_fail,
remote_root=self.context.remote_root,
module_unload_part=module_unload_part,
module_load_part=module_load_part,
Expand All @@ -201,8 +205,13 @@ def gen_script_command(self, job):
if task.errlog is not None:
log_err_part += f"2>>{task.errlog} "

single_script_command = script_command_template.format(command_env=command_env,
task_work_path=task.task_work_path, command=task.command, task_tag_finished=task_tag_finished,
flag_job_task_fail = job.job_hash + '_flag_job_task_fail'
single_script_command = script_command_template.format(
flag_job_task_fail=flag_job_task_fail,
command_env=command_env,
task_work_path=task.task_work_path,
command=task.command,
task_tag_finished=task_tag_finished,
log_err_part=log_err_part)
script_command += single_script_command

Expand All @@ -211,7 +220,11 @@ def gen_script_command(self, job):

def gen_script_end(self, job):
job_tag_finished = job.job_hash + '_job_tag_finished'
script_end = script_end_template.format(job_tag_finished=job_tag_finished)
flag_job_task_fail = job.job_hash + '_flag_job_task_fail'
script_end = script_end_template.format(
job_tag_finished=job_tag_finished,
flag_job_task_fail=flag_job_task_fail
)
return script_end

def gen_script_wait(self, resources):
Expand Down
5 changes: 2 additions & 3 deletions dpdispatcher/shell.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os,sys,time,random,uuid
import psutil

from dpdispatcher.JobStatus import JobStatus
Expand Down Expand Up @@ -48,7 +47,7 @@ def default_resources(self, resources) :
def check_status(self, job):
job_id = job.job_id
# print('shell.check_status.job_id', job_id)
job_state = JobStatus.unknown
# job_state = JobStatus.unknown
if job_id == "" :
return JobStatus.unsubmitted

Expand All @@ -61,7 +60,7 @@ def check_status(self, job):
return JobStatus.running
else:
return JobStatus.terminated
return job_state
# return job_state

# def check_status(self, job):
# job_id = job.job_id
Expand Down
2 changes: 1 addition & 1 deletion dpdispatcher/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def try_recover_from_json(self):
if self == submission:
self.belonging_jobs = submission.belonging_jobs
self.bind_machine(machine=self.machine)
self = submission.bind_machine(machine=self.machine)
# self = submission.bind_machine(machine=self.machine)
else:
print(self.serialize())
print(submission.serialize())
Expand Down
10 changes: 8 additions & 2 deletions tests/test_shell_cuda_multi_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ def test_shell_cuda_multi_devices(self):

task_list = []
for ii in range(16):
task = Task(command=f"echo dpdispatcher_unittest_{ii}", task_work_path='./', forward_files=[], backward_files=[], outlog='out.txt')
task = Task(
command=f"echo dpdispatcher_unittest_{ii}",
task_work_path='./',
forward_files=[],
backward_files=[],
outlog='out.txt'
)
task_list.append(task)

submission = Submission(work_base='test_dir/',
Expand All @@ -42,8 +48,8 @@ def test_shell_cuda_multi_devices(self):

@classmethod
def tearDownClass(cls):
# pass
shutil.rmtree('tmp_if_cuda_multi_devices/')
# pass



Expand Down

0 comments on commit 4c1d677

Please sign in to comment.