Skip to content

Commit

Permalink
Merge pull request #7 from felix5572/master
Browse files Browse the repository at this point in the history
refactoring some unittests
  • Loading branch information
felix5572 committed Jan 6, 2021
2 parents 81b1b9d + 549aa4e commit 1c98163
Show file tree
Hide file tree
Showing 43 changed files with 3,388 additions and 5 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ ener.edr
state*cpt
CMakeCache.txt
CMakeFiles
*.pb
log.lammps
restart.*
dump.*
Expand Down
5 changes: 2 additions & 3 deletions dpdispatcher/JobStatus.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from enum import Enum

class JobStatus (Enum) :
from enum import IntEnum
class JobStatus(IntEnum) :
unsubmitted = 1
waiting = 2
running = 3
Expand Down
6 changes: 6 additions & 0 deletions dpdispatcher/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os


ROOT_PATH=__path__[0]
NAME="dpdispatcher"
SHORT_CMD="dpdisp"
Expand All @@ -24,6 +25,11 @@
except ImportError:
__date__ = 'unkown'

from .submission import Submission
from .submission import Task
from .submission import Job
from .submission import Resources

def info():
"""
Show basic information about """+NAME+""", its location and version.
Expand Down
56 changes: 56 additions & 0 deletions dpdispatcher/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

import os,sys,time,random,uuid

from dpdispatcher.JobStatus import JobStatus
from dpdispatcher import dlog
class Batch(object) :
def __init__ (self,
context):
self.context = context
# self.uuid_names = uuid_names
self.upload_tag_name = '%s_job_tag_upload' % self.context.job_uuid
self.finish_tag_name = '%s_job_tag_finished' % self.context.job_uuid
self.sub_script_name = '%s.sub' % self.context.job_uuid
self.job_id_name = '%s_job_id' % self.context.job_uuid

def check_status(self, job) :
raise NotImplementedError('abstract method check_status should be implemented by derived class')

def default_resources(self, res) :
raise NotImplementedError('abstract method sub_script_head should be implemented by derived class')

def sub_script_head(self, res) :
raise NotImplementedError('abstract method sub_script_head should be implemented by derived class')

def sub_script_cmd(self, res):
raise NotImplementedError('abstract method sub_script_cmd should be implemented by derived class')

def do_submit(self, job):
'''
submit a single job, assuming that no job is running there.
'''
raise NotImplementedError('abstract method do_submit should be implemented by derived class')

def gen_script(self):
raise NotImplementedError('abstract method gen_script should be implemented by derived class')

def check_finish_tag(self) :
raise NotImplementedError('abstract method check_finish_tag should be implemented by derived class')

def get_command_env_cuda_devices(self, resources, task):
task_need_resources = task.task_need_resources
command_env=""
if resources.if_cuda_multi_devices is True:
min_CUDA_VISIBLE_DEVICES = int(resources.in_use*resources.gpu_per_node)
max_CUDA_VISIBLE_DEVICES = int((resources.in_use + task_need_resources)*resources.gpu_per_node-0.000000001)
list_CUDA_VISIBLE_DEVICES = list(range(min_CUDA_VISIBLE_DEVICES, max_CUDA_VISIBLE_DEVICES+1))
if len(list_CUDA_VISIBLE_DEVICES) == 0:
raise RuntimeError("list_CUDA_VISIBLE_DEVICES can not be empty")

command_env+="export CUDA_VISIBLE_DEVICES="
for ii in list_CUDA_VISIBLE_DEVICES:
command_env+="{ii},".format(ii=ii)
command_env+=" ;"
return command_env


142 changes: 142 additions & 0 deletions dpdispatcher/lazy_local_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import os,shutil,uuid
import subprocess as sp
from glob import glob
from dpdispatcher import dlog

class SPRetObj(object):
    """Small file-like wrapper around the raw bytes captured from a subprocess."""

    def __init__(self, ret):
        # Raw payload, stored untouched.
        self.data = ret

    def read(self):
        """Return the stored payload unchanged."""
        return self.data

    def readlines(self):
        """Decode the payload as UTF-8 and return its lines, each ending in a newline."""
        text = self.data.decode('utf-8')
        return [line + '\n' for line in text.splitlines()]

class LazyLocalContext(object):
    """Execution context that runs commands directly inside ``local_root``.

    The "remote" root is the local root itself, so :meth:`upload`,
    :meth:`download` and :meth:`clean` are no-ops.
    """

    def __init__(self,
                 local_root,
                 work_profile=None,
                 job_uuid=None):
        """
        Parameters
        ----------
        local_root : str
            Directory in which commands are executed; stored as an
            absolute path.
        work_profile
            Accepted for interface compatibility; not used by this class.
        job_uuid
            Identifier stored as given (no identifier is generated when it
            is omitted).
        """
        assert isinstance(local_root, str)
        self.local_root = os.path.abspath(local_root)
        self.remote_root = self.local_root
        self.job_uuid = job_uuid
        self.submission = None

    def bind_submission(self, submission):
        """Attach the submission whose ``work_base`` is used by the file helpers."""
        self.submission = submission

    def get_job_root(self):
        """Return the absolute local root directory."""
        return self.local_root

    def upload(self,
               jobs,
               dereference=True):
        """No-op: nothing is transferred by this context."""
        pass

    def download(self,
                 jobs,
                 check_exists=False,
                 mark_failure=True,
                 back_error=False):
        """No-op: nothing is transferred by this context."""
        pass

    def block_checkcall(self,
                        cmd):
        """Run *cmd* through the shell in ``local_root`` and wait for it.

        Returns
        -------
        tuple
            ``(None, stdout, stderr)`` with both streams wrapped in
            ``SPRetObj``.

        Raises
        ------
        RuntimeError
            If the command exits with a non-zero status.
        """
        # Passing cwd= to Popen runs the child in local_root without
        # changing (and possibly failing to restore) this process's own
        # working directory.
        proc = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE,
                        cwd=self.local_root)
        o, e = proc.communicate()
        code = proc.returncode
        if code != 0:
            # The arguments must be interpolated with %; passing them as a
            # second exception argument left the placeholders unformatted.
            raise RuntimeError(
                "Get error code %d in locally calling %s with job: %s "
                % (code, cmd, self.job_uuid)
            )
        return None, SPRetObj(o), SPRetObj(e)

    def block_call(self, cmd):
        """Run *cmd* through the shell in ``local_root`` and wait for it.

        Returns
        -------
        tuple
            ``(returncode, None, stdout, stderr)`` with both streams
            wrapped in ``SPRetObj``.  A non-zero status is not an error.
        """
        proc = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE,
                        cwd=self.local_root)
        o, e = proc.communicate()
        stdout = SPRetObj(o)
        stderr = SPRetObj(e)
        return proc.returncode, None, stdout, stderr

    def clean(self):
        """No-op: this context removes nothing."""
        pass

    def write_file(self, fname, write_str):
        """Write *write_str* to ``local_root/<submission.work_base>/fname``.

        The target directory is created when missing.  Requires a bound
        submission (see :meth:`bind_submission`).
        """
        work_dir = os.path.join(self.local_root, self.submission.work_base)
        os.makedirs(work_dir, exist_ok=True)
        with open(os.path.join(work_dir, fname), 'w') as fp:
            fp.write(write_str)

    def read_file(self, fname):
        """Return the text of ``local_root/<submission.work_base>/fname``."""
        path = os.path.join(self.local_root, self.submission.work_base, fname)
        with open(path, 'r') as fp:
            return fp.read()

    def check_file_exists(self, fname):
        """Return whether ``local_root/fname`` is an existing regular file.

        NOTE(review): unlike write_file/read_file, this path does not
        include ``submission.work_base`` -- confirm that is intended.
        """
        return os.path.isfile(os.path.join(self.local_root, fname))

    def call(self, cmd):
        """Start *cmd* through the shell in ``local_root`` without waiting.

        Returns the ``subprocess.Popen`` handle.
        """
        return sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE,
                        cwd=self.local_root)

    def kill(self, proc):
        """Kill the process behind *proc*."""
        proc.kill()

    def check_finish(self, proc):
        """Return ``True`` once *proc* has terminated."""
        return proc.poll() is not None

    def get_return(self, proc):
        """Collect the result of a process started with :meth:`call`.

        Returns
        -------
        tuple
            ``(None, None, None)`` while the process is still running;
            otherwise ``(returncode, stdout, stderr)``, where the streams
            are ``SPRetObj`` instances or ``None`` when the output could
            not be collected.
        """
        ret = proc.poll()
        if ret is None:
            return None, None, None
        try:
            o, e = proc.communicate()
            stdout = SPRetObj(o)
            stderr = SPRetObj(e)
        except Exception:
            # Best effort: still report the exit code when the output
            # cannot be read, but let KeyboardInterrupt/SystemExit through.
            stdout = None
            stderr = None
        return ret, stdout, stderr


0 comments on commit 1c98163

Please sign in to comment.