Merge pull request #50 from felix5572/master
add module_unload_list, module_load, envs to Resources;
felix5572 committed Jun 11, 2021
2 parents 30d04ea + ceedc57 commit a9772d0
Showing 17 changed files with 126 additions and 32 deletions.
27 changes: 25 additions & 2 deletions dpdispatcher/machine.py
@@ -20,7 +20,10 @@
 script_env_template="""
 export REMOTE_ROOT={remote_root}
 test $? -ne 0 && exit 1
+{module_unload_part}
+{module_load_part}
 {source_files_part}
+{export_envs_part}
 """
 
 script_command_template="""
@@ -153,14 +156,34 @@ def gen_script_custom_flags_lines(self, job):
 
     def gen_script_env(self, job):
         source_files_part = ""
+
+        module_unload_part = ""
+        module_unload_list = job.resources.module_unload_list
+        for ii in module_unload_list:
+            module_unload_part += f"module unload {ii}\n"
+
+        module_load_part = ""
+        module_list = job.resources.module_list
+        for ii in module_list:
+            module_load_part += f"module load {ii}\n"
+
         source_list = job.resources.source_list
         for ii in source_list:
-            line = "{ source %s \n }" % ii
+            line = "{ source %s } \n" % ii
             source_files_part += line
 
+        export_envs_part = ""
+        envs = job.resources.envs
+        for k,v in envs.items():
+            export_envs_part += f"export {k}={v}\n"
+
         script_env = script_env_template.format(
             remote_root=self.context.remote_root,
-            source_files_part=source_files_part)
+            module_unload_part=module_unload_part,
+            module_load_part=module_load_part,
+            source_files_part=source_files_part,
+            export_envs_part=export_envs_part,
+        )
         return script_env
 
     def gen_script_command(self, job):
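For orientation, here is a minimal standalone sketch of what the reworked gen_script_env emits. The render_script_env helper is illustrative only (not part of dpdispatcher's API); the sample values are taken from tests/jsons/machine_slurm.json, added later in this commit.

# Illustrative sketch: mirrors the new gen_script_env logic outside the class.
script_env_template = """
export REMOTE_ROOT={remote_root}
test $? -ne 0 && exit 1
{module_unload_part}
{module_load_part}
{source_files_part}
{export_envs_part}
"""

def render_script_env(remote_root, module_unload_list, module_list, source_list, envs):
    # Each part is assembled line by line, exactly as gen_script_env does.
    module_unload_part = "".join(f"module unload {ii}\n" for ii in module_unload_list)
    module_load_part = "".join(f"module load {ii}\n" for ii in module_list)
    source_files_part = "".join("{ source %s } \n" % ii for ii in source_list)
    export_envs_part = "".join(f"export {k}={v}\n" for k, v in envs.items())
    return script_env_template.format(
        remote_root=remote_root,
        module_unload_part=module_unload_part,
        module_load_part=module_load_part,
        source_files_part=source_files_part,
        export_envs_part=export_envs_part,
    )

print(render_script_env(
    remote_root="/home/fengbo/work_path_dpdispatcher_test",
    module_unload_list=["singularity"],
    module_list=["singularity/3.0.0"],
    source_list=["./slurm_test.env"],
    envs={"DP_DISPATCHER_EXPORT": "test_foo_bar_baz"},
))
# The printed header reads (with a blank line after each part, since every
# part already ends in "\n" and the template adds its own newline):
#   export REMOTE_ROOT=/home/fengbo/work_path_dpdispatcher_test
#   test $? -ne 0 && exit 1
#   module unload singularity
#   module load singularity/3.0.0
#   { source ./slurm_test.env }
#   export DP_DISPATCHER_EXPORT=test_foo_bar_baz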
3 changes: 1 addition & 2 deletions dpdispatcher/slurm.py
@@ -11,8 +11,7 @@
 {slurm_nodes_line}
 {slurm_ntasks_per_node_line}
 {slurm_number_gpu_line}
-{slurm_partition_line}
-"""
+{slurm_partition_line}"""
 
 class Slurm(Machine):
     def gen_script(self, job):
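The one-line change above moves the closing triple quotes onto the {slurm_partition_line} line, so the rendered #SBATCH header no longer carries a trailing newline; the parts appended after it are assumed to bring their own line breaks. A minimal sketch of the difference, using a hypothetical rendered value:

# Illustrative only: effect of moving the closing quotes in the template.
old_tail = "{slurm_partition_line}\n"   # closing quotes were on their own line
new_tail = "{slurm_partition_line}"     # closing quotes now end this line
line = "#SBATCH --partition=GPU_2080Ti"  # hypothetical rendered value
assert old_tail.format(slurm_partition_line=line).endswith("\n")
assert not new_tail.format(slurm_partition_line=line).endswith("\n")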
18 changes: 18 additions & 0 deletions dpdispatcher/submission.py
@@ -603,7 +603,10 @@ def __init__(self,
                 custom_flags=[],
                 strategy=default_strategy,
                 para_deg=1,
+                module_unload_list=[],
+                module_list=[],
                 source_list=[],
+                envs={},
                 **kwargs):
         self.number_node = number_node
         self.cpu_per_node = cpu_per_node
@@ -615,7 +618,10 @@ def __init__(self,
         self.custom_flags = custom_flags
         self.strategy = strategy
         self.para_deg = para_deg
+        self.module_unload_list = module_unload_list
+        self.module_list = module_list
         self.source_list = source_list
+        self.envs = envs
         # self.if_cuda_multi_devices = if_cuda_multi_devices
 
         self.kwargs = kwargs
@@ -646,7 +652,10 @@ def serialize(self):
         resources_dict['custom_flags'] = self.custom_flags
         resources_dict['strategy'] = self.strategy
         resources_dict['para_deg'] = self.para_deg
+        resources_dict['module_unload_list'] = self.module_unload_list
+        resources_dict['module_list'] = self.module_list
         resources_dict['source_list'] = self.source_list
+        resources_dict['envs'] = self.envs
         resources_dict['kwargs'] = self.kwargs
         return resources_dict
@@ -661,7 +670,10 @@ def deserialize(cls, resources_dict):
             custom_flags=resources_dict['custom_flags'],
             strategy=resources_dict['strategy'],
             para_deg=resources_dict['para_deg'],
+            module_unload_list=resources_dict['module_unload_list'],
+            module_list=resources_dict['module_list'],
             source_list=resources_dict['source_list'],
+            envs=resources_dict['envs'],
             **resources_dict['kwargs'])
         return resources
@@ -687,6 +699,9 @@ def arginfo():
         doc_custom_flags = 'The extra lines pass to job submitting script header'
         doc_para_deg = 'Decide how many tasks will be run in parallel.'
         doc_source_list = 'The env file to be sourced before the command execution.'
+        doc_module_unload_list = 'The modules to be unloaded on the HPC system before submitting jobs'
+        doc_module_list = 'The modules to be loaded on the HPC system before submitting jobs'
+        doc_envs = 'The environment variables to be exported before submitting jobs'
         # doc_kwargs = 'extra key-value pair'
 
         strategy_args = [
@@ -707,6 +722,9 @@ def arginfo():
             strategy_format,
             Argument("para_deg", int, optional=True, doc=doc_para_deg, default=1),
             Argument("source_list", list, optional=True, doc=doc_source_list, default=[]),
+            Argument("module_unload_list", list, optional=True, doc=doc_module_unload_list, default=[]),
+            Argument("module_list", list, optional=True, doc=doc_module_list, default=[]),
+            Argument("envs", dict, optional=True, doc=doc_envs, default={}),
         ]
         resources_format = Argument("resources", dict, resources_args)
         return resources_format
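Taken together, these changes make the three new fields first-class citizens of the Resources constructor, of serialize/deserialize, and of the arginfo schema. A minimal sketch of the resulting API, with argument values borrowed from the test fixtures in this commit:

# Illustrative only: exercising the new Resources fields end to end.
from dpdispatcher.submission import Resources

resources = Resources(
    number_node=1,
    cpu_per_node=4,
    gpu_per_node=2,
    queue_name="GPU_2080Ti",
    group_size=4,
    module_unload_list=["singularity"],          # new in this commit
    module_list=["singularity/3.0.0"],           # new in this commit
    source_list=["./slurm_test.env"],
    envs={"DP_DISPATCHER_EXPORT": "test_foo_bar_baz"},  # new in this commit
)

# serialize() now records the new keys, and deserialize() restores them.
round_tripped = Resources.deserialize(resources.serialize())
assert round_tripped.module_unload_list == ["singularity"]
assert round_tripped.envs == {"DP_DISPATCHER_EXPORT": "test_foo_bar_baz"}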
34 changes: 19 additions & 15 deletions tests/devel_test_slurm.py
@@ -1,40 +1,44 @@
-import sys, os
+import sys, os, json
 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..' )))
 
 # from .context import dpdispatcher
 #from dpdispatcher.local_context import LocalContext
 from dpdispatcher.lazy_local_context import LazyLocalContext
 # from dpdispatcher.ssh_context import SSHContext
 
 from dpdispatcher.submission import Submission, Job, Task, Resources
-from dpdispatcher.batch import Batch
-from dpdispatcher.slurm import Slurm
+from dpdispatcher.machine import Machine
+# from dpdispatcher.submission import
+# from dpdispatcher.slurm import Slurm
 
 # local_session = LocalSession({'work_path':'temp2'})
 # local_context = LocalContext(local_root='test_slurm_dir/', work_profile=local_session)
-lazy_local_context = LazyLocalContext(local_root='test_slurm_dir/')
+# lazy_local_context = LazyLocalContext(local_root='test_slurm_dir/')
 
 
 # machine_dict = dict(hostname='localhost', remote_root='/home/dp/dpdispatcher/tests/temp2', username='dp')
 # ssh_session = SSHSession(**machine_dict)
 # ssh_session = SSHSession(hostname='8.131.233.55', remote_root='/home/dp/dp_remote', username='dp')
 # ssh_context = SSHContext(local_root='test_slurm_dir', ssh_session=ssh_session)
 # slurm = Slurm(context=ssh_context)
-slurm = Slurm(context=lazy_local_context)
+# slurm = Slurm(context=lazy_local_context)
 
-resources = Resources(number_node=1, cpu_per_node=4, gpu_per_node=2, queue_name="GPU_2080Ti", group_size=4,
-    custom_flags=['#SBATCH --exclude=2080ti000,2080ti001,2080ti002,2080ti004,2080ti005,2080ti006'],
-    para_deg=2,
-    strategy={"if_cuda_multi_devices":True})
+# resources = Resources(number_node=1, cpu_per_node=4, gpu_per_node=2, queue_name="GPU_2080Ti", group_size=4,
+#     custom_flags=['#SBATCH --exclude=2080ti000,2080ti001,2080ti002,2080ti004,2080ti005,2080ti006'],
+#     para_deg=2,
+#     strategy={"if_cuda_multi_devices":True})
 # slurm_sbatch_dict={'mem': '10G', 'cpus_per_task':1, 'time': "120:0:0"}
 # slurm_resources = SlurmResources(resources=resources, slurm_sbatch_dict=slurm_sbatch_dict)
-submission = Submission(work_base='0_md/', resources=resources, forward_common_files=['graph.pb'], backward_common_files=[]) #, batch=PBS)
 
+with open("jsons/machine_slurm.json", 'r') as f:
+    mdata = json.load(f)
+
+machine = Machine.load_from_dict(mdata['machine'])
+resources = Resources.load_from_dict(mdata['resources'])
+
+submission = Submission(work_base='0_md/', machine=machine, resources=resources, forward_common_files=['graph.pb'], backward_common_files=[]) #, batch=PBS)
 task1 = Task(command='lmp -i input.lammps', task_work_path='bct-1/', forward_files=['conf.lmp', 'input.lammps'], backward_files=['log.lammps'])
 task2 = Task(command='lmp -i input.lammps', task_work_path='bct-2/', forward_files=['conf.lmp', 'input.lammps'], backward_files=['log.lammps'])
 task3 = Task(command='lmp -i input.lammps', task_work_path='bct-3/', forward_files=['conf.lmp', 'input.lammps'], backward_files=['log.lammps'])
 task4 = Task(command='lmp -i input.lammps', task_work_path='bct-4/', forward_files=['conf.lmp', 'input.lammps'], backward_files=['log.lammps'])
 submission.register_task_list([task1, task2, task3, task4, ])
-submission.generate_jobs()
-submission.bind_batch(batch=slurm)
-
-
 submission.run_submission()
5 changes: 4 additions & 1 deletion tests/jsons/job.json
@@ -1,5 +1,5 @@
 {
-    "934d9fc3afcc40a1ffadb09954fce2eaf7745ca9": {
+    "bc1a7297489e921034ced5036cb23ef9daf7b681": {
         "job_task_list": [
             {
                 "command": "lmp -i input.lammps",
@@ -39,7 +39,10 @@
             "if_cuda_multi_devices": false
         },
         "para_deg": 1,
+        "module_unload_list": [],
+        "module_list": [],
         "source_list": [],
+        "envs": {},
         "kwargs": {}
     },
     "job_state": null,
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
29 changes: 29 additions & 0 deletions tests/jsons/machine_slurm.json
@@ -0,0 +1,29 @@
+{
+    "machine":{
+        "batch_type": "Slurm",
+        "context_type": "SSHContext",
+        "local_root": "./test_context_dir",
+        "remote_root": "/home/fengbo/work_path_dpdispatcher_test",
+        "remote_profile": {
+            "hostname": "xxx.200.xxx.59",
+            "username": "fengbo"
+        }
+    },
+    "resources":{
+        "number_node": 1,
+        "cpu_per_node": 4,
+        "gpu_per_node": 2,
+        "queue_name": "GPU_2080Ti",
+        "group_size": 4,
+        "custom_flags": ["#SBATCH --nice=100", "#SBATCH --time=24:00:00"],
+        "strategy": {
+            "if_cuda_multi_devices": true
+        },
+        "para_deg": 2,
+        "module_unload_list": ["singularity"],
+        "module_list": ["singularity/3.0.0"],
+        "source_list": ["./slurm_test.env"],
+        "envs": {"DP_DISPATCHER_EXPORT": "test_foo_bar_baz"},
+        "kwargs": {}
+    }
+}
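This fixture is what the reworked devel_test_slurm.py above consumes: the machine block goes to Machine.load_from_dict and the resources block, including the new module and env keys, goes to Resources.load_from_dict. A condensed sketch of that flow (paths relative to the tests directory):

# Illustrative only: condensed from tests/devel_test_slurm.py.
import json
from dpdispatcher.machine import Machine
from dpdispatcher.submission import Resources

with open("jsons/machine_slurm.json", "r") as f:
    mdata = json.load(f)

machine = Machine.load_from_dict(mdata["machine"])
resources = Resources.load_from_dict(mdata["resources"])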
3 changes: 3 additions & 0 deletions tests/jsons/resources.json
@@ -9,6 +9,9 @@
         "if_cuda_multi_devices": false
     },
     "para_deg": 1,
+    "module_unload_list": [],
+    "module_list": [],
     "source_list": [],
+    "envs": {},
     "kwargs": {}
 }
13 changes: 11 additions & 2 deletions tests/jsons/submission.json
@@ -11,7 +11,10 @@
             "if_cuda_multi_devices": false
         },
         "para_deg": 1,
+        "module_unload_list": [],
+        "module_list": [],
         "source_list": [],
+        "envs": {},
         "kwargs": {}
     },
     "forward_common_files": [
@@ -20,7 +23,7 @@
     "backward_common_files": [],
     "belonging_jobs": [
         {
-            "934d9fc3afcc40a1ffadb09954fce2eaf7745ca9": {
+            "bc1a7297489e921034ced5036cb23ef9daf7b681": {
                 "job_task_list": [
                     {
                         "command": "lmp -i input.lammps",
@@ -60,7 +63,10 @@
                     "if_cuda_multi_devices": false
                 },
                 "para_deg": 1,
+                "module_unload_list": [],
+                "module_list": [],
                 "source_list": [],
+                "envs": {},
                 "kwargs": {}
             },
             "job_state": null,
@@ -69,7 +75,7 @@
             }
         },
         {
-            "5e7449b46128e4043d0476c30c0eeb75aeabdec0": {
+            "743cfe7b9112c69b242a7e851b6300e3e720beb5": {
                 "job_task_list": [
                     {
                         "command": "lmp -i input.lammps",
@@ -109,7 +115,10 @@
                     "if_cuda_multi_devices": false
                 },
                 "para_deg": 1,
+                "module_unload_list": [],
+                "module_list": [],
                 "source_list": [],
+                "envs": {},
                 "kwargs": {}
             },
             "job_state": null,
7 changes: 7 additions & 0 deletions tests/sample_class.py
@@ -16,6 +16,7 @@
 class SampleClass(object):
     @classmethod
     def get_sample_resources(cls):
+
         resources = Resources(number_node=1,
             cpu_per_node=4,
             gpu_per_node=1,
@@ -24,7 +25,10 @@ def get_sample_resources(cls):
             custom_flags=[],
             strategy={'if_cuda_multi_devices': False},
             para_deg=1,
+            module_unload_list=[],
+            module_list=[],
             source_list=[],
+            envs={}
             )
         return resources
 
@@ -38,7 +42,10 @@ def get_sample_resources_dict(cls):
             'custom_flags':[],
             'strategy':{'if_cuda_multi_devices': False},
             'para_deg':1,
+            'module_unload_list':[],
+            'module_list':[],
             'source_list':[],
+            'envs':{},
             'kwargs': {}
         }
         return resources_dict
1 change: 1 addition & 0 deletions tests/slurm_test.env
@@ -0,0 +1 @@
+export DP_DPDISPATCHER_TEST_VAR="dpdispatcher_foo_bar"
8 changes: 4 additions & 4 deletions tests/test_shell_cuda_multi_devices.py
@@ -14,10 +14,10 @@ def setUp(self):
         self.maxDiff = None
 
     def test_shell_cuda_multi_devices(self):
-        with open('jsons/compute.if_cuda_multi_devices.json', 'r') as f:
-            compute_dict = json.load(f)
-        machine = Machine.load_from_dict(compute_dict['machine'])
-        resources = Resources.load_from_dict(compute_dict['resources'])
+        with open('jsons/machine_if_cuda_multi_devices.json', 'r') as f:
+            machine_dict = json.load(f)
+        machine = Machine.load_from_dict(machine_dict['machine'])
+        resources = Resources.load_from_dict(machine_dict['resources'])
 
         task_list = []
         for ii in range(16):
10 changes: 4 additions & 6 deletions tests/test_shell_trival.py
@@ -13,8 +13,6 @@
 import json
 
 
-
-
 class TestShellTrival(unittest.TestCase):
     def setUp(self):
         self.maxDiff = None
@@ -24,11 +22,11 @@ def setUp(self):
     #     }
 
     def test_shell_trival(self):
-        with open('jsons/compute_local_shell.json', 'r') as f:
-            compute_dict = json.load(f)
+        with open('jsons/machine_local_shell.json', 'r') as f:
+            machine_dict = json.load(f)
 
-        machine = Machine.load_from_dict(compute_dict['machine'])
-        resources = Resources.load_from_dict(compute_dict['resources'])
+        machine = Machine.load_from_dict(machine_dict['machine'])
+        resources = Resources.load_from_dict(machine_dict['resources'])
 
         task1 = Task(command='cat example.txt', task_work_path='dir1/', forward_files=['example.txt'], backward_files=['out.txt'], outlog='out.txt')
         task2 = Task(command='cat example.txt', task_work_path='dir2/', forward_files=['example.txt'], backward_files=['out.txt'], outlog='out.txt')
