Merge pull request #27 from felix5572/master
update for if_cuda_multi_devices
felix5572 committed May 26, 2021
2 parents 0bbe9a4 + c464c28 commit 8e1307f
Showing 9 changed files with 122 additions and 12 deletions.
dpdispatcher/batch.py: 5 additions & 2 deletions
@@ -25,8 +25,7 @@
  test $? -ne 0 && exit 1
  if [ ! -f {task_tag_finished} ] ;then
  {command_env} {command} {log_err_part}
- if test $? -ne 0; then touch {task_tag_finished}; fi
- touch {task_tag_finished}
+ if test $? -eq 0; then touch {task_tag_finished}; fi
  fi &
  """

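The substance of this change: the {task_tag_finished} tag file is now touched only when the task command exits with status 0, so a failed task is no longer marked as finished and the if [ ! -f ... ] guard will run it again on resubmission. A minimal sketch of how the corrected fragment renders for one task (every placeholder value below is illustrative, not taken from the repository):

    # Render the corrected template with hypothetical placeholder values.
    fragment = ("if [ ! -f {task_tag_finished} ] ;then\n"
                "{command_env} {command} {log_err_part}\n"
                "if test $? -eq 0; then touch {task_tag_finished}; fi\n"
                "fi &\n")
    print(fragment.format(
        task_tag_finished='task0_tag_finished',  # hypothetical tag file
        command_env='CUDA_VISIBLE_DEVICES=0',    # stand-in environment prefix
        command='echo dpdispatcher_unittest_0',  # the task command
        log_err_part='1>>out.txt 2>>err.txt',    # stdout/stderr redirection
    ))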
@@ -155,6 +154,10 @@ def gen_script_wait(self, resources):
      resources.task_in_para = 0
      if resources.strategy['if_cuda_multi_devices'] is True:
          resources.gpu_in_use += 1
+         if resources.gpu_in_use % resources.gpu_per_node == 0:
+             return "wait \n"
+         else:
+             return ""
      return "wait \n"
  return ""
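The effect: with if_cuda_multi_devices enabled, a shell wait barrier is emitted only after every GPU on the node has been assigned a task, so one full round of tasks runs concurrently between barriers. A standalone sketch of that scheduling, assuming gpu_per_node=4 and ignoring para_deg for simplicity (the CUDA_VISIBLE_DEVICES prefix is an assumption about what the strategy puts in command_env):

    # Emit a "wait" once per full round of GPUs; at most gpu_per_node
    # background tasks run between consecutive barriers.
    gpu_per_node = 4
    gpu_in_use = 0
    lines = []
    for ii in range(16):
        lines.append(f"CUDA_VISIBLE_DEVICES={gpu_in_use % gpu_per_node} task_{ii} &")
        gpu_in_use += 1
        if gpu_in_use % gpu_per_node == 0:
            lines.append("wait")
    print("\n".join(lines))  # a "wait" after every 4 task lines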

dpdispatcher/lazy_local_context.py: 2 additions & 2 deletions
@@ -28,7 +28,7 @@ def __init__ (self,
      assert(type(local_root) == str)
      self.temp_local_root = os.path.abspath(local_root)
      self.temp_remote_root = os.path.abspath(local_root)
-     self.job_uuid = None
+     # self.job_uuid = None
      self.submission = None
      # if job_uuid:
      #     self.job_uuid=job_uuid
@@ -93,7 +93,7 @@ def block_checkcall(self,
      code = proc.returncode
      if code != 0:
          os.chdir(cwd)
-         raise RuntimeError("Get error code %d in locally calling %s with job: %s ", (code, cmd, self.job_uuid))
+         raise RuntimeError("Get error code %d in locally calling %s with job: %s ", (code, cmd, self.submission.submission_hash))
      os.chdir(cwd)
      return None, stdout, stderr

dpdispatcher/local_context.py: 1 addition & 1 deletion
@@ -327,7 +327,7 @@ def block_checkcall(self,
      code = proc.returncode
      if code != 0:
          os.chdir(cwd)
-         raise RuntimeError("Get error code %d in locally calling %s with job: %s ", (code, cmd, self.job_uuid))
+         raise RuntimeError("Get error code %d in locally calling %s with job: %s ", (code, cmd, self.submission.submission_hash))
      os.chdir(cwd)
      return None, stdout, stderr

dpdispatcher/ssh_context.py: 7 additions & 7 deletions
@@ -173,7 +173,7 @@ def __init__ (self,
              ):
      assert(type(local_root) == str)
      self.temp_local_root = os.path.abspath(local_root)
-     self.job_uuid = None
+     # self.job_uuid = None
      self.clean_asynchronously = clean_asynchronously
      # self.job_uuid = job_uuid
      # if job_uuid:
@@ -230,7 +230,7 @@ def bind_submission(self, submission):
      # self.remote_root = os.path.join(self.temp_remote_root, self.submission.submission_hash, self.submission.work_base )
      self.remote_root = os.path.join(self.temp_remote_root, self.submission.submission_hash)

-     self.job_uuid = submission.submission_hash
+     # self.job_uuid = submission.submission_hash
      print('debug:SSHContext.bind_submission',
          submission.submission_hash,
          self.local_root, self.remote_root)
@@ -310,13 +310,13 @@ def block_checkcall(self,
      if retry < 3:
          # sleep 60 s
          dlog.warning("Get error code %d in calling %s through ssh with job: %s . message: %s" %
-             (exit_status, cmd, self.job_uuid, stderr.read().decode('utf-8')))
+             (exit_status, cmd, self.submission.submission_hash, stderr.read().decode('utf-8')))
          dlog.warning("Sleep 60 s and retry the command...")
          time.sleep(60)
          return self.block_checkcall(cmd, asynchronously=asynchronously, retry=retry+1)
      print('debug:self.remote_root, cmd', self.remote_root, cmd)
      raise RuntimeError("Get error code %d in calling %s through ssh with job: %s . message: %s" %
-         (exit_status, cmd, self.job_uuid, stderr.read().decode('utf-8')))
+         (exit_status, cmd, self.submission.submission_hash, stderr.read().decode('utf-8')))
      return stdin, stdout, stderr

def block_call(self,
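For context, the retry logic visible above re-runs a failed command up to three times with a 60 s back-off before raising. The same pattern in isolation, with a hypothetical stand-in runner in place of the real SSH call:

    import time

    def call_with_retry(cmd, run, retry=0):
        # Mirrors block_checkcall: retry up to 3 times, sleeping 60 s
        # between attempts, then raise if the command still fails.
        exit_status = run(cmd)
        if exit_status != 0:
            if retry < 3:
                time.sleep(60)
                return call_with_retry(cmd, run, retry=retry + 1)
            raise RuntimeError("Get error code %d in calling %s" % (exit_status, cmd))
        return exit_status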
@@ -389,7 +389,7 @@ def _rmtree(self, remotepath, verbose = False):
  def _put_files(self,
                 files,
                 dereference = True) :
-     of = self.job_uuid + '.tgz'
+     of = self.submission.submission_hash + '.tgz'
      # local tar
      cwd = os.getcwd()
      os.chdir(self.local_root)
@@ -419,7 +419,7 @@ def _put_files(self,

  def _get_files(self,
                 files) :
-     of = self.job_uuid + '.tar.gz'
+     of = self.submission.submission_hash + '.tar.gz'
      # remote tar
      # If the number of files are large, we may get "Argument list too long" error.
      # Thus, we may run tar commands for serveral times and tar only 100 files for
@@ -429,7 +429,7 @@
      if ntar <= 1:
          self.block_checkcall('tar czf %s %s' % (of, " ".join(files)))
      else:
-         of_tar = self.job_uuid + '.tar'
+         of_tar = self.submission.submission_hash + '.tar'
          for ii in range(ntar):
              ff = files[per_nfile * ii : per_nfile * (ii+1)]
              if ii == 0:
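The comment above explains the batching in _get_files: tarring a very long file list in one command can exceed the shell's argument-length limit, so the archive is built roughly 100 files at a time. A rough sketch of the idea (the create-then-append flags are an inference from the truncated if ii == 0: branch; file names and counts are illustrative):

    # Archive a long file list in chunks of per_nfile to keep each tar
    # command's argument list short.
    files = ['frame_%d.dump' % i for i in range(230)]  # hypothetical names
    per_nfile = 100
    ntar = len(files) // per_nfile + bool(len(files) % per_nfile)  # -> 3
    for ii in range(ntar):
        ff = files[per_nfile * ii : per_nfile * (ii + 1)]
        mode = 'cf' if ii == 0 else 'rf'  # create first, append afterwards
        print('tar %s files.tar ... (%d files in this chunk)' % (mode, len(ff)))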
tests/jsons/machine.if_cuda_multi_devices.json: 21 additions & 0 deletions (new file)
@@ -0,0 +1,21 @@
{
"batch":{
"batch_type": "shell",
"context_type": "local",
"local_root" : "test_if_cuda_multi_devices/",
"remote_root" : "tmp_if_cuda_multi_devices/",
"hostname": "39.106.xx.xxx",
"username": "user1"
},
"resources":{
"number_node": 1,
"cpu_per_node": 4,
"gpu_per_node": 4,
"queue_name": "GPU_2080Ti",
"group_size": 16,
"para_deg": 2,
"strategy": {
"if_cuda_multi_devices": true
}
}
}
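This machine file drives the new test at the bottom of the commit. A minimal sketch of loading it, assuming the loader the test uses (the direct import path below is an assumption; the test itself imports Machine via tests/.context):

    from dpdispatcher.machine import Machine  # import path is an assumption

    machine = Machine.load_from_json_file(
        json_path='jsons/machine.if_cuda_multi_devices.json')
    print(machine.resources.gpu_per_node)                       # 4, per the JSON
    print(machine.resources.strategy['if_cuda_multi_devices'])  # True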
tests/jsons/machine_diffenert.json: 30 additions & 0 deletions (new file)
@@ -0,0 +1,30 @@
{
"train":{
"batch":{
"batch_type": "pbs",
"context_type": "lazy_local",
"local_root": "./"
},
"resources":{
"number_node": 1,
"cpu_per_node": 4,
"gpu_per_node": 1,
"queue_name": "T4_4_14",
"group_size": 5
}
},
"md":{
"batch":{
"batch_type": "Shell",
"context_type": "lazy_local",
"local_root": "./"
},
"resources":{
"number_node": null,
"cpu_per_node": null,
"gpu_per_node": 0,
"queue_name": null,
"group_size": 5
}
}
}
tests/test_if_cuda_multi_devices/test_dir/out.txt: 16 additions & 0 deletions (new file)
@@ -0,0 +1,16 @@
dpdispatcher_unittest_7
dpdispatcher_unittest_9
dpdispatcher_unittest_6
dpdispatcher_unittest_14
dpdispatcher_unittest_10
dpdispatcher_unittest_8
dpdispatcher_unittest_12
dpdispatcher_unittest_5
dpdispatcher_unittest_1
dpdispatcher_unittest_2
dpdispatcher_unittest_13
dpdispatcher_unittest_4
dpdispatcher_unittest_11
dpdispatcher_unittest_3
dpdispatcher_unittest_0
dpdispatcher_unittest_15
Empty file.
tests/test_shell_cuda_multi_devices.py: 40 additions & 0 deletions (new file)
@@ -0,0 +1,40 @@
import sys, os, shutil
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
__package__ = 'tests'
from .context import Submission, Job, Task, Resources
from .context import Shell
from .context import LocalContext
from .context import get_file_md5
from .context import Machine

import unittest

class TestShellTrival(unittest.TestCase):
    def setUp(self):
        self.maxDiff = None

    def test_shell_trival(self):
        machine = Machine.load_from_json_file(json_path='jsons/machine.if_cuda_multi_devices.json')
        submission = Submission(work_base='test_dir/', resources=machine.resources, forward_common_files=['test.txt'], backward_common_files=['out.txt'])
        task_list = []
        for ii in range(16):
            task = Task(command=f"echo dpdispatcher_unittest_{ii}", task_work_path='./', forward_files=[], backward_files=[], outlog='out.txt')
            task_list.append(task)
        submission.register_task_list(task_list=task_list)
        submission.bind_batch(batch=machine.batch)
        submission.run_submission(clean=False)

        for ii in ['out.txt', 'test.txt']:
            f1 = os.path.join('test_if_cuda_multi_devices/', 'test_dir/', ii)
            f2 = os.path.join('tmp_if_cuda_multi_devices/', submission.submission_hash, ii)
            self.assertEqual(get_file_md5(f1), get_file_md5(f2))

    @classmethod
    def tearDownClass(self):
        # pass
        shutil.rmtree('tmp_if_cuda_multi_devices/')





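Note: the test loads its machine file through the relative path jsons/machine.if_cuda_multi_devices.json, so it expects the tests directory as the working directory; a typical invocation would be something like python -m unittest test_shell_cuda_multi_devices run from inside tests/ (the exact runner setup is an assumption, not documented in this commit).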