Skip to content

Commit

Permalink
Merge pull request #61 from Cloudac7/lsf
Browse files Browse the repository at this point in the history
Update the LSF module to keep it consistent with DPDispatcher
  • Loading branch information
felix5572 committed Jun 16, 2021
2 parents 800fd8b + 47b1212 commit af166b9
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 215 deletions.
173 changes: 18 additions & 155 deletions dpdispatcher/lsf.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
from abc import ABC
import os, getpass, time


from dpdispatcher.machine import Machine
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.submission import Resources


# Overall layout of the generated LSF batch script: #BSUB header lines,
# environment setup, per-task command sections, and a trailing end section
# that touches the job finish tag.
lsf_script_template = """\
{lsf_script_header}
{lsf_script_env}
{lsf_script_command}
{lsf_script_end}
"""

lsf_script_header_template = """\
#!/bin/bash -l
Expand All @@ -19,123 +13,31 @@
{lsf_nodes_line}
{lsf_ptile_line}
{lsf_partition_line}
{lsf_walltime_line}
{lsf_number_gpu_line}
"""

# Environment section: run the user's prepend scripts, then record the
# remote root directory so later sections can cd back to it.
lsf_script_env_template = """
{prepend_text}
export REMOTE_ROOT={remote_root}
test $? -ne 0 && exit 1
"""

# Per-task section: enter the task directory and run the command only if the
# task's finish-tag file does not already exist, so a resubmitted job skips
# tasks that previously completed.
lsf_script_command_template = """
cd $REMOTE_ROOT
cd {task_work_path}
test $? -ne 0 && exit 1
if [ ! -f {task_tag_finished} ] ;then
{command_env} {command} 1>> {outlog} 2>> {errlog}
if test $? -ne 0; then touch {task_tag_finished}; fi
touch {task_tag_finished}
fi
"""

# End section: wait for any backgrounded commands, run the user's append
# scripts, and touch the job-level finish tag checked by check_finish_tag().
lsf_script_end_template = """
cd $REMOTE_ROOT
test $? -ne 0 && exit 1
wait
{append_text}
touch {job_tag_finished}
"""

# Snippet inserted between task groups to wait for backgrounded commands.
lsf_script_wait = """
wait
"""

# Default extra bsub options used when the caller supplies none.
# NOTE(review): 'm' looks intended as a memory request — confirm against the
# bsub flag actually expected by the target cluster.
default_lsf_bsub_dict = {
'm': "8G"
}


# class LSFResources(Resources):
# """
# LSF resources
# """
# def __init__(
# self,
# number_node,
# cpu_per_node,
# gpu_per_node,
# queue_name,
# walltime="120:00:00",
# prepend_text="",
# append_text="",
# gpu_usage=True,
# gpu_new_syntax=True,
# lsf_bsub_dict=None,
# group_size=1
# ):
# """define LSF resources
#
# Parameters
# ----------
# number_node: nodes to be used
# cpu_per_node: CPU cores used on each node
# gpu_per_node: GPU
# queue_name: the name of queue
# walltime: max time of task
# prepend_text: prepend scripts, code executed before the task run
# append_text: append scripts, code executed after the task run
# gpu_usage: choose if GPU line is used
# lsf_bsub_dict: other bsub parameters.
# group_size: tasks contained by each group
# """
# super().__init__(number_node, cpu_per_node, gpu_per_node, queue_name, group_size)
# if lsf_bsub_dict is None:
# lsf_bsub_dict = {}
# self.walltime = walltime
# self.gpu_new_syntax = gpu_new_syntax
# self.gpu_usage = gpu_usage
# self.prepend_text = prepend_text
# self.append_text = append_text
# self.lsf_bsub_dict = lsf_bsub_dict


class LSF(Machine):
"""
LSF batch
"""

def gen_script(self, job):
    """Generate the complete LSF batch script for *job*.

    Delegates to ``Machine.gen_script``, which assembles the script from
    this class's header/env/command/end helpers.

    Parameters
    ----------
    job : Job
        The job to generate a submission script for.

    Returns
    -------
    str
        The full text of the LSF batch script.
    """
    # Dropped the long block of commented-out legacy LSFResources handling
    # that previously preceded this call; it was dead code.
    lsf_script = super(LSF, self).gen_script(job)
    return lsf_script

def gen_script_header(self, job):
resources = job.resources
lsf_bsub_dict = resources.extra_specification.copy()

