Skip to content

Commit

Permalink
Merge pull request #20 from felix5572/master
Browse files Browse the repository at this point in the history
add support for dpcloudserver; fix unittests; modify script generation
  • Loading branch information
felix5572 committed May 24, 2021
2 parents 3d0ac93 + 332beda commit 8614bbd
Show file tree
Hide file tree
Showing 23 changed files with 262 additions and 373 deletions.
145 changes: 123 additions & 22 deletions dpdispatcher/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,43 @@

from dpdispatcher.JobStatus import JobStatus
from dpdispatcher import dlog


# Overall layout of every generated job script: scheduler header, user
# custom flags, environment setup, per-task commands, closing barrier.
script_template="""\
{script_header}
{script_custom_flags}
{script_env}
{script_command}
{script_end}
"""

# Environment section: record the remote working root and source any
# user-requested shell files.
script_env_template="""
export REMOTE_ROOT={remote_root}
test $? -ne 0 && exit 1
{source_files_part}
"""

# Per-task section. The task runs in the background (`fi &`) so tasks in the
# same parallel group overlap; the task tag is touched when the command ends
# — note it is touched even on a non-zero exit status.
script_command_template="""
cd $REMOTE_ROOT
cd {task_work_path}
test $? -ne 0 && exit 1
if [ ! -f {task_tag_finished} ] ;then
{command_env} {command} {log_err_part}
if test $? -ne 0; then touch {task_tag_finished}; fi
touch {task_tag_finished}
fi &
"""

# End section: wait for all backgrounded tasks, then mark the whole job done.
script_end_template="""
cd $REMOTE_ROOT
test $? -ne 0 && exit 1
wait
touch {job_tag_finished}
"""


class Batch(object):
def __init__ (self,
context):
Expand All @@ -13,7 +50,6 @@ def __init__ (self,
# self.sub_script_name = '%s.sub' % self.context.job_uuid
# self.job_id_name = '%s_job_id' % self.context.job_uuid


def check_status(self, job) :
    """Query the scheduler for *job*'s current status.

    Abstract: concrete Batch subclasses must override this.
    """
    raise NotImplementedError('abstract method check_status should be implemented by derived class')

Expand All @@ -32,8 +68,20 @@ def do_submit(self, job):
'''
raise NotImplementedError('abstract method do_submit should be implemented by derived class')

def gen_script(self, **kwargs):
raise NotImplementedError('abstract method gen_script should be implemented by derived class')
def gen_script(self, job):
    """Assemble the complete job script for *job*.

    Gathers the five sections (header, custom flags, environment, per-task
    commands, end barrier) from the corresponding gen_* hooks and renders
    them into the module-level ``script_template``.
    """
    sections = {
        'script_header': self.gen_script_header(job),
        'script_custom_flags': self.gen_script_custom_flags_lines(job),
        'script_env': self.gen_script_env(job),
        'script_command': self.gen_script_command(job),
        'script_end': self.gen_script_end(job),
    }
    return script_template.format(**sections)

def check_if_recover(self, submission):
submission_hash = submission.submission_hash
Expand All @@ -44,33 +92,86 @@ def check_if_recover(self, submission):
def check_finish_tag(self, **kwargs):
    """Check the job's finish tag (semantics defined by the subclass).

    Abstract: concrete Batch subclasses must override this.
    """
    raise NotImplementedError('abstract method check_finish_tag should be implemented by derived class')

def get_script_wait(self, resources, task):
if not resources.strategy.get('if_cuda_multi_devices', None):
return "wait \n"
def gen_script_header(self, job):
    """Return the scheduler-specific header lines for *job*.

    Abstract: concrete Batch subclasses (e.g. PBS) must override this.
    """
    raise NotImplementedError('abstract method gen_script_header should be implemented by derived class')

task_need_gpus = task.task_need_gpus
if resources.gpu_in_use + task_need_gpus > resources.gpu_per_node:
def gen_script_custom_flags_lines(self, job):
    """Return the user-supplied custom flag lines, one per line.

    Reads ``job.resources.custom_flags`` (an iterable of strings) and joins
    them with trailing newlines; an empty list yields an empty string.
    """
    flags = job.resources.custom_flags
    return "".join(f"{flag}\n" for flag in flags)

def gen_script_env(self, job):
    """Render the environment section of the job script.

    Exports the context's remote root and emits one ``source`` line per
    entry in ``job.resources.source_list``.
    """
    source_files_part = "".join(
        f"source {source_file}\n" for source_file in job.resources.source_list
    )
    return script_env_template.format(
        remote_root=self.context.remote_root,
        source_files_part=source_files_part,
    )

def gen_script_command(self, job):
    """Render the per-task command section of the job script.

    For each task: builds an environment prefix (CUDA device pinning when
    enabled), redirects stdout/stderr when the task defines log files, and
    appends a parallel-group wait barrier via gen_script_wait.
    Mutates the para/gpu bookkeeping on ``job.resources`` as a side effect.
    """
    script_command = ""
    resources = job.resources
    # in_para_task_num = 0
    for task in job.job_task_list:
        # Shell prefix for this task (e.g. CUDA_VISIBLE_DEVICES export).
        command_env = ""
        command_env += self.gen_command_env_cuda_devices(resources=resources)

        task_tag_finished = task.task_hash + '_task_tag_finished'

        # Only redirect streams the task actually configures.
        log_err_part = ""
        if task.outlog is not None:
            log_err_part += f"1>>{task.outlog} "
        if task.errlog is not None:
            log_err_part += f"2>>{task.errlog} "

        single_script_command = script_command_template.format(command_env=command_env,
            task_work_path=task.task_work_path, command=task.command, task_tag_finished=task_tag_finished,
            log_err_part=log_err_part)
        script_command += single_script_command

        # NOTE(review): indentation reconstructed from a diff view — the wait
        # barrier is assumed to be emitted after EVERY task (inside the loop),
        # matching the per-task para_deg bookkeeping in gen_script_wait; confirm.
        script_command += self.gen_script_wait(resources=resources)
    return script_command

def gen_script_end(self, job):
    """Render the closing section: wait for all tasks, then touch the job tag."""
    finished_tag = job.job_hash + '_job_tag_finished'
    return script_end_template.format(job_tag_finished=finished_tag)

def gen_script_wait(self, resources):
    """Return a shell ``wait`` barrier when the current parallel task group is full.

    Mutates bookkeeping on *resources*: ``task_in_para`` counts tasks emitted
    into the current group; when it reaches ``para_deg`` the group is closed
    and counters are reset.
    """
    # if not resources.strategy.get('if_cuda_multi_devices', None):
    #     return "wait \n"
    para_deg = resources.para_deg
    resources.task_in_para += 1
    # task_need_gpus = task.task_need_gpus
    if resources.task_in_para >= para_deg:
        # pbs_script_command += pbs_script_wait
        # Group complete: reset the GPU cursor and the in-group counter.
        resources.gpu_in_use = 0
        resources.task_in_para = 0
        # NOTE(review): indentation reconstructed from a diff view; the GPU
        # increment is assumed to belong to the group-complete branch — confirm
        # against the merged file.
        if resources.strategy['if_cuda_multi_devices'] is True:
            resources.gpu_in_use += 1
        return "wait \n"
    return ""
def gen_command_env_cuda_devices(self, resources):
    """Return a shell prefix pinning the next task to one CUDA device.

    When ``resources.strategy['if_cuda_multi_devices']`` is True, picks the
    device index as ``gpu_in_use % gpu_per_node`` and exports
    CUDA_VISIBLE_DEVICES accordingly; otherwise returns "".

    Raises
    ------
    RuntimeError
        If multi-device mode is on but ``gpu_per_node`` is 0 (would divide
        by zero).
    """
    # This span previously interleaved the deleted pre-refactor (task-based)
    # implementation with the current one; only the current logic is kept.
    command_env = ""
    if resources.strategy['if_cuda_multi_devices'] is True:
        if resources.gpu_per_node == 0:
            raise RuntimeError("resources.gpu_per_node can not be 0")
        gpu_index = resources.gpu_in_use % resources.gpu_per_node
        command_env += f"export CUDA_VISIBLE_DEVICES={gpu_index};"
    return command_env


5 changes: 3 additions & 2 deletions dpdispatcher/dp_cloud_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dpdispatcher.batch import Batch
from dpdispatcher.shell import Shell
from dpdispatcher.dpcloudserver import api
from dpdispatcher.dpcloudserver.config import API_HOST, ALI_STS_ENDPOINT
from dpdispatcher.dpcloudserver.config import API_HOST, ALI_OSS_BUCKET_URL

# input_data = {
# 'job_type': 'indicate',
Expand Down Expand Up @@ -50,7 +50,8 @@ def do_submit(self, job):
self.gen_local_script(job)
zip_filename = job.job_hash + '.zip'
oss_task_zip = 'indicate/' + job.job_hash + '/' + zip_filename
job_resources = ALI_STS_ENDPOINT + '/' + oss_task_zip
job_resources = ALI_OSS_BUCKET_URL + oss_task_zip
# job_resources = ALI_STS_ENDPOINT + '/' + oss_task_zip
print(897, job_resources)
# oss_task_zip = 'indicate'
# oss_path =
Expand Down
1 change: 1 addition & 0 deletions dpdispatcher/dpcloudserver/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def login(username, password):
'/account/login',
{"username": username, "password": password}
)
print('debug181', ret)
token = ret['token']


Expand Down
4 changes: 3 additions & 1 deletion dpdispatcher/dpcloudserver/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@


# Aliyun STS/OSS settings for the dpcloudserver bucket.
# (The diff residue duplicated the bucket-name line and left a malformed
# commented duplicate of the URL; both are consolidated here.)
ALI_STS_ENDPOINT = 'http://oss-cn-shenzhen.aliyuncs.com'
ALI_STS_BUCKET_NAME = 'dpcloudserver'
ALI_OSS_BUCKET_URL = 'https://dpcloudserver.oss-cn-shenzhen.aliyuncs.com/'
13 changes: 7 additions & 6 deletions dpdispatcher/dpcloudserver/zip_file.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import os
import os, glob
from zipfile import ZipFile

def zip_file_list(root_path, zip_filename, file_list=None):
    """Zip the files under *root_path* matched by the patterns in *file_list*.

    Parameters
    ----------
    root_path : str
        Directory the patterns are resolved against; archive names are
        relative to it.
    zip_filename : str
        Name of the archive to create inside *root_path*.
    file_list : list of str, optional
        Glob patterns (relative to *root_path*); defaults to no files.
        (Was a mutable default ``[]`` — replaced with a None sentinel.)

    Returns
    -------
    str
        Absolute-or-joined path of the created zip file.
    """
    if file_list is None:
        file_list = []
    out_zip_file = os.path.join(root_path, zip_filename)
    # Context manager guarantees the archive is closed even on error.
    with ZipFile(out_zip_file, 'w') as zip_obj:
        for pattern in file_list:
            # Each entry may be a glob pattern, not just a literal filename.
            for matched in glob.glob(os.path.join(root_path, pattern)):
                arcname = os.path.relpath(matched, start=root_path)
                zip_obj.write(matched, arcname)
    return out_zip_file

Expand Down Expand Up @@ -60,4 +61,4 @@ def is_selected(arcname, selected):
def unzip_file(zip_file, out_dir='./'):
    """Extract every member of *zip_file* into *out_dir*.

    The archive handle is now closed via a context manager (the original
    left the ZipFile open — a resource leak).
    """
    with ZipFile(zip_file, "r") as obj:
        for item in obj.namelist():
            obj.extract(item, out_dir)
3 changes: 2 additions & 1 deletion dpdispatcher/lazy_local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def bind_submission(self, submission):
self.submission = submission
self.local_root = os.path.join(self.temp_local_root, submission.work_base)
self.remote_root = os.path.join(self.temp_local_root, submission.work_base)
# print('debug:LazyLocalContext',self.local_root, self.remote_root)
print('debug:LazyLocalContext.bind_submission', submission.submission_hash,
self.local_root, self.remote_root)

def get_job_root(self) :
return self.local_root
Expand Down
3 changes: 3 additions & 0 deletions dpdispatcher/local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ def bind_submission(self, submission):
self.submission = submission
self.local_root = os.path.join(self.temp_local_root, submission.work_base)
self.remote_root = os.path.join(self.temp_remote_root, submission.submission_hash)
print('debug:LocalContext.bind_submission', submission.submission_hash,
self.local_root, self.remote_root)

# os.makedirs(self.remote_root, exist_ok = True)
# self.job_uuid = submission.submission_hash
# self.remote_root = os.path.join(self.work_profile.get_work_root(), self.job_uuid)
Expand Down
83 changes: 11 additions & 72 deletions dpdispatcher/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,90 +4,29 @@
from dpdispatcher import dlog
from dpdispatcher.batch import Batch

# NOTE(review): this span comes from a diff view; these PBS-specific
# templates appear to be the pre-refactor versions superseded by the generic
# script_* templates in batch.py — confirm which survive in the merged file.

# Overall PBS script layout: header, env, per-task commands, end section.
pbs_script_template="""
{pbs_script_header}
{pbs_script_env}
{pbs_script_command}
{pbs_script_end}
"""

# #PBS header lines: node/cpu/gpu selection, walltime, merged stdout/stderr,
# target queue.
pbs_script_header_template="""
#!/bin/bash -l
{select_node_line}
{walltime_line}
#PBS -j oe
{queue_name_line}
"""

# Environment section: enter the PBS submission working directory.
pbs_script_env_template="""
cd $PBS_O_WORKDIR
test $? -ne 0 && exit 1
"""

# Per-task section: run the command in the task directory in the background;
# the task tag is touched when the command ends (even on failure).
pbs_script_command_template="""
cd $PBS_O_WORKDIR
cd {task_work_path}
test $? -ne 0 && exit 1
if [ ! -f {task_tag_finished} ] ;then
{command_env} {command} 1>> {outlog} 2>> {errlog}
if test $? -ne 0; then touch {task_tag_finished}; fi
touch {task_tag_finished}
fi &
"""

# End section: wait for background tasks, then mark the whole job finished.
pbs_script_end_template="""
cd $PBS_O_WORKDIR
test $? -ne 0 && exit 1
wait
touch {job_tag_finished}
"""

# pbs_script_wait="""
# wait
# """

class PBS(Batch):
def gen_script(self, job):
    """Generate the complete PBS submission script for *job*.

    Delegates to Batch.gen_script, which assembles the header (from this
    class's gen_script_header), custom flags, environment, per-task
    commands, and the end barrier.

    (This span previously interleaved the deleted pre-refactor body with the
    new delegating implementation; only the latter is kept.)
    """
    pbs_script = super(PBS, self).gen_script(job)
    return pbs_script
def gen_script_header(self, job):
    """Render the #PBS header lines from the job's resources."""
    resources = job.resources
    header_fields = {
        'select_node_line': "#PBS -l select={number_node}:ncpus={cpu_per_node}:ngpus={gpu_per_node}".format(
            number_node=resources.number_node,
            cpu_per_node=resources.cpu_per_node,
            gpu_per_node=resources.gpu_per_node,
        ),
        'queue_name_line': "#PBS -q {queue_name}".format(queue_name=resources.queue_name),
    }
    return pbs_script_header_template.format(**header_fields)

pbs_script = pbs_script_template.format(
pbs_script_header=pbs_script_header,
pbs_script_env=pbs_script_env,
pbs_script_command=pbs_script_command,
pbs_script_end=pbs_script_end)
return pbs_script

def do_submit(self, job):
script_file_name = job.script_file_name
script_str = self.gen_script(job)
Expand Down

0 comments on commit 8614bbd

Please sign in to comment.