Skip to content

Commit

Permalink
fix local context with uploading files in the subdirectory (#300)
Browse files Browse the repository at this point in the history
  • Loading branch information
njzjz committed Jan 6, 2023
1 parent e4ac4f5 commit 6f38981
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 129 deletions.
128 changes: 2 additions & 126 deletions dpdispatcher/local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@
from dpdispatcher import dlog
from subprocess import TimeoutExpired

# class LocalSession (object) :
# def __init__ (self, jdata) :
# self.work_path = os.path.abspath(jdata['work_path'])
# assert(os.path.exists(self.work_path))

# def get_work_root(self) :
# return self.work_path

class SPRetObj(object) :
def __init__ (self,
Expand Down Expand Up @@ -60,18 +53,6 @@ def __init__(self,
self.temp_local_root = os.path.abspath(local_root)
self.temp_remote_root = os.path.abspath(remote_root)
self.remote_profile = remote_profile
# self.work_profile = work_profile
# self.job_uuid = job_uuid
# self.submission = None
# if job_uuid:
# self.job_uuid = job_uuid
# else:
# self.job_uuid = str(uuid.uuid4())

# self.remote_root = os.path.join(work_profile.get_work_root(), self.job_uuid)
# dlog.debug("local_root is %s"% local_root)

# os.makedirs(self.remote_root, exist_ok = True)

@classmethod
def load_from_dict(cls, context_dict):
Expand All @@ -92,34 +73,9 @@ def bind_submission(self, submission):
self.submission = submission
self.local_root = os.path.join(self.temp_local_root, submission.work_base)
self.remote_root = os.path.join(self.temp_remote_root, submission.submission_hash)
# print('debug:LocalContext.bind_submission', submission.submission_hash,
# self.local_root, self.remote_root)

# os.makedirs(self.remote_root, exist_ok = True)
# self.job_uuid = submission.submission_hash
# self.remote_root = os.path.join(self.work_profile.get_work_root(), self.job_uuid)
# os.makedirs(self.remote_root, exist_ok = True)
# print('local_context.bind_submission:self.remote_root', self.remote_root)
# dlog.debug("remote_root is %s"% self.remote_root)

# @property
# def remote_root(self):
# print('local_context.remote_root:self.submission.submission_hash', self.submission.submission_hash)
# print('local_context.remote_root self.submission', self.submission)
# self._remote_root = os.path.join(self.work_profile.get_work_root(), self.submission.submission_hash, self.submission.work_base)
# os.makedirs(self._remote_root, exist_ok = True)
# return self._remote_root

# @property
# def local_root(self):
# # self.local_root = os.path.abspath(local_root)
# self._local_root = os.path.join(, self.submission.submission_hash, self.submission.work_base)
# return self._local_root

def upload(self, submission):
os.makedirs(self.remote_root, exist_ok = True)
# os.makedirs(self.remote_root, exist_ok = True)
# job_dirs = [ ii.task_work_path for ii in submission.belonging_tasks]
for ii in submission.belonging_tasks:
local_job = os.path.join(self.local_root, ii.task_work_path)
remote_job = os.path.join(self.remote_root, ii.task_work_path)
Expand All @@ -138,13 +94,12 @@ def upload(self, submission):
raise RuntimeError('cannot find upload file ' + os.path.join(local_job, jj))
if os.path.exists(os.path.join(remote_job, jj)) :
os.remove(os.path.join(remote_job, jj))
_check_file_path(jj)
_check_file_path(os.path.join(remote_job, jj))
os.symlink(os.path.join(local_job, jj),
os.path.join(remote_job, jj))

local_job = self.local_root
remote_job = self.remote_root
# os.makedirs(remote_job, exist_ok = True)

file_list = []
for kk in submission.forward_common_files:
Expand All @@ -159,39 +114,19 @@ def upload(self, submission):
raise RuntimeError('cannot find upload file ' + os.path.join(local_job, jj))
if os.path.exists(os.path.join(remote_job, jj)) :
os.remove(os.path.join(remote_job, jj))
_check_file_path(jj)
_check_file_path(os.path.join(remote_job, jj))
os.symlink(os.path.join(local_job, jj),
os.path.join(remote_job, jj))

def upload_(self,
job_dirs,
local_up_files,
dereference = True) :
for ii in job_dirs :
local_job = os.path.join(self.local_root, ii)
remote_job = os.path.join(self.remote_root, ii)
os.makedirs(remote_job, exist_ok = True)
for jj in local_up_files :
if not os.path.exists(os.path.join(local_job, jj)):
raise RuntimeError('cannot find upload file ' + os.path.join(local_job, jj))
if os.path.exists(os.path.join(remote_job, jj)) :
os.remove(os.path.join(remote_job, jj))
_check_file_path(jj)
os.symlink(os.path.join(local_job, jj),
os.path.join(remote_job, jj))


def download(self,
submission,
check_exists = False,
mark_failure = True,
back_error=False) :

for ii in submission.belonging_tasks:
# for ii in job_dirs :
local_job = os.path.join(self.local_root, ii.task_work_path)
remote_job = os.path.join(self.remote_root, ii.task_work_path)
# flist = remote_down_files
flist = ii.backward_files
if back_error :
flist += glob(os.path.join(remote_job, 'error*'))
Expand Down Expand Up @@ -227,18 +162,11 @@ def download(self,
elif os.path.isfile(lfile) or os.path.islink(lfile):
os.remove(lfile)
shutil.copyfile(rfile, lfile)
# shutil.move(rfile, lfile)
else :
raise RuntimeError('should not reach here!')
else :
# no nothing in the case of linked files
pass
# for ii in submission.belonging_tasks:
# for ii in job_dirs :
# local_job = os.path.join(self.local_root, ii.task_work_path)
# remote_job = os.path.join(self.remote_root, ii.task_work_path)
# flist = remote_down_files
# flist = ii.backward_files
local_job = self.local_root
remote_job = self.remote_root
flist = submission.backward_common_files
Expand Down Expand Up @@ -274,61 +202,12 @@ def download(self,
elif os.path.isfile(lfile) or os.path.islink(lfile):
os.remove(lfile)
shutil.copyfile(rfile, lfile)
# shutil.move(rfile, lfile)
else :
raise RuntimeError('should not reach here!')
else :
# no nothing in the case of linked files
pass



def download_(self,
job_dirs,
remote_down_files,
check_exists = False,
mark_failure = True,
back_error=False) :
for ii in job_dirs :
local_job = os.path.join(self.local_root, ii)
remote_job = os.path.join(self.remote_root, ii)
flist = remote_down_files
if back_error :
flist += glob(os.path.join(remote_job, 'error*'))
for jj in flist :
rfile = os.path.join(remote_job, jj)
lfile = os.path.join(local_job, jj)
if not os.path.realpath(rfile) == os.path.realpath(lfile) :
if (not os.path.exists(rfile)) and (not os.path.exists(lfile)):
if check_exists:
if mark_failure:
with open(os.path.join(self.local_root, ii, 'tag_failure_download_%s' % jj), 'w') as fp:
pass
else :
pass
else :
raise RuntimeError('do not find download file ' + rfile)
elif (not os.path.exists(rfile)) and (os.path.exists(lfile)) :
# already downloaded
pass
elif (os.path.exists(rfile)) and (not os.path.exists(lfile)) :
# trivial case, download happily
shutil.move(rfile, lfile)
elif (os.path.exists(rfile)) and (os.path.exists(lfile)) :
# both exists, replace!
dlog.info('find existing %s, replacing by %s' % (lfile, rfile))
if os.path.isdir(lfile):
shutil.rmtree(lfile, ignore_errors=True)
elif os.path.isfile(lfile) or os.path.islink(lfile):
os.remove(lfile)
shutil.move(rfile, lfile)
else :
raise RuntimeError('should not reach here!')
else :
# no nothing in the case of linked files
pass


def block_checkcall(self,
cmd) :
proc = sp.Popen(cmd, cwd=self.remote_root, shell=True, stdout = sp.PIPE, stderr = sp.PIPE)
Expand All @@ -351,9 +230,6 @@ def block_call(self, cmd) :
def clean(self):
shutil.rmtree(self.remote_root, ignore_errors=True)

# def _clean(self) :
# shutil.rmtree(self.remote_root, ignore_errors=True)

def write_file(self, fname, write_str):
os.makedirs(self.remote_root, exist_ok = True)
with open(os.path.join(self.remote_root, fname), 'w') as fp :
Expand Down
Empty file.
Empty file.
2 changes: 1 addition & 1 deletion tests/test_lazy_local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_download(self):
def test_block_call(self):
code, stdin, stdout, stderr = self.lazy_local_context.block_call('ls')
self.assertEqual(stdout.readlines(), ['bct-1\n',
'bct-2\n', 'bct-3\n', 'bct-4\n', 'dir with space\n', 'graph.pb\n'])
'bct-2\n', 'bct-3\n', 'bct-4\n', 'dir with space\n', 'graph.pb\n', 'some_dir\n'])
self.assertEqual(code, 0)

code, stdin, stdout, stderr = self.lazy_local_context.block_call('ls a')
Expand Down
6 changes: 4 additions & 2 deletions tests/test_local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ def test_upload(self):
submission_hash = 'mock_hash_2'
task1 = MagicMock(
task_work_path='bct-1/',
forward_files=['input.lammps', 'conf.lmp']
forward_files=['input.lammps', 'conf.lmp', 'some_dir/some_file']
)
task2 = MagicMock(
task_work_path='bct-2/',
forward_files=['input.lammps', 'conf.lmp']
)
submission = MagicMock(work_base='0_md/',
belonging_tasks=[task1, task2],
forward_common_files=['graph.pb'],
forward_common_files=['graph.pb', 'some_dir/some_file'],
submission_hash=submission_hash)

self.local_context.bind_submission(submission)
Expand All @@ -84,9 +84,11 @@ def test_upload(self):
check_file_list = [
'bct-1/input.lammps',
'bct-1/conf.lmp',
'bct-1/some_dir/some_file',
'bct-2/input.lammps',
'bct-2/conf.lmp',
'graph.pb',
'some_dir/some_file',
]
for file in check_file_list:
f1 = os.path.join(self.tmp_local_root, '0_md/', file)
Expand Down

0 comments on commit 6f38981

Please sign in to comment.