# headers
script_header_dict = {
'lsf_nodes_line': "#BSUB -n {number_cores}".format(
number_cores=resources.number_node * resources.cpu_per_node),
'lsf_ptile_line': "#BSUB -R 'span[ptile={cpu_per_node}]'".format(
cpu_per_node=resources.cpu_per_node),
'lsf_partition_line': "#BSUB -q {queue_name}".format(
queue_name=resources.queue_name),
'lsf_walltime_line': "#BSUB -W {walltime}".format(
walltime=resources.kwargs.get('walltime', '12:00'))
queue_name=resources.queue_name)
}
gpu_usage_flag = resources.kwargs.get('gpu_usage', False)
gpu_new_syntax_flag = resources.kwargs.get('gpu_new_syntax', False)
gpu_usage_flag = resources.kwargs['kwargs'].get('gpu_usage', False)
gpu_new_syntax_flag = resources.kwargs['kwargs'].get('gpu_new_syntax', False)
if gpu_usage_flag is True:
if gpu_new_syntax_flag is True:
script_header_dict['lsf_number_gpu_line'] = "#BSUB -gpu 'num={gpu_per_node}:mode=shared:" \
Expand All @@ -149,51 +51,7 @@ def gen_script(self, job):
script_header_dict['lsf_number_gpu_line'] = ""
lsf_script_header = lsf_script_header_template.format(**script_header_dict)

for k, v in lsf_bsub_dict.items():
line = "#BSUB -{key} {value}\n".format(key=k, value=str(v))
lsf_script_header += line

# envs
script_env_dict = {
'prepend_text': resources.kwargs.get('prepend_text', ""),
'remote_root': self.context.remote_root
}
lsf_script_env = lsf_script_env_template.format(**script_env_dict)

# commands
lsf_script_command = ""

for task in job.job_task_list:
command_env = ""
lsf_script_command += self.get_script_wait(resources=resources, task=task)
command_env += self.get_command_env_cuda_devices(resources=resources, task=task)

task_tag_finished = task.task_hash + '_task_tag_finished'

temp_lsf_script_command = lsf_script_command_template.format(
command_env=command_env,
task_work_path=task.task_work_path,
command=task.command,
task_tag_finished=task_tag_finished,
outlog=task.outlog, errlog=task.errlog
)

lsf_script_command += temp_lsf_script_command

# end
job_tag_finished = job.job_hash + '_job_tag_finished'
lsf_script_end = lsf_script_end_template.format(
append_text=resources.kwargs.get('append_text', ""),
job_tag_finished=job_tag_finished
)

# join the whole script
lsf_script = lsf_script_template.format(
lsf_script_header=lsf_script_header,
lsf_script_env=lsf_script_env,
lsf_script_command=lsf_script_command,
lsf_script_end=lsf_script_end)
return lsf_script
return lsf_script_header

def do_submit(self, job):
script_file_name = job.script_file_name
Expand All @@ -208,10 +66,16 @@ def do_submit(self, job):
self.context.write_file(job_id_name, job_id)
return job_id

# TODO: add default resources
# TODO: derive abstract methods
def default_resources(self, resources):
    """Placeholder required by the Machine interface; LSF currently
    defines no machine-specific resource defaults, so this is a no-op."""

def sub_script_cmd(self, res):
    """Placeholder kept for Machine-interface compatibility; no-op for LSF."""

def sub_script_head(self, res):
    """Placeholder kept for Machine-interface compatibility; no-op for LSF."""

def check_status(self, job):
try:
job_id = job.job_id
Expand Down Expand Up @@ -253,5 +117,4 @@ def check_status(self, job):

def check_finish_tag(self, job):
    """Return True if the job's finish-tag file exists on the remote side.

    The generated batch script touches ``<job_hash>_job_tag_finished`` as
    its final action, so the file's existence marks job completion.

    Parameters
    ----------
    job : Job
        The job whose completion status is checked; only ``job.job_hash``
        is read.

    Returns
    -------
    bool
        Result of ``self.context.check_file_exists`` for the tag file.
    """
    # Removed the stray debug print(); library code should not write
    # progress noise to stdout (the package's dlog logger exists for that).
    job_tag_finished = job.job_hash + '_job_tag_finished'
    return self.context.check_file_exists(job_tag_finished)
5 changes: 3 additions & 2 deletions dpdispatcher/ssh_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ def arginfo():
doc_username = 'username of target linux system'
doc_password = 'password of linux system'
doc_port = 'ssh connection port.'
doc_key_filename = 'key_filename used by ssh connection'
doc_passphrase = 'passphrase used by ssh connection'
doc_key_filename = 'key filename used by ssh connection. If left None, find key in ~/.ssh or ' \
'use password for login'
doc_passphrase = 'passphrase of key used by ssh connection'
doc_timeout = 'timeout of ssh connection'

ssh_remote_profile_args = [
Expand Down
85 changes: 27 additions & 58 deletions tests/devel_test_lsf.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,49 @@
import os
import sys
import json
from dpdispatcher.submission import Submission, Job, Task, Resources
from dpdispatcher.machine import Machine

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from dpdispatcher.ssh_context import SSHSession, SSHContext
from dpdispatcher.submission import Submission, Task, Resources
from dpdispatcher.lsf import LSF

# edit ssh parameters
ssh_session = SSHSession(
hostname='127.0.0.1',
port=22,
remote_root='/home/dp/dpdispatcher/tests',
username='debug'
)
# local_root is the relevant path that containing tasks to be submit
ssh_context = SSHContext(local_root='test_lsf_dir', ssh_session=ssh_session)
lsf = LSF(context=ssh_context)

prepend_text = '''
module load cuda/9.2
module load gcc/4.9.4
module load deepmd/1.0
source /home/dp/scripts/avail_gpu.sh
'''
# task_need_resources has no effect
with open("jsons/machine_lsf.json", 'r') as f:
mdata = json.load(f)

lsf_bsub_dict = {
'R': "'select[hname != g005]'"
}
resources = Resources(
number_node=1,
cpu_per_node=4,
gpu_per_node=0,
queue_name="gpu",
walltime="24:00:00",
prepend_text=prepend_text,
append_text="",
gpu_usage=False,
gpu_new_syntax=False,
extra_specification=lsf_bsub_dict,
group_size=1
)
machine = Machine.load_from_dict(mdata['machine'])
resources = Resources.load_from_dict(mdata['resources'])

# task_need_resources has no effect
submission = Submission(
work_base='0_md', # the dir containing all of task_work_path
resources=resources, # resources above
forward_common_files=['graph.pb'], # file to be upload
backward_common_files=['*.json'] # file to be downloaded
work_base='0_md/',
machine=machine,
resources=resources,
forward_common_files=['graph.pb'],
backward_common_files=[]
)

task1 = Task(
command='lmp_mpi_20201029 -i input.lammps',
task_work_path='bct-1',
command='lmp -i input.lammps',
task_work_path='bct-1/',
forward_files=['conf.lmp', 'input.lammps'],
backward_files=['log.lammps'],
backward_files=['log.lammps']
)
task2 = Task(
command='lmp_mpi_20201029 -i input.lammps',
task_work_path='bct-2',
command='lmp -i input.lammps',
task_work_path='bct-2/',
forward_files=['conf.lmp', 'input.lammps'],
backward_files=['log.lammps'],
backward_files=['log.lammps']
)
task3 = Task(
command='lmp_mpi_20201029 -i input.lammps',
task_work_path='bct-3',
command='lmp -i input.lammps',
task_work_path='bct-3/',
forward_files=['conf.lmp', 'input.lammps'],
backward_files=['log.lammps'],
backward_files=['log.lammps']
)
task4 = Task(
command='lmp_mpi_20201029 -i input.lammps',
task_work_path='bct-4',
command='lmp -i input.lammps',
task_work_path='bct-4/',
forward_files=['conf.lmp', 'input.lammps'],
backward_files=['log.lammps'],
backward_files=['log.lammps']
)
submission.register_task_list([task1, task2, task3, task4, ])
submission.generate_jobs()
submission.bind_batch(batch=lsf)

submission.run_submission()
submission.run_submission(clean=True)
42 changes: 42 additions & 0 deletions tests/jsons/machine_lsf.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"machine":{
"batch_type": "LSF",
"context_type": "SSHContext",
"local_root": "./test_lsf_dir",
"remote_root": "/data/home/ypliu/dptasks",
"remote_profile": {
"hostname": "123.45.78.99",
"port": 56789,
"username": "ypliu"
}
},
"resources":{
"number_node": 1,
"cpu_per_node": 4,
"gpu_per_node": 1,
"queue_name": "gpu",
"group_size": 4,
"custom_flags": [
"#BSUB -R \"select[hname != g005]\"",
"#BSUB -W 24:00"
],
"strategy": {
"if_cuda_multi_devices": false
},
"para_deg": 1,
"module_unload_list": [],
"module_list": [
"use.own",
"deepmd/1.3"
],
"source_list": [
"/data/home/ypliu/scripts/avail_gpu.sh",
"/data/home/ypliu/dprun/tf_envs.sh"
],
"envs": {"DP_DISPATCHER_EXPORT": "test_foo_bar_baz"},
"kwargs": {
"gpu_usage": false,
"gpu_new_syntax": false
}
}
}
1 change: 1 addition & 0 deletions tests/test_lsf_dir/0_md/graph.pb

0 comments on commit af166b9

Please sign in to comment